Project
Loading...
Searching...
No Matches
CommonServices.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
25#include "Framework/Signpost.h"
32#include "InputRouteHelpers.h"
36#include "Framework/Tracing.h"
46#include "Framework/Signpost.h"
49
50#include "TextDriverClient.h"
51#include "WSDriverClient.h"
52#include "HTTPParser.h"
53#include "../src/DataProcessingStatus.h"
54#include "DecongestionService.h"
55#include "ArrowSupport.h"
57#include "Headers/STFHeader.h"
58#include "Headers/DataHeader.h"
59
60#include <Configuration/ConfigurationInterface.h>
61#include <Configuration/ConfigurationFactory.h>
62#include <Monitoring/MonitoringFactory.h>
63#include "Framework/Signpost.h"
64
65#include <fairmq/Device.h>
66#include <fairmq/shmem/Monitor.h>
67#include <fairmq/shmem/Common.h>
68#include <fairmq/ProgOptions.h>
69#include <uv.h>
70
71#include <cstdlib>
72#include <cstring>
73
74using o2::configuration::ConfigurationFactory;
75using o2::configuration::ConfigurationInterface;
76using o2::monitoring::Monitoring;
77using o2::monitoring::MonitoringFactory;
78using Metric = o2::monitoring::Metric;
79using Key = o2::monitoring::tags::Key;
80using Value = o2::monitoring::tags::Value;
81
82O2_DECLARE_DYNAMIC_LOG(data_processor_context);
83O2_DECLARE_DYNAMIC_LOG(stream_context);
86
87namespace o2::framework
88{
89
90#define MONITORING_QUEUE_SIZE 100
92{
93 return ServiceSpec{
94 .name = "monitoring",
95 .init = [](ServiceRegistryRef registry, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
96 void* service = nullptr;
97 bool isWebsocket = strncmp(options.GetPropertyAsString("driver-client-backend").c_str(), "ws://", 4) == 0;
98 bool isDefault = options.GetPropertyAsString("monitoring-backend") == "default";
99 bool useDPL = (isWebsocket && isDefault) || options.GetPropertyAsString("monitoring-backend") == "dpl://";
100 o2::monitoring::Monitoring* monitoring;
101 if (useDPL) {
102 monitoring = new Monitoring();
103 auto dplBackend = std::make_unique<DPLMonitoringBackend>(registry);
104 (dynamic_cast<o2::monitoring::Backend*>(dplBackend.get()))->setVerbosity(o2::monitoring::Verbosity::Debug);
105 monitoring->addBackend(std::move(dplBackend));
106 } else {
107 auto backend = isDefault ? "infologger://" : options.GetPropertyAsString("monitoring-backend");
108 monitoring = MonitoringFactory::Get(backend).release();
109 }
110 service = monitoring;
111 monitoring->enableBuffering(MONITORING_QUEUE_SIZE);
112 assert(registry.get<DeviceSpec const>().name.empty() == false);
113 monitoring->addGlobalTag("pipeline_id", std::to_string(registry.get<DeviceSpec const>().inputTimesliceId));
114 monitoring->addGlobalTag("dataprocessor_name", registry.get<DeviceSpec const>().name);
115 monitoring->addGlobalTag("dpl_instance", options.GetPropertyAsString("shm-segment-id"));
116 return ServiceHandle{TypeIdHelpers::uniqueId<Monitoring>(), service};
117 },
118 .configure = noConfiguration(),
119 .start = [](ServiceRegistryRef services, void* service) {
120 auto* monitoring = (o2::monitoring::Monitoring*)service;
121
122 auto extRunNumber = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("runNumber", "unspecified");
123 if (extRunNumber == "unspecified") {
124 return;
125 }
126 try {
127 monitoring->setRunNumber(std::stoul(extRunNumber));
128 } catch (...) {
129 } },
130 .exit = [](ServiceRegistryRef registry, void* service) {
131 auto* monitoring = reinterpret_cast<Monitoring*>(service);
132 monitoring->flushBuffer();
133 delete monitoring; },
134 .kind = ServiceKind::Serial};
135}
136
137// An asynchronous service that executes actions at the end of the data processing
139{
140 return ServiceSpec{
141 .name = "async-queue",
142 .init = simpleServiceInit<AsyncQueue, AsyncQueue>(),
143 .configure = noConfiguration(),
144 .stop = [](ServiceRegistryRef services, void* service) {
145 auto& queue = services.get<AsyncQueue>();
147 },
148 .kind = ServiceKind::Serial};
149}
150
151// Make it a service so that it can be used easily from the analysis
152// FIXME: Moreover, it makes sense that this will be duplicated on a per thread
153// basis when we get to it.
155{
156 return ServiceSpec{
157 .name = "timing-info",
158 .uniqueId = simpleServiceId<TimingInfo>(),
159 .init = simpleServiceInit<TimingInfo, TimingInfo, ServiceKind::Stream>(),
160 .configure = noConfiguration(),
161 .kind = ServiceKind::Stream};
162}
163
165{
166 return ServiceSpec{
167 .name = "stream-context",
168 .uniqueId = simpleServiceId<StreamContext>(),
169 .init = simpleServiceInit<StreamContext, StreamContext, ServiceKind::Stream>(),
170 .configure = noConfiguration(),
171 .preProcessing = [](ProcessingContext& context, void* service) {
172 auto* stream = (StreamContext*)service;
173 auto& routes = context.services().get<DeviceSpec const>().outputs;
174 // Notice I need to do this here, because different invocation for
175 // the same stream might be referring to different data processors.
176 // We should probably have a context which is per stream of a specific
177 // data processor.
178 stream->routeDPLCreated.resize(routes.size());
179 stream->routeCreated.resize(routes.size());
180 // Reset the routeDPLCreated at every processing step
181 std::fill(stream->routeDPLCreated.begin(), stream->routeDPLCreated.end(), false);
182 std::fill(stream->routeCreated.begin(), stream->routeCreated.end(), false); },
183 .postProcessing = [](ProcessingContext& processingContext, void* service) {
184 auto* stream = (StreamContext*)service;
185 auto& routes = processingContext.services().get<DeviceSpec const>().outputs;
186 auto& timeslice = processingContext.services().get<TimingInfo>().timeslice;
187 auto& messageContext = processingContext.services().get<MessageContext>();
188 // Check if we never created any data for this timeslice
189 // if we did not, but we still have didDispatched set to true
190 // it means it was created out of band.
191 bool userDidCreate = false;
192 O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, service);
193 for (size_t ri = 0; ri < routes.size(); ++ri) {
194 if (stream->routeCreated[ri] == true && stream->routeDPLCreated[ri] == false) {
195 userDidCreate = true;
196 break;
197 }
198 }
199 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "userDidCreate == %d && didDispatch == %d",
200 userDidCreate,
201 messageContext.didDispatch());
202 if (userDidCreate == false && messageContext.didDispatch() == true) {
203 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Data created out of band userDidCreate == %d && messageContext.didDispatch == %d",
204 userDidCreate,
205 messageContext.didDispatch());
206 return;
207 }
208 if (userDidCreate == false && messageContext.didDispatch() == false) {
209 O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, service);
210 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "No data created.");
211 return;
212 }
213 for (size_t ri = 0; ri < routes.size(); ++ri) {
214 auto &route = routes[ri];
215 auto &matcher = route.matcher;
216 if (stream->routeDPLCreated[ri] == true) {
217 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Data created by DPL. ri = %" PRIu64 ", %{public}s",
218 (uint64_t)ri, DataSpecUtils::describe(matcher).c_str());
219 continue;
220 }
221 if (stream->routeCreated[ri] == true) {
222 continue;
223 } if ((timeslice % route.maxTimeslices) != route.timeslice) {
224 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Route ri = %" PRIu64 ", skipped because of pipelining.",
225 (uint64_t)ri);
226 continue;
227 }
228 if (matcher.lifetime == Lifetime::Timeframe) {
229 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks",
230 "Expected Lifetime::Timeframe data %{public}s was not created for timeslice %" PRIu64 " and might result in dropped timeframes",
231 DataSpecUtils::describe(matcher).c_str(), (uint64_t)timeslice);
232 LOGP(error, "Expected Lifetime::Timeframe data {} was not created for timeslice {} and might result in dropped timeframes", DataSpecUtils::describe(matcher), timeslice);
233 }
234 } },
235 .preEOS = [](EndOfStreamContext& context, void* service) {
236 // We need to reset the routeDPLCreated / routeCreated because the end of stream
237 // uses a different context which does not know about the routes.
238 // FIXME: This should be fixed in a different way, but for now it will
239 // allow TPC IDC to work.
240 auto* stream = (StreamContext*)service;
241 auto& routes = context.services().get<DeviceSpec const>().outputs;
242 // Notice I need to do this here, because different invocation for
243 // the same stream might be referring to different data processors.
244 // We should probably have a context which is per stream of a specific
245 // data processor.
246 stream->routeDPLCreated.resize(routes.size());
247 stream->routeCreated.resize(routes.size());
248 // Reset the routeCreated / routeDPLCreated at every processing step
249 std::fill(stream->routeCreated.begin(), stream->routeCreated.end(), false);
250 std::fill(stream->routeDPLCreated.begin(), stream->routeDPLCreated.end(), false); },
251 .kind = ServiceKind::Stream};
252}
253
255{
256 return ServiceSpec{
257 .name = "datataking-contex",
258 .uniqueId = simpleServiceId<DataTakingContext>(),
259 .init = simpleServiceInit<DataTakingContext, DataTakingContext, ServiceKind::Stream>(),
260 .configure = noConfiguration(),
261 .preProcessing = [](ProcessingContext& processingContext, void* service) {
262 auto& context = processingContext.services().get<DataTakingContext>();
263 for (auto const& ref : processingContext.inputs()) {
264 const o2::framework::DataProcessingHeader *dph = o2::header::get<DataProcessingHeader*>(ref.header);
265 const auto* dh = o2::header::get<o2::header::DataHeader*>(ref.header);
266 if (!dph || !dh) {
267 continue;
268 }
269 context.runNumber = fmt::format("{}", dh->runNumber);
270 break;
271 } },
272 // Notice this will be executed only once, because the service is declared upfront.
273 .start = [](ServiceRegistryRef services, void* service) {
274 auto& context = services.get<DataTakingContext>();
275
277
278 auto extRunNumber = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("runNumber", "unspecified");
279 if (extRunNumber != "unspecified" || context.runNumber == "0") {
280 context.runNumber = extRunNumber;
281 }
282 auto extLHCPeriod = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("lhc_period", "unspecified");
283 if (extLHCPeriod != "unspecified") {
284 context.lhcPeriod = extLHCPeriod;
285 } else {
286 static const char* months[12] = {"JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"};
287 time_t now = time(nullptr);
288 auto ltm = gmtime(&now);
289 context.lhcPeriod = months[ltm->tm_mon];
290 LOG(info) << "LHCPeriod is not available, using current month " << context.lhcPeriod;
291 }
292
293 auto extRunType = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("run_type", "unspecified");
294 if (extRunType != "unspecified") {
295 context.runType = extRunType;
296 }
297 auto extEnvId = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("environment_id", "unspecified");
298 if (extEnvId != "unspecified") {
299 context.envId = extEnvId;
300 }
301 auto extDetectors = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("detectors", "unspecified");
302 if (extDetectors != "unspecified") {
303 context.detectors = extDetectors;
304 }
305 auto forcedRaw = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("force_run_as_raw", "false");
306 context.forcedRaw = forcedRaw == "true"; },
307 .kind = ServiceKind::Stream};
308}
309
311};
312
314{
315 return ServiceSpec{
316 .name = "configuration",
317 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
318 auto backend = options.GetPropertyAsString("configuration");
319 if (backend == "command-line") {
320 return ServiceHandle{0, nullptr};
321 }
322 return ServiceHandle{TypeIdHelpers::uniqueId<ConfigurationInterface>(),
323 ConfigurationFactory::getConfiguration(backend).release()};
324 },
325 .configure = noConfiguration(),
326 .driverStartup = [](ServiceRegistryRef registry, DeviceConfig const& dc) {
327 if (dc.options.count("configuration") == 0) {
328 registry.registerService(ServiceHandle{0, nullptr});
329 return;
330 }
331 auto backend = dc.options["configuration"].as<std::string>();
332 registry.registerService(ServiceHandle{TypeIdHelpers::uniqueId<ConfigurationInterface>(),
333 ConfigurationFactory::getConfiguration(backend).release()}); },
334 .kind = ServiceKind::Global};
335}
336
338{
339 return ServiceSpec{
340 .name = "driverClient",
341 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
342 auto backend = options.GetPropertyAsString("driver-client-backend");
343 if (backend == "stdout://") {
344 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
345 new TextDriverClient(services, state)};
346 }
347 auto [ip, port] = o2::framework::parse_websocket_url(backend.c_str());
348 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
349 new WSDriverClient(services, ip.c_str(), port)};
350 },
351 .configure = noConfiguration(),
352 .kind = ServiceKind::Global};
353}
354
356{
357 return ServiceSpec{
358 .name = "control",
359 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
360 return ServiceHandle{TypeIdHelpers::uniqueId<ControlService>(),
361 new ControlService(services, state)};
362 },
363 .configure = noConfiguration(),
364 .kind = ServiceKind::Serial};
365}
366
368{
369 return ServiceSpec{
370 .name = "localrootfile",
371 .init = simpleServiceInit<LocalRootFileService, LocalRootFileService>(),
372 .configure = noConfiguration(),
373 .kind = ServiceKind::Serial};
374}
375
377{
378 return ServiceSpec{
379 .name = "parallel",
380 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
381 auto& spec = services.get<DeviceSpec const>();
382 return ServiceHandle{TypeIdHelpers::uniqueId<ParallelContext>(),
383 new ParallelContext(spec.rank, spec.nSlots)};
384 },
385 .configure = noConfiguration(),
386 .kind = ServiceKind::Serial};
387}
388
390{
391 return ServiceSpec{
392 .name = "timesliceindex",
393 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
394 auto& spec = services.get<DeviceSpec const>();
395 return ServiceHandle{TypeIdHelpers::uniqueId<TimesliceIndex>(),
396 new TimesliceIndex(InputRouteHelpers::maxLanes(spec.inputs), state.inputChannelInfos)};
397 },
398 .configure = noConfiguration(),
399 .kind = ServiceKind::Serial};
400}
401
403{
404 return ServiceSpec{
405 .name = "callbacks",
406 .init = simpleServiceInit<CallbackService, CallbackService>(),
407 .configure = noConfiguration(),
408 .kind = ServiceKind::Serial};
409}
410
412{
413 return ServiceSpec{
414 .name = "datarelayer",
415 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
416 auto& spec = services.get<DeviceSpec const>();
417 int pipelineLength = DefaultsHelpers::pipelineLength(options);
418 return ServiceHandle{TypeIdHelpers::uniqueId<DataRelayer>(),
419 new DataRelayer(spec.completionPolicy,
420 spec.inputs,
421 services.get<TimesliceIndex>(),
422 services,
423 pipelineLength)};
424 },
425 .configure = noConfiguration(),
426 .kind = ServiceKind::Serial};
427}
428
430{
431 return ServiceSpec{
432 .name = "datasender",
433 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
434 return ServiceHandle{TypeIdHelpers::uniqueId<DataSender>(),
435 new DataSender(services)};
436 },
437 .configure = noConfiguration(),
438 .preProcessing = [](ProcessingContext&, void* service) {
439 auto& dataSender = *reinterpret_cast<DataSender*>(service);
440 dataSender.reset(); },
441 .postDispatching = [](ProcessingContext& ctx, void* service) {
442 auto& dataSender = *reinterpret_cast<DataSender*>(service);
443 // If the quit was requested, the post dispatching can still happen
444 // but with an empty set of data.
445 if (ctx.services().get<DeviceState>().quitRequested == false) {
446 dataSender.verifyMissingSporadic();
447 } },
449}
450
454
456{
457 return ServiceSpec{
458 .name = "tracing",
459 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
460 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<TracingInfrastructure>(),
461 .instance = new TracingInfrastructure(),
462 .kind = ServiceKind::Serial};
463 },
464 .configure = noConfiguration(),
465 .preProcessing = [](ProcessingContext&, void* service) {
466 auto* t = reinterpret_cast<TracingInfrastructure*>(service);
467 t->processingCount += 1; },
468 .postProcessing = [](ProcessingContext&, void* service) {
469 auto* t = reinterpret_cast<TracingInfrastructure*>(service);
470 t->processingCount += 1; },
471 .kind = ServiceKind::Serial};
472}
473
475};
476
477// CCDB Support service
479{
480 return ServiceSpec{
481 .name = "ccdb-support",
482 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
483 // iterate on all the outputs matchers
484 auto& spec = services.get<DeviceSpec const>();
485 for (auto& output : spec.outputs) {
486 if (DataSpecUtils::match(output.matcher, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"})) {
487 LOGP(debug, "Optional inputs support enabled");
488 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<CCDBSupport>(), .instance = new CCDBSupport, .kind = ServiceKind::Serial};
489 }
490 }
491 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<CCDBSupport>(), .instance = nullptr, .kind = ServiceKind::Serial};
492 },
493 .configure = noConfiguration(),
494 .finaliseOutputs = [](ProcessingContext& pc, void* service) {
495 if (!service) {
496 return;
497 }
498 if (pc.outputs().countDeviceOutputs(true) == 0) {
499 LOGP(debug, "We are w/o outputs, do not automatically add DISTSUBTIMEFRAME to outgoing messages");
500 return;
501 }
502 auto& timingInfo = pc.services().get<TimingInfo>();
503
504 // For any output that is a FLP/DISTSUBTIMEFRAME with subspec != 0,
505 // we create a new message.
506 InputSpec matcher{"matcher", ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}};
507 auto& streamContext = pc.services().get<StreamContext>();
508 for (size_t oi = 0; oi < pc.services().get<DeviceSpec const>().outputs.size(); ++oi) {
509 OutputRoute const& output = pc.services().get<DeviceSpec const>().outputs[oi];
510 if ((output.timeslice % output.maxTimeslices) != 0) {
511 continue;
512 }
513 if (DataSpecUtils::match(output.matcher, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"})) {
514 auto concrete = DataSpecUtils::asConcreteDataMatcher(output.matcher);
515 if (concrete.subSpec == 0) {
516 continue;
517 }
518 auto& stfDist = pc.outputs().make<o2::header::STFHeader>(Output{concrete.origin, concrete.description, concrete.subSpec});
519 stfDist.id = timingInfo.timeslice;
520 stfDist.firstOrbit = timingInfo.firstTForbit;
521 stfDist.runNumber = timingInfo.runNumber;
522 // We mark it as created by DPL, because we should not account for it when
523 // checking whether the user created all the data for a timeslice.
524 O2_SIGNPOST_ID_FROM_POINTER(sid, stream_context, &streamContext);
525 O2_SIGNPOST_EVENT_EMIT(stream_context, sid, "finaliseOutputs", "Route %" PRIu64 " (%{public}s) was created by DPL.", (uint64_t)oi,
526 DataSpecUtils::describe(output.matcher).c_str());
527 streamContext.routeDPLCreated[oi] = true;
528 }
529 } },
530 .kind = ServiceKind::Global};
531}
536
537auto decongestionCallback = [](AsyncTask& task, size_t id) -> void {
538 auto& oldestPossibleOutput = task.user<DecongestionContext>().oldestPossibleOutput;
539 auto& ref = task.user<DecongestionContext>().ref;
540
541 auto& decongestion = ref.get<DecongestionService>();
542 auto& proxy = ref.get<FairMQDeviceProxy>();
543
544 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
545 cid.value = id;
546 if (decongestion.lastTimeslice >= oldestPossibleOutput.timeslice.value) {
547 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", "Not sending already sent value: %" PRIu64 "> %" PRIu64,
548 decongestion.lastTimeslice, (uint64_t)oldestPossibleOutput.timeslice.value);
549 return;
550 }
551 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", "Running oldest possible timeslice %" PRIu64 " propagation.",
552 (uint64_t)oldestPossibleOutput.timeslice.value);
553 DataProcessingHelpers::broadcastOldestPossibleTimeslice(ref, oldestPossibleOutput.timeslice.value);
554
555 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
556 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
557 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
558 // TODO: this we could cache in the proxy at the bind moment.
559 if (info.channelType != ChannelAccountingType::DPL) {
560 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", "Skipping channel %{public}s", info.name.c_str());
561 continue;
562 }
563 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestPossibleOutput.timeslice.value)) {
564 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice",
565 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64 ", priority %d",
566 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
567 }
568 }
569 decongestion.lastTimeslice = oldestPossibleOutput.timeslice.value;
570};
571
572auto decongestionCallbackOrdered = [](AsyncTask& task, size_t id) -> void {
573 auto& oldestPossibleOutput = task.user<DecongestionContext>().oldestPossibleOutput;
574 auto& ref = task.user<DecongestionContext>().ref;
575
576 auto& decongestion = ref.get<DecongestionService>();
577 auto& state = ref.get<DeviceState>();
578 auto& timesliceIndex = ref.get<TimesliceIndex>();
579 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
580 int64_t oldNextTimeslice = decongestion.nextTimeslice;
581 decongestion.nextTimeslice = std::max(decongestion.nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
582 if (oldNextTimeslice != decongestion.nextTimeslice) {
583 if (state.transitionHandling != TransitionHandlingState::NoTransition && DefaultsHelpers::onlineDeploymentMode()) {
584 O2_SIGNPOST_EVENT_EMIT_WARN(async_queue, cid, "oldest_possible_timeslice", "Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
585 } else {
586 O2_SIGNPOST_EVENT_EMIT_CRITICAL(async_queue, cid, "oldest_possible_timeslice", "Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
587 }
588 timesliceIndex.rescan();
589 }
590};
591
592// Decongestion service
593// If we do not have any Timeframe input, it means we must be creating timeslices
594// in order and that we should propagate the oldest possible timeslice at the end
595// of each processing step.
598{
599 return ServiceSpec{
600 .name = "decongestion",
601 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
602 auto* decongestion = new DecongestionService();
603 for (auto& input : services.get<DeviceSpec const>().inputs) {
604 if (input.matcher.lifetime == Lifetime::Timeframe || input.matcher.lifetime == Lifetime::QA || input.matcher.lifetime == Lifetime::Sporadic || input.matcher.lifetime == Lifetime::Optional) {
605 LOGP(detail, "Found a real data input, we cannot update the oldest possible timeslice when sending messages");
606 decongestion->isFirstInTopology = false;
607 break;
608 }
609 }
610 for (const auto& label : services.get<DeviceSpec const>().labels) {
612 decongestion->suppressDomainInfo = true;
613 break;
614 }
615 }
616 auto& queue = services.get<AsyncQueue>();
617 decongestion->oldestPossibleTimesliceTask = AsyncQueueHelpers::create(queue, {.name = "oldest-possible-timeslice", .score = 100});
618 return ServiceHandle{TypeIdHelpers::uniqueId<DecongestionService>(), decongestion, ServiceKind::Serial};
619 },
620 .postForwarding = [](ProcessingContext& ctx, void* service) {
621 auto* decongestion = reinterpret_cast<DecongestionService*>(service);
622 if (O2_BUILTIN_LIKELY(decongestion->isFirstInTopology == false)) {
623 return;
624 }
625 O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, service);
626 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "postForwardingCallbacks", "We are the first one in the topology, we need to update the oldest possible timeslice");
627 auto& timesliceIndex = ctx.services().get<TimesliceIndex>();
628 auto& relayer = ctx.services().get<DataRelayer>();
629 timesliceIndex.updateOldestPossibleOutput(decongestion->nextEnumerationTimesliceRewinded);
630 auto& proxy = ctx.services().get<FairMQDeviceProxy>();
631 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
632 if (decongestion->nextEnumerationTimesliceRewinded && decongestion->nextEnumerationTimeslice < oldestPossibleOutput.timeslice.value) {
633 LOGP(detail, "Not sending oldestPossible if nextEnumerationTimeslice was rewinded");
634 return;
635 }
636
637 if (decongestion->lastTimeslice && oldestPossibleOutput.timeslice.value == decongestion->lastTimeslice) {
638 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice",
639 "Not sending already sent value for oldest possible timeslice: %" PRIu64,
640 (uint64_t)oldestPossibleOutput.timeslice.value);
641 return;
642 }
643 if (oldestPossibleOutput.timeslice.value < decongestion->lastTimeslice) {
644 LOGP(error, "We are trying to send an oldest possible timeslice {} that is older than the last one we already sent {}",
645 oldestPossibleOutput.timeslice.value, decongestion->lastTimeslice);
646 return;
647 }
648
649 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Broadcasting oldest posssible output %" PRIu64 " due to %{public}s (%" PRIu64 ")",
650 (uint64_t)oldestPossibleOutput.timeslice.value,
651 oldestPossibleOutput.slot.index == -1 ? "channel" : "slot",
652 (uint64_t)(oldestPossibleOutput.slot.index == -1 ? oldestPossibleOutput.channel.value : oldestPossibleOutput.slot.index));
653 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Ordered active %d", decongestion->orderedCompletionPolicyActive);
654 if (decongestion->orderedCompletionPolicyActive) {
655 auto oldNextTimeslice = decongestion->nextTimeslice;
656 decongestion->nextTimeslice = std::max(decongestion->nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
657 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Next timeslice %" PRIi64, decongestion->nextTimeslice);
658 if (oldNextTimeslice != decongestion->nextTimeslice) {
659 auto& state = ctx.services().get<DeviceState>();
661 O2_SIGNPOST_EVENT_EMIT_WARN(data_processor_context, cid, "oldest_possible_timeslice", "Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
662 } else {
663 O2_SIGNPOST_EVENT_EMIT_CRITICAL(data_processor_context, cid, "oldest_possible_timeslice", "Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
664 }
665 timesliceIndex.rescan();
666 }
667 }
668 DataProcessingHelpers::broadcastOldestPossibleTimeslice(ctx.services(), oldestPossibleOutput.timeslice.value);
669
670 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
671 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
672 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
673 // TODO: this we could cache in the proxy at the bind moment.
674 if (info.channelType != ChannelAccountingType::DPL) {
675 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Skipping channel %{public}s", info.name.c_str());
676 continue;
677 }
678 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ctx.services(), info, state, oldestPossibleOutput.timeslice.value)) {
679 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice",
680 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64 ", priority %d",
681 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
682 }
683 }
684 decongestion->lastTimeslice = oldestPossibleOutput.timeslice.value; },
685 .stop = [](ServiceRegistryRef services, void* service) {
686 auto* decongestion = (DecongestionService*)service;
687 services.get<TimesliceIndex>().reset();
688 decongestion->nextEnumerationTimeslice = 0;
689 decongestion->nextEnumerationTimesliceRewinded = false;
690 decongestion->lastTimeslice = 0;
691 decongestion->nextTimeslice = 0;
692 decongestion->oldestPossibleTimesliceTask = {0};
693 auto &state = services.get<DeviceState>();
694 for (auto &channel : state.inputChannelInfos) {
695 channel.oldestForChannel = {0};
696 } },
697 .domainInfoUpdated = [](ServiceRegistryRef services, size_t oldestPossibleTimeslice, ChannelIndex channel) {
698 auto& decongestion = services.get<DecongestionService>();
699 auto& relayer = services.get<DataRelayer>();
700 auto& timesliceIndex = services.get<TimesliceIndex>();
701 O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, &decongestion);
702 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Received oldest possible timeframe %" PRIu64 " from channel %d",
703 (uint64_t)oldestPossibleTimeslice, channel.value);
704 relayer.setOldestPossibleInput({oldestPossibleTimeslice}, channel);
705 timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
706 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
707
708 if (oldestPossibleOutput.timeslice.value == decongestion.lastTimeslice) {
709 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Synchronous: Not sending already sent value: %" PRIu64, (uint64_t)oldestPossibleOutput.timeslice.value);
710 return;
711 }
712 if (oldestPossibleOutput.timeslice.value < decongestion.lastTimeslice) {
713 LOGP(error, "We are trying to send an oldest possible timeslice {} that is older than the last one we sent {}",
714 oldestPossibleOutput.timeslice.value, decongestion.lastTimeslice);
715 return;
716 }
717 auto& queue = services.get<AsyncQueue>();
718 const auto& state = services.get<DeviceState>();
721 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Queueing oldest possible timeslice %" PRIu64 " propagation for execution.",
722 (uint64_t)oldestPossibleOutput.timeslice.value);
724 queue, AsyncTask{ .timeslice = TimesliceId{oldestPossibleTimeslice},
725 .id = decongestion.oldestPossibleTimesliceTask,
726 .debounce = -1, .callback = decongestionCallback}
727 .user<DecongestionContext>(DecongestionContext{.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
728
729 if (decongestion.orderedCompletionPolicyActive) {
731 queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleOutput.timeslice.value},.id = decongestion.oldestPossibleTimesliceTask, .debounce = -1,
732 .callback = decongestionCallbackOrdered}
733 .user<DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
734 } },
735 .kind = ServiceKind::Serial};
736}
737
738// FIXME: allow configuring the default number of threads per device
739// This should probably be done by overriding the preFork
740// callback and using the boost program options there to
741// get the default number of threads.
743{
744 return ServiceSpec{
745 .name = "threadpool",
746 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
747 auto* pool = new ThreadPool();
748 // FIXME: this will require some extra argument for the configuration context of a service
749 pool->poolSize = 1;
750 return ServiceHandle{TypeIdHelpers::uniqueId<ThreadPool>(), pool};
751 },
752 .configure = [](InitContext&, void* service) -> void* {
753 auto* t = reinterpret_cast<ThreadPool*>(service);
754 // FIXME: this will require some extra argument for the configuration context of a service
755 t->poolSize = 1;
756 return service;
757 },
758 .postForkParent = [](ServiceRegistryRef services) -> void {
759 // FIXME: this will require some extra argument for the configuration context of a service
760 auto numWorkersS = std::to_string(1);
761 setenv("UV_THREADPOOL_SIZE", numWorkersS.c_str(), 0);
762 },
763 .kind = ServiceKind::Serial};
764}
765
766namespace
767{
768auto sendRelayerMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) -> void
769{
770 // Update the timer to make sure we have the correct time when sending out the stats.
771 uv_update_time(registry.get<DeviceState>().loop);
772 // Derive the amount of shared memory used
773 auto& runningWorkflow = registry.get<RunningWorkflowInfo const>();
774 using namespace fair::mq::shmem;
775 auto& spec = registry.get<DeviceSpec const>();
776
777 // FIXME: Ugly, but we do it only every 5 seconds...
778 if (stats.hasAvailSHMMetric) {
779 auto device = registry.get<RawDeviceService>().device();
780 long freeMemory = -1;
781 try {
782 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(ShmId{makeShmIdStr(device->fConfig->GetProperty<uint64_t>("shmid"))}, runningWorkflow.shmSegmentId);
783 } catch (...) {
784 }
785 if (freeMemory == -1) {
786 try {
787 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(SessionId{device->fConfig->GetProperty<std::string>("session")}, runningWorkflow.shmSegmentId);
788 } catch (...) {
789 }
790 }
791 stats.updateStats({static_cast<unsigned short>(static_cast<int>(ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE) + (runningWorkflow.shmSegmentId % 512)), DataProcessingStats::Op::SetIfPositive, freeMemory});
792 }
793
794 auto device = registry.get<RawDeviceService>().device();
795
796 int64_t totalBytesIn = 0;
797 int64_t totalBytesOut = 0;
798
799 for (auto& channel : device->GetChannels()) {
800 totalBytesIn += channel.second[0].GetBytesRx();
801 totalBytesOut += channel.second[0].GetBytesTx();
802 }
803
804 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_BYTES_IN), DataProcessingStats::Op::Set, totalBytesIn / 1000000});
805 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_BYTES_OUT), DataProcessingStats::Op::Set, totalBytesOut / 1000000});
806
807 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_RATE_IN_MB_S), DataProcessingStats::Op::InstantaneousRate, totalBytesIn / 1000000});
808 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_RATE_OUT_MB_S), DataProcessingStats::Op::InstantaneousRate, totalBytesOut / 1000000});
809};
810
811auto flushStates(ServiceRegistryRef registry, DataProcessingStates& states) -> void
812{
813 if (!registry.get<DriverConfig const>().driverHasGUI) {
814 return;
815 }
816 states.flushChangedStates([&states, registry](std::string const& spec, int64_t timestamp, std::string_view value) mutable -> void {
817 auto& client = registry.get<ControlService>();
818 client.push(spec, value, timestamp);
819 });
820}
821
822O2_DECLARE_DYNAMIC_LOG(monitoring_service);
823
825auto flushMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) -> void
826{
827 // Flushing metrics should only happen on main thread to avoid
828 // having to have a mutex for the communication with the driver.
829 O2_SIGNPOST_ID_GENERATE(sid, monitoring_service);
830 O2_SIGNPOST_START(monitoring_service, sid, "flush", "flushing metrics");
831 if (registry.isMainThread() == false) {
832 LOGP(fatal, "Flushing metrics should only happen on the main thread.");
833 }
834 auto& monitoring = registry.get<Monitoring>();
835 auto& relayer = registry.get<DataRelayer>();
836
837 // Send all the relevant metrics for the relayer to update the GUI
838 stats.flushChangedMetrics([&monitoring, sid](DataProcessingStats::MetricSpec const& spec, int64_t timestamp, int64_t value) mutable -> void {
839 // convert timestamp to a time_point
840 auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(std::chrono::milliseconds(timestamp));
841 auto metric = o2::monitoring::Metric{spec.name, Metric::DefaultVerbosity, tp};
842 if (spec.kind == DataProcessingStats::Kind::UInt64) {
843 if (value < 0) {
844 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid, "flushChangedMetrics", "Value for %{public}s is negative, setting to 0",
845 spec.name.c_str());
846 value = 0;
847 }
848 metric.addValue((uint64_t)value, "value");
849 } else {
850 if (value > (int64_t)std::numeric_limits<int>::max()) {
851 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid, "flushChangedMetrics", "Value for %{public}s is too large, setting to INT_MAX",
852 spec.name.c_str());
853 value = (int64_t)std::numeric_limits<int>::max();
854 }
855 if (value < (int64_t)std::numeric_limits<int>::min()) {
856 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid, "flushChangedMetrics", "Value for %{public}s is too small, setting to INT_MIN",
857 spec.name.c_str());
858 value = (int64_t)std::numeric_limits<int>::min();
859 }
860 metric.addValue((int)value, "value");
861 }
862 if (spec.scope == DataProcessingStats::Scope::DPL) {
863 metric.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL);
864 }
865 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid, "flushChangedMetrics", "Flushing metric %{public}s", spec.name.c_str());
866 monitoring.send(std::move(metric));
867 });
868 relayer.sendContextState();
869 monitoring.flushBuffer();
870 O2_SIGNPOST_END(monitoring_service, sid, "flush", "done flushing metrics");
871};
872} // namespace
873
{
  // Declares the "data-processing-stats" service: it owns the
  // DataProcessingStats instance of a device and registers the full set of
  // metrics a data processor can publish. The various dangling/EOS/loop
  // hooks below push and flush those metrics.
  return ServiceSpec{
    .name = "data-processing-stats",
    .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
      timespec now;
      clock_gettime(CLOCK_REALTIME, &now);
      uv_update_time(state.loop);
      // Offset between wall-clock (ms) and the uv loop clock, used to
      // translate loop timestamps into real time.
      uint64_t offset = now.tv_sec * 1000 - uv_now(state.loop);
      // NOTE(review): the construction of the stats config (and the stats
      // object itself) is partially elided in this source view; only the
      // trailing fragments of the statement are visible below.
        .minOnlinePublishInterval = std::stoi(options.GetProperty<std::string>("dpl-stats-min-online-publishing-interval").c_str()) * 1000};
        config);
      auto& runningWorkflow = services.get<RunningWorkflowInfo const>();

      // It makes no sense to update the stats more often than every 5s
      int quickUpdateInterval = 5000;
      uint64_t quickRefreshInterval = 7000;
      uint64_t onlineRefreshLatency = 60000; // For metrics which are reported online, we flush them every 60s regardless of their state.
      using MetricSpec = DataProcessingStats::MetricSpec;
      using Kind = DataProcessingStats::Kind;
      using Scope = DataProcessingStats::Scope;

      // Debug-only metrics are compiled out of release (NDEBUG) builds.
#ifdef NDEBUG
      bool enableDebugMetrics = false;
#else
      bool enableDebugMetrics = true;
#endif
      bool arrowAndResourceLimitingMetrics = false;
        arrowAndResourceLimitingMetrics = true;
      }

      int64_t consumedTimeframesPublishInterval = 0;
        consumedTimeframesPublishInterval = 5000;
      }
      // Input proxies should not report cpu_usage_fraction,
      // because of the rate limiting which biases the measurement.
      auto& spec = services.get<DeviceSpec const>();
      bool enableCPUUsageFraction = true;
      auto isProxy = [](DataProcessorLabel const& label) -> bool { return label == DataProcessorLabel{"input-proxy"}; };
      if (std::find_if(spec.labels.begin(), spec.labels.end(), isProxy) != spec.labels.end()) {
        O2_SIGNPOST_ID_GENERATE(mid, policies);
        O2_SIGNPOST_EVENT_EMIT(policies, mid, "metrics", "Disabling cpu_usage_fraction metric for proxy %{public}s", spec.name.c_str());
        enableCPUUsageFraction = false;
      }

      // Declarative list of every metric this device may publish, together
      // with its kind, scope and publishing cadence.
      std::vector<DataProcessingStats::MetricSpec> metrics = {
        MetricSpec{.name = "errors",
                   .kind = Kind::UInt64,
                   .scope = Scope::Online,
                   .minPublishInterval = quickUpdateInterval,
                   .maxRefreshLatency = quickRefreshInterval},
        MetricSpec{.name = "exceptions",
                   .kind = Kind::UInt64,
                   .scope = Scope::Online,
                   .minPublishInterval = quickUpdateInterval},
        MetricSpec{.name = "inputs/relayed/pending",
                   .kind = Kind::UInt64,
                   .minPublishInterval = quickUpdateInterval},
        MetricSpec{.name = "inputs/relayed/incomplete",
                   .kind = Kind::UInt64,
                   .minPublishInterval = quickUpdateInterval},
        MetricSpec{.name = "inputs/relayed/total",
                   .kind = Kind::UInt64,
                   .minPublishInterval = quickUpdateInterval},
        MetricSpec{.name = "elapsed_time_ms",
                   .kind = Kind::UInt64,
                   .minPublishInterval = quickUpdateInterval},
        MetricSpec{.name = "total_wall_time_ms",
                   .kind = Kind::UInt64,
                   .minPublishInterval = quickUpdateInterval},
        MetricSpec{.name = "last_processed_input_size_byte",
                   .kind = Kind::UInt64,
                   .minPublishInterval = quickUpdateInterval},
        MetricSpec{.name = "total_processed_input_size_byte",
                   .kind = Kind::UInt64,
                   .scope = Scope::Online,
                   .minPublishInterval = quickUpdateInterval},
        MetricSpec{.name = "total_sigusr1",
                   .kind = Kind::UInt64,
                   .minPublishInterval = quickUpdateInterval},
        MetricSpec{.name = "consumed-timeframes",
                   .kind = Kind::UInt64,
                   .minPublishInterval = consumedTimeframesPublishInterval,
                   .maxRefreshLatency = quickRefreshInterval,
                   .sendInitialValue = true},
        MetricSpec{.name = "min_input_latency_ms",
                   .kind = Kind::UInt64,
                   .scope = Scope::Online,
                   .minPublishInterval = quickUpdateInterval},
        MetricSpec{.name = "max_input_latency_ms",
                   .kind = Kind::UInt64,
                   .minPublishInterval = quickUpdateInterval},
        MetricSpec{.name = "total_rate_in_mb_s",
                   .kind = Kind::Rate,
                   .scope = Scope::Online,
                   .minPublishInterval = quickUpdateInterval,
                   .maxRefreshLatency = onlineRefreshLatency,
                   .sendInitialValue = true},
        MetricSpec{.name = "total_rate_out_mb_s",
                   .kind = Kind::Rate,
                   .scope = Scope::Online,
                   .minPublishInterval = quickUpdateInterval,
                   .maxRefreshLatency = onlineRefreshLatency,
                   .sendInitialValue = true},
        MetricSpec{.name = "processing_rate_hz",
                   .kind = Kind::Rate,
                   .scope = Scope::Online,
                   .minPublishInterval = quickUpdateInterval,
                   .maxRefreshLatency = onlineRefreshLatency,
                   .sendInitialValue = true},
        MetricSpec{.name = "cpu_usage_fraction",
                   .enabled = enableCPUUsageFraction,
                   .kind = Kind::Rate,
                   .scope = Scope::Online,
                   .minPublishInterval = quickUpdateInterval,
                   .maxRefreshLatency = onlineRefreshLatency,
                   .sendInitialValue = true},
        MetricSpec{.name = "performed_computations",
                   .kind = Kind::UInt64,
                   .scope = Scope::Online,
                   .minPublishInterval = quickUpdateInterval,
                   .maxRefreshLatency = onlineRefreshLatency,
                   .sendInitialValue = true},
        MetricSpec{.name = "total_bytes_in",
                   .kind = Kind::UInt64,
                   .scope = Scope::Online,
                   .minPublishInterval = quickUpdateInterval,
                   .maxRefreshLatency = onlineRefreshLatency,
                   .sendInitialValue = true},
        MetricSpec{.name = "total_bytes_out",
                   .kind = Kind::UInt64,
                   .scope = Scope::Online,
                   .minPublishInterval = quickUpdateInterval,
                   .maxRefreshLatency = onlineRefreshLatency,
                   .sendInitialValue = true},
        // Per-segment shared-memory metric: the id is derived from the
        // segment id, wrapped into a 512-slot range.
        MetricSpec{.name = fmt::format("available_managed_shm_{}", runningWorkflow.shmSegmentId),
                   .metricId = (int)ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE + (runningWorkflow.shmSegmentId % 512),
                   .kind = Kind::UInt64,
                   .scope = Scope::Online,
                   .minPublishInterval = 500,
                   .maxRefreshLatency = onlineRefreshLatency,
                   .sendInitialValue = true},
        MetricSpec{.name = "malformed_inputs", .metricId = static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
        MetricSpec{.name = "dropped_computations", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
        MetricSpec{.name = "dropped_incoming_messages", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
        MetricSpec{.name = "relayed_messages", .metricId = static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
        MetricSpec{.name = "arrow-bytes-destroyed",
                   .enabled = arrowAndResourceLimitingMetrics,
                   .metricId = static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED),
                   .kind = Kind::UInt64,
                   .scope = Scope::DPL,
                   .minPublishInterval = 0,
                   .maxRefreshLatency = 10000,
                   .sendInitialValue = true},
        MetricSpec{.name = "arrow-messages-destroyed",
                   .enabled = arrowAndResourceLimitingMetrics,
                   .metricId = static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED),
                   .kind = Kind::UInt64,
                   .scope = Scope::DPL,
                   .minPublishInterval = 0,
                   .maxRefreshLatency = 10000,
                   .sendInitialValue = true},
        MetricSpec{.name = "arrow-bytes-created",
                   .enabled = arrowAndResourceLimitingMetrics,
                   .metricId = static_cast<short>(ProcessingStatsId::ARROW_BYTES_CREATED),
                   .kind = Kind::UInt64,
                   .scope = Scope::DPL,
                   .minPublishInterval = 0,
                   .maxRefreshLatency = 10000,
                   .sendInitialValue = true},
        MetricSpec{.name = "arrow-messages-created",
                   .enabled = arrowAndResourceLimitingMetrics,
                   .metricId = static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_CREATED),
                   .kind = Kind::UInt64,
                   .scope = Scope::DPL,
                   .minPublishInterval = 0,
                   .maxRefreshLatency = 10000,
                   .sendInitialValue = true},
        MetricSpec{.name = "arrow-bytes-expired",
                   .enabled = arrowAndResourceLimitingMetrics,
                   .metricId = static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED),
                   .kind = Kind::UInt64,
                   .scope = Scope::DPL,
                   .minPublishInterval = 0,
                   .maxRefreshLatency = 10000,
                   .sendInitialValue = true},
        MetricSpec{.name = "shm-offer-bytes-consumed",
                   .enabled = arrowAndResourceLimitingMetrics,
                   .metricId = static_cast<short>(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED),
                   .kind = Kind::UInt64,
                   .scope = Scope::DPL,
                   .minPublishInterval = 0,
                   .maxRefreshLatency = 10000,
                   .sendInitialValue = true},
        MetricSpec{.name = "timeslice-offer-number-consumed",
                   .enabled = arrowAndResourceLimitingMetrics,
                   .metricId = static_cast<short>(ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED),
                   .kind = Kind::UInt64,
                   .scope = Scope::DPL,
                   .minPublishInterval = 0,
                   .maxRefreshLatency = 10000,
                   .sendInitialValue = true},
        MetricSpec{.name = "timeslices-expired",
                   .enabled = arrowAndResourceLimitingMetrics,
                   .metricId = static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED),
                   .kind = Kind::UInt64,
                   .scope = Scope::DPL,
                   .minPublishInterval = 0,
                   .maxRefreshLatency = 10000,
                   .sendInitialValue = true},
        MetricSpec{.name = "timeslices-started",
                   .enabled = arrowAndResourceLimitingMetrics,
                   .metricId = static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_STARTED),
                   .kind = Kind::UInt64,
                   .scope = Scope::DPL,
                   .minPublishInterval = 0,
                   .maxRefreshLatency = 10000,
                   .sendInitialValue = true},
        MetricSpec{.name = "timeslices-done",
                   .enabled = arrowAndResourceLimitingMetrics,
                   .metricId = static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_DONE),
                   .kind = Kind::UInt64,
                   .scope = Scope::DPL,
                   .minPublishInterval = 0,
                   .maxRefreshLatency = 10000,
                   .sendInitialValue = true},
        MetricSpec{.name = "resources-missing",
                   .enabled = enableDebugMetrics,
                   .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_MISSING),
                   .kind = Kind::UInt64,
                   .scope = Scope::DPL,
                   .minPublishInterval = 1000,
                   .maxRefreshLatency = 1000,
                   .sendInitialValue = true},
        MetricSpec{.name = "resources-insufficient",
                   .enabled = enableDebugMetrics,
                   .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_INSUFFICIENT),
                   .kind = Kind::UInt64,
                   .scope = Scope::DPL,
                   .minPublishInterval = 1000,
                   .maxRefreshLatency = 1000,
                   .sendInitialValue = true},
        MetricSpec{.name = "resources-satisfactory",
                   .enabled = enableDebugMetrics,
                   .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_SATISFACTORY),
                   .kind = Kind::UInt64,
                   .scope = Scope::DPL,
                   .minPublishInterval = 1000,
                   .maxRefreshLatency = 1000,
                   .sendInitialValue = true},
        MetricSpec{.name = "resource-offer-expired",
                   .enabled = arrowAndResourceLimitingMetrics,
                   .metricId = static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED),
                   .kind = Kind::UInt64,
                   .scope = Scope::DPL,
                   .minPublishInterval = 0,
                   .maxRefreshLatency = 10000,
                   .sendInitialValue = true},
        MetricSpec{.name = "ccdb-cache-hit",
                   .enabled = true,
                   .metricId = static_cast<short>(ProcessingStatsId::CCDB_CACHE_HIT),
                   .kind = Kind::UInt64,
                   .scope = Scope::DPL,
                   .minPublishInterval = 1000,
                   .maxRefreshLatency = 10000,
                   .sendInitialValue = true},
        MetricSpec{.name = "ccdb-cache-miss",
                   .enabled = true,
                   .metricId = static_cast<short>(ProcessingStatsId::CCDB_CACHE_MISS),
                   .kind = Kind::UInt64,
                   .scope = Scope::DPL,
                   .minPublishInterval = 1000,
                   .maxRefreshLatency = 10000,
                   .sendInitialValue = true},
        MetricSpec{.name = "ccdb-cache-failure",
                   .enabled = true,
                   .metricId = static_cast<short>(ProcessingStatsId::CCDB_CACHE_FAILURE),
                   .kind = Kind::UInt64,
                   .scope = Scope::DPL,
                   .minPublishInterval = 1000,
                   .maxRefreshLatency = 10000,
                   .sendInitialValue = true}};

      // Register all metrics. The available-SHM metric is only registered
      // for the readout-proxy device, which also flips the flag that makes
      // sendRelayerMetrics query the free shared memory.
      for (auto& metric : metrics) {
        if (metric.metricId == (int)ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE + (runningWorkflow.shmSegmentId % 512)) {
          if (spec.name.compare("readout-proxy") == 0) {
            stats->hasAvailSHMMetric = true;
          } else {
            continue;
          }
        }
        stats->registerMetric(metric);
      }

      return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStats>(), stats};
    },
    .configure = noConfiguration(),
    .postProcessing = [](ProcessingContext& context, void* service) {
      auto* stats = (DataProcessingStats*)service;
    // When the device dangles (no data), push and flush metrics around the
    // dangling callbacks, and once more before end-of-stream and the loop.
    .preDangling = [](DanglingContext& context, void* service) {
      auto* stats = (DataProcessingStats*)service;
      sendRelayerMetrics(context.services(), *stats);
      flushMetrics(context.services(), *stats); },
    .postDangling = [](DanglingContext& context, void* service) {
      auto* stats = (DataProcessingStats*)service;
      sendRelayerMetrics(context.services(), *stats);
      flushMetrics(context.services(), *stats); },
    .preEOS = [](EndOfStreamContext& context, void* service) {
      auto* stats = (DataProcessingStats*)service;
      sendRelayerMetrics(context.services(), *stats);
      flushMetrics(context.services(), *stats); },
    .preLoop = [](ServiceRegistryRef ref, void* service) {
      auto* stats = (DataProcessingStats*)service;
      flushMetrics(ref, *stats); },
    .kind = ServiceKind::Serial};
}
1215
1216// This is similar to the dataProcessingStats, but it designed to synchronize
1217// history-less metrics which are e.g. used for the GUI.
{
  // Declares the "data-processing-states" service. Unlike the stats
  // service this synchronizes history-less *states* (key/value snapshots)
  // with the driver, e.g. for the GUI.
  return ServiceSpec{
    .name = "data-processing-states",
    .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
      timespec now;
      clock_gettime(CLOCK_REALTIME, &now);
      uv_update_time(state.loop);
      // Offset between wall-clock (ms) and the uv loop clock.
      uint64_t offset = now.tv_sec * 1000 - uv_now(state.loop);
      // NOTE(review): construction of the DataProcessingStates instance is
      // elided in this source view; `states` is created just above.
      states->registerState({"dummy_state", (short)ProcessingStateId::DUMMY_STATE});
      return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStates>(), states};
    },
    .configure = noConfiguration(),
    .postProcessing = [](ProcessingContext& context, void* service) {
      auto* states = (DataProcessingStates*)service;
      states->processCommandQueue(); },
    // Flush states to the driver whenever the device dangles or reaches EOS.
    .preDangling = [](DanglingContext& context, void* service) {
      auto* states = (DataProcessingStates*)service;
      flushStates(context.services(), *states); },
    .postDangling = [](DanglingContext& context, void* service) {
      auto* states = (DataProcessingStates*)service;
      flushStates(context.services(), *states); },
    .preEOS = [](EndOfStreamContext& context, void* service) {
      auto* states = (DataProcessingStates*)service;
      flushStates(context.services(), *states); },
    .kind = ServiceKind::Global};
}
1247
1249};
1250
1252{
1253 return ServiceSpec{
1254 .name = "gui-metrics",
1255 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
1256 auto* stats = new GUIMetrics();
1257 auto& monitoring = services.get<Monitoring>();
1258 auto& spec = services.get<DeviceSpec const>();
1259 monitoring.send({(int)spec.inputChannels.size(), fmt::format("oldest_possible_timeslice/h"), o2::monitoring::Verbosity::Debug});
1260 monitoring.send({(int)1, fmt::format("oldest_possible_timeslice/w"), o2::monitoring::Verbosity::Debug});
1261 monitoring.send({(int)spec.outputChannels.size(), fmt::format("oldest_possible_output/h"), o2::monitoring::Verbosity::Debug});
1262 monitoring.send({(int)1, fmt::format("oldest_possible_output/w"), o2::monitoring::Verbosity::Debug});
1263 return ServiceHandle{TypeIdHelpers::uniqueId<GUIMetrics>(), stats};
1264 },
1265 .configure = noConfiguration(),
1266 .postProcessing = [](ProcessingContext& context, void* service) {
1267 auto& relayer = context.services().get<DataRelayer>();
1268 auto& monitoring = context.services().get<Monitoring>();
1269 auto& spec = context.services().get<DeviceSpec const>();
1270 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
1271 for (size_t ci; ci < spec.outputChannels.size(); ++ci) {
1272 monitoring.send({(uint64_t)oldestPossibleOutput.timeslice.value, fmt::format("oldest_possible_output/{}", ci), o2::monitoring::Verbosity::Debug});
1273 } },
1274 .domainInfoUpdated = [](ServiceRegistryRef registry, size_t timeslice, ChannelIndex channel) {
1275 auto& monitoring = registry.get<Monitoring>();
1276 monitoring.send({(uint64_t)timeslice, fmt::format("oldest_possible_timeslice/{}", channel.value), o2::monitoring::Verbosity::Debug}); },
1277 .active = false,
1278 .kind = ServiceKind::Serial};
1279}
1280
{
  // Declares the "object-cache" service: a per-device ObjectCache instance
  // owned by the registry.
  return ServiceSpec{
    .name = "object-cache",
    .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
      auto* cache = new ObjectCache();
      return ServiceHandle{TypeIdHelpers::uniqueId<ObjectCache>(), cache};
    },
    .configure = noConfiguration(),
}
1292
1294{
1295 return ServiceSpec{
1296 .name = "data-processing-context",
1297 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1298 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessorContext>(), new DataProcessorContext()};
1299 },
1300 .configure = noConfiguration(),
1301 .exit = [](ServiceRegistryRef, void* service) { auto* context = (DataProcessorContext*)service; delete context; },
1302 .kind = ServiceKind::Serial};
1303}
1304
1306{
1307 return ServiceSpec{
1308 .name = "data-allocator",
1309 .uniqueId = simpleServiceId<DataAllocator>(),
1310 .init = [](ServiceRegistryRef ref, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1311 return ServiceHandle{
1312 .hash = TypeIdHelpers::uniqueId<DataAllocator>(),
1313 .instance = new DataAllocator(ref),
1314 .kind = ServiceKind::Stream,
1315 .name = "data-allocator",
1316 };
1317 },
1318 .configure = noConfiguration(),
1319 .kind = ServiceKind::Stream};
1320}
1321
// Builds the default list of services every DPL device gets. `extraPlugins`
// is a comma-separated list of <library>:<plugin> declarations appended to
// whatever DPL_LOAD_SERVICES provides; `numThreads` != 0 additionally adds
// the thread-pool service.
std::vector<ServiceSpec> CommonServices::defaultServices(std::string extraPlugins, int numThreads)
{
  std::vector<ServiceSpec> specs{
    asyncQueue(),
    controlSpec(),
    rootFileSpec(),
    parallelSpec(),
    callbacksSpec(),
    dataRelayer(),
    dataSender(),
    objectCache(),
    ccdbSupportSpec()};

    specs.push_back(ArrowSupport::arrowBackendSpec());
  }
  specs.push_back(decongestionSpec());

  std::string loadableServicesStr = extraPlugins;
  // Do not load InfoLogger by default if we are not at P2.
  if (loadableServicesStr.empty() == false) {
    loadableServicesStr += ",";
  }
  loadableServicesStr += "O2FrameworkDataTakingSupport:InfoLoggerContext,O2FrameworkDataTakingSupport:InfoLogger";
  }
  // Load plugins depending on the environment
  std::vector<LoadablePlugin> loadablePlugins = {};
  char* loadableServicesEnv = getenv("DPL_LOAD_SERVICES");
  // String to define the services to load is:
  //
  // library1:name1,library2:name2,...
  if (loadableServicesEnv) {
    if (loadableServicesStr.empty() == false) {
      loadableServicesStr += ",";
    }
    loadableServicesStr += loadableServicesEnv;
  }
  loadablePlugins = PluginManager::parsePluginSpecString(loadableServicesStr.c_str());
  PluginManager::loadFromPlugin<ServiceSpec, ServicePlugin>(loadablePlugins, specs);
  // I should make it optional depending on whether the GUI is there or not...
  specs.push_back(CommonServices::guiMetricsSpec());
  if (numThreads) {
    specs.push_back(threadPool(numThreads));
  }
  return specs;
}
1384
// Services needed for Arrow/table support.
// NOTE(review): the entries of the initializer list are elided in this
// source view.
std::vector<ServiceSpec> CommonServices::arrowServices()
{
  return {
  };
}
1392
1393} // namespace o2::framework
std::vector< std::string > labels
benchmark::State & state
std::vector< OutputRoute > routes
o2::monitoring::tags::Key Key
o2::monitoring::Metric Metric
o2::monitoring::tags::Value Value
#define MONITORING_QUEUE_SIZE
#define O2_BUILTIN_LIKELY(x)
std::ostringstream debug
int16_t time
Definition RawEventData.h:4
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT_CRITICAL(log, id, name, format,...)
Definition Signpost.h:573
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
Definition Signpost.h:563
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
o2::monitoring::Monitoring Monitoring
decltype(auto) make(const Output &spec, Args... args)
int countDeviceOutputs(bool excludeDPLOrigin=false)
Allow injecting policies on send.
Definition DataSender.h:34
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
A text based way of communicating with the driver.
OldestOutputInfo getOldestPossibleOutput() const
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
Definition glcorearb.h:5500
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLintptr offset
Definition glcorearb.h:660
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLuint GLuint stream
Definition glcorearb.h:1806
GLint ref
Definition glcorearb.h:291
GLuint id
Definition glcorearb.h:650
GLuint * states
Definition glcorearb.h:4932
Defining PrimaryVertex explicitly as messageable.
@ DPL
The channel is a normal input channel.
const DataProcessorLabel suppressDomainInfoLabel
std::pair< std::string, unsigned short > parse_websocket_url(char const *url)
@ NoTransition
No pending transitions.
auto decongestionCallbackOrdered
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static AsyncTaskId create(AsyncQueue &queue, AsyncTaskSpec spec)
static void post(AsyncQueue &queue, AsyncTask const &task)
static void reset(AsyncQueue &queue)
Reset the queue to its initial state.
An actuatual task to be executed.
Definition AsyncQueue.h:32
static ServiceSpec dataRelayer()
static ServiceSpec callbacksSpec()
static ServiceSpec monitoringSpec()
static ServiceSpec dataSender()
static ServiceSpec timesliceIndex()
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static ServiceSpec timingInfoSpec()
static ServiceConfigureCallback noConfiguration()
static ServiceSpec asyncQueue()
static ServiceSpec decongestionSpec()
static ServiceSpec dataProcessorContextSpec()
static ServiceSpec dataProcessingStats()
static std::vector< ServiceSpec > arrowServices()
static ServiceSpec rootFileSpec()
static ServiceSpec controlSpec()
static ServiceSpec configurationSpec()
static ServiceSpec ccdbSupportSpec()
static ServiceSpec datatakingContextSpec()
static ServiceSpec guiMetricsSpec()
static ServiceSpec dataProcessingStates()
static ServiceSpec tracingSpec()
static ServiceSpec dataAllocatorSpec()
static ServiceSpec driverClientSpec()
static ServiceSpec streamContextSpec()
static ServiceSpec threadPool(int numWorkers)
static ServiceSpec parallelSpec()
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
Helper struct to hold statistics about the data processing happening.
@ SetIfPositive
Set the value to the specified value.
@ InstantaneousRate
Update the rate of the metric given the cumulative value since last time it got published.
@ Add
Update the rate of the metric given the amount since the last time.
A label that can be associated to a DataProcessorSpec.
static std::string describe(InputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
DeploymentMode deploymentMode
Where we thing this is running.
TimesliceIndex::OldestOutputInfo oldestPossibleOutput
static DeploymentMode deploymentMode()
static unsigned int pipelineLength(unsigned int minLength)
get max number of timeslices in the queue
static bool onlineDeploymentMode()
@true if running online
std::string name
The name of the associated DataProcessorSpec.
Definition DeviceSpec.h:50
size_t inputTimesliceId
The time pipelining id of this particular device.
Definition DeviceSpec.h:68
Running state information of a given device.
Definition DeviceState.h:34
static size_t maxLanes(std::vector< InputRoute > const &routes)
header::DataOrigin origin
Definition Output.h:28
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
Information about the running workflow.
ServiceKind kind
Kind of service.
unsigned int hash
Unique hash associated to the type of service.
std::string name
Name of the service.
ServicePostDispatching postDispatching
ServiceKind kind
Kind of service being specified.
static std::function< int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator(uv_loop_t *loop)
static std::function< void(int64_t &base, int64_t &offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t *loop)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"