CommonServices.cxx
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
25#include "Framework/Signpost.h"
32#include "InputRouteHelpers.h"
36#include "Framework/Tracing.h"
46#include "Framework/Signpost.h"
49
50#include "TextDriverClient.h"
51#include "WSDriverClient.h"
52#include "HTTPParser.h"
53#include "../src/DataProcessingStatus.h"
54#include "DecongestionService.h"
55#include "ArrowSupport.h"
57#include "Headers/STFHeader.h"
58#include "Headers/DataHeader.h"
59
60#include <Configuration/ConfigurationInterface.h>
61#include <Configuration/ConfigurationFactory.h>
62#include <Monitoring/MonitoringFactory.h>
63#include "Framework/Signpost.h"
64
65#include <fairmq/Device.h>
66#include <fairmq/shmem/Monitor.h>
67#include <fairmq/shmem/Common.h>
68#include <fairmq/ProgOptions.h>
69#include <uv.h>
70
71#include <cstdlib>
72#include <cstring>
73
74using o2::configuration::ConfigurationFactory;
75using o2::configuration::ConfigurationInterface;
76using o2::monitoring::Monitoring;
77using o2::monitoring::MonitoringFactory;
78using Metric = o2::monitoring::Metric;
79using Key = o2::monitoring::tags::Key;
80using Value = o2::monitoring::tags::Value;
81
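// Dynamic signpost logs used in this file to trace data-processing and
// stream-level callbacks (see Framework/Signpost.h).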
82O2_DECLARE_DYNAMIC_LOG(data_processor_context);
83O2_DECLARE_DYNAMIC_LOG(stream_context);
84O2_DECLARE_DYNAMIC_LOG(async_queue);
85O2_DECLARE_DYNAMIC_LOG(policies);
 86
87namespace o2::framework
88{
89
90#define MONITORING_QUEUE_SIZE 100
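// Size of the in-memory buffer used by the Monitoring service (see the
// enableBuffering() call below): metrics are batched up to this many entries.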
91ServiceSpec CommonServices::monitoringSpec()
92{
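  // Backend selection: if the driver client is a websocket and no explicit
  // monitoring backend was requested (or "dpl://" was requested explicitly),
  // metrics are routed through the DPL-internal DPLMonitoringBackend; otherwise
  // the requested backend is instantiated, defaulting to infologger://.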
93 return ServiceSpec{
94 .name = "monitoring",
95 .init = [](ServiceRegistryRef registry, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
96 void* service = nullptr;
 97 bool isWebsocket = strncmp(options.GetPropertyAsString("driver-client-backend").c_str(), "ws://", 5) == 0;
98 bool isDefault = options.GetPropertyAsString("monitoring-backend") == "default";
99 bool useDPL = (isWebsocket && isDefault) || options.GetPropertyAsString("monitoring-backend") == "dpl://";
100 o2::monitoring::Monitoring* monitoring;
101 if (useDPL) {
102 monitoring = new Monitoring();
103 auto dplBackend = std::make_unique<DPLMonitoringBackend>(registry);
104 (dynamic_cast<o2::monitoring::Backend*>(dplBackend.get()))->setVerbosity(o2::monitoring::Verbosity::Debug);
105 monitoring->addBackend(std::move(dplBackend));
106 } else {
107 auto backend = isDefault ? "infologger://" : options.GetPropertyAsString("monitoring-backend");
108 monitoring = MonitoringFactory::Get(backend).release();
109 }
110 service = monitoring;
111 monitoring->enableBuffering(MONITORING_QUEUE_SIZE);
112 assert(registry.get<DeviceSpec const>().name.empty() == false);
113 monitoring->addGlobalTag("pipeline_id", std::to_string(registry.get<DeviceSpec const>().inputTimesliceId));
114 monitoring->addGlobalTag("dataprocessor_name", registry.get<DeviceSpec const>().name);
115 monitoring->addGlobalTag("dpl_instance", options.GetPropertyAsString("shm-segment-id"));
116 return ServiceHandle{TypeIdHelpers::uniqueId<Monitoring>(), service};
117 },
118 .configure = noConfiguration(),
119 .start = [](ServiceRegistryRef services, void* service) {
120 auto* monitoring = (o2::monitoring::Monitoring*)service;
121
122 auto extRunNumber = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("runNumber", "unspecified");
123 if (extRunNumber == "unspecified") {
124 return;
125 }
126 try {
127 monitoring->setRunNumber(std::stoul(extRunNumber));
128 } catch (...) {
129 } },
130 .exit = [](ServiceRegistryRef registry, void* service) {
131 auto* monitoring = reinterpret_cast<Monitoring*>(service);
132 monitoring->flushBuffer();
133 delete monitoring; },
134 .kind = ServiceKind::Serial};
135}
136
137// An asynchronous service that executes actions at the end of the data processing
138ServiceSpec CommonServices::asyncQueue()
139{
140 return ServiceSpec{
141 .name = "async-queue",
142 .init = simpleServiceInit<AsyncQueue, AsyncQueue>(),
143 .configure = noConfiguration(),
144 .stop = [](ServiceRegistryRef services, void* service) {
145 auto& queue = services.get<AsyncQueue>();
 146 AsyncQueueHelpers::reset(queue);
 147 },
148 .kind = ServiceKind::Serial};
149}
150
151// Make it a service so that it can be used easily from the analysis
152// FIXME: Moreover, it makes sense that this will be duplicated on a per thread
153// basis when we get to it.
154ServiceSpec CommonServices::timingInfoSpec()
155{
156 return ServiceSpec{
157 .name = "timing-info",
158 .uniqueId = simpleServiceId<TimingInfo>(),
159 .init = simpleServiceInit<TimingInfo, TimingInfo, ServiceKind::Stream>(),
160 .configure = noConfiguration(),
161 .kind = ServiceKind::Stream};
162}
163
164ServiceSpec CommonServices::streamContextSpec()
165{
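  // Per-stream bookkeeping of which output routes were filled during a
  // computation, used below to detect and report missing Lifetime::Timeframe
  // outputs.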
166 return ServiceSpec{
167 .name = "stream-context",
168 .uniqueId = simpleServiceId<StreamContext>(),
169 .init = simpleServiceInit<StreamContext, StreamContext, ServiceKind::Stream>(),
170 .configure = noConfiguration(),
171 .preProcessing = [](ProcessingContext& context, void* service) {
172 auto* stream = (StreamContext*)service;
173 auto& routes = context.services().get<DeviceSpec const>().outputs;
 174 // Notice I need to do this here, because different invocations of
 175 // the same stream might refer to different data processors.
 176 // We should probably have a context which is per stream of a specific
 177 // data processor.
178 stream->routeDPLCreated.resize(routes.size());
179 stream->routeCreated.resize(routes.size());
 180 // Reset routeDPLCreated / routeCreated at every processing step
181 std::fill(stream->routeDPLCreated.begin(), stream->routeDPLCreated.end(), false);
182 std::fill(stream->routeCreated.begin(), stream->routeCreated.end(), false); },
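      // routeCreated marks routes filled by the user task, while routeDPLCreated
      // marks routes filled internally by DPL (e.g. the DISTSUBTIMEFRAME messages
      // added by the finaliseOutputs callback of ccdbSupportSpec below).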
183 .postProcessing = [](ProcessingContext& processingContext, void* service) {
184 auto* stream = (StreamContext*)service;
185 auto& routes = processingContext.services().get<DeviceSpec const>().outputs;
186 auto& timeslice = processingContext.services().get<TimingInfo>().timeslice;
187 auto& messageContext = processingContext.services().get<MessageContext>();
 188 // Check whether we ever created any data for this timeslice.
 189 // If we did not, but didDispatch is still set to true,
 190 // it means the data was created out of band.
191 bool userDidCreate = false;
192 O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, service);
193 for (size_t ri = 0; ri < routes.size(); ++ri) {
194 if (stream->routeCreated[ri] == true && stream->routeDPLCreated[ri] == false) {
195 userDidCreate = true;
196 break;
197 }
198 }
199 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "userDidCreate == %d && didDispatch == %d",
200 userDidCreate,
201 messageContext.didDispatch());
202 if (userDidCreate == false && messageContext.didDispatch() == true) {
203 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Data created out of band userDidCreate == %d && messageContext.didDispatch == %d",
204 userDidCreate,
205 messageContext.didDispatch());
206 return;
207 }
208 if (userDidCreate == false && messageContext.didDispatch() == false) {
209 O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, service);
210 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "No data created.");
211 return;
212 }
213 for (size_t ri = 0; ri < routes.size(); ++ri) {
214 auto &route = routes[ri];
215 auto &matcher = route.matcher;
216 if (stream->routeDPLCreated[ri] == true) {
217 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Data created by DPL. ri = %" PRIu64 ", %{public}s",
218 (uint64_t)ri, DataSpecUtils::describe(matcher).c_str());
219 continue;
220 }
221 if (stream->routeCreated[ri] == true) {
222 continue;
223 } if ((timeslice % route.maxTimeslices) != route.timeslice) {
224 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Route ri = %" PRIu64 ", skipped because of pipelining.",
225 (uint64_t)ri);
226 continue;
227 }
228 if (matcher.lifetime == Lifetime::Timeframe) {
229 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks",
230 "Expected Lifetime::Timeframe data %{public}s was not created for timeslice %" PRIu64 " and might result in dropped timeframes",
231 DataSpecUtils::describe(matcher).c_str(), (uint64_t)timeslice);
232 LOGP(error, "Expected Lifetime::Timeframe data {} was not created for timeslice {} and might result in dropped timeframes", DataSpecUtils::describe(matcher), timeslice);
233 }
234 } },
235 .preEOS = [](EndOfStreamContext& context, void* service) {
236 // We need to reset the routeDPLCreated / routeCreated because the end of stream
237 // uses a different context which does not know about the routes.
238 // FIXME: This should be fixed in a different way, but for now it will
239 // allow TPC IDC to work.
240 auto* stream = (StreamContext*)service;
241 auto& routes = context.services().get<DeviceSpec const>().outputs;
 242 // Notice I need to do this here, because different invocations of
 243 // the same stream might refer to different data processors.
 244 // We should probably have a context which is per stream of a specific
 245 // data processor.
246 stream->routeDPLCreated.resize(routes.size());
247 stream->routeCreated.resize(routes.size());
248 // Reset the routeCreated / routeDPLCreated at every processing step
249 std::fill(stream->routeCreated.begin(), stream->routeCreated.end(), false);
250 std::fill(stream->routeDPLCreated.begin(), stream->routeDPLCreated.end(), false); },
251 .kind = ServiceKind::Stream};
252}
253
254ServiceSpec CommonServices::datatakingContextSpec()
255{
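  // Collects run metadata (run number, LHC period, run type, environment id,
  // detector list) from the incoming DataHeaders and from the device
  // configuration options.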
256 return ServiceSpec{
257 .name = "datataking-contex",
258 .uniqueId = simpleServiceId<DataTakingContext>(),
259 .init = simpleServiceInit<DataTakingContext, DataTakingContext, ServiceKind::Stream>(),
260 .configure = noConfiguration(),
261 .preProcessing = [](ProcessingContext& processingContext, void* service) {
262 auto& context = processingContext.services().get<DataTakingContext>();
263 for (auto const& ref : processingContext.inputs()) {
264 const o2::framework::DataProcessingHeader *dph = o2::header::get<DataProcessingHeader*>(ref.header);
265 const auto* dh = o2::header::get<o2::header::DataHeader*>(ref.header);
266 if (!dph || !dh) {
267 continue;
268 }
269 context.runNumber = fmt::format("{}", dh->runNumber);
270 break;
271 } },
272 // Notice this will be executed only once, because the service is declared upfront.
273 .start = [](ServiceRegistryRef services, void* service) {
274 auto& context = services.get<DataTakingContext>();
275
277
278 auto extRunNumber = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("runNumber", "unspecified");
279 if (extRunNumber != "unspecified" || context.runNumber == "0") {
280 context.runNumber = extRunNumber;
281 }
282 auto extLHCPeriod = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("lhc_period", "unspecified");
283 if (extLHCPeriod != "unspecified") {
284 context.lhcPeriod = extLHCPeriod;
285 } else {
286 static const char* months[12] = {"JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"};
287 time_t now = time(nullptr);
288 auto ltm = gmtime(&now);
289 context.lhcPeriod = months[ltm->tm_mon];
290 LOG(info) << "LHCPeriod is not available, using current month " << context.lhcPeriod;
291 }
292
293 auto extRunType = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("run_type", "unspecified");
294 if (extRunType != "unspecified") {
295 context.runType = extRunType;
296 }
297 auto extEnvId = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("environment_id", "unspecified");
298 if (extEnvId != "unspecified") {
299 context.envId = extEnvId;
300 }
301 auto extDetectors = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("detectors", "unspecified");
302 if (extDetectors != "unspecified") {
303 context.detectors = extDetectors;
304 }
305 auto forcedRaw = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("force_run_as_raw", "false");
306 context.forcedRaw = forcedRaw == "true"; },
307 .kind = ServiceKind::Stream};
308}
309
311};
312
313ServiceSpec CommonServices::configurationSpec()
314{
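  // The "configuration" option is a backend URI understood by
  // o2::configuration::ConfigurationFactory (e.g. a json:// or ini:// path), or
  // the literal "command-line", in which case no backend is instantiated.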
315 return ServiceSpec{
316 .name = "configuration",
317 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
318 auto backend = options.GetPropertyAsString("configuration");
319 if (backend == "command-line") {
320 return ServiceHandle{0, nullptr};
321 }
322 return ServiceHandle{TypeIdHelpers::uniqueId<ConfigurationInterface>(),
323 ConfigurationFactory::getConfiguration(backend).release()};
324 },
325 .configure = noConfiguration(),
326 .driverStartup = [](ServiceRegistryRef registry, DeviceConfig const& dc) {
327 if (dc.options.count("configuration") == 0) {
328 registry.registerService(ServiceHandle{0, nullptr});
329 return;
330 }
331 auto backend = dc.options["configuration"].as<std::string>();
332 registry.registerService(ServiceHandle{TypeIdHelpers::uniqueId<ConfigurationInterface>(),
333 ConfigurationFactory::getConfiguration(backend).release()}); },
334 .kind = ServiceKind::Global};
335}
336
337ServiceSpec CommonServices::driverClientSpec()
338{
339 return ServiceSpec{
340 .name = "driverClient",
341 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
342 auto backend = options.GetPropertyAsString("driver-client-backend");
343 if (backend == "stdout://") {
344 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
345 new TextDriverClient(services, state)};
346 }
347 auto [ip, port] = o2::framework::parse_websocket_url(backend.c_str());
348 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
349 new WSDriverClient(services, ip.c_str(), port)};
350 },
351 .configure = noConfiguration(),
352 .kind = ServiceKind::Global};
353}
354
355ServiceSpec CommonServices::controlSpec()
356{
357 return ServiceSpec{
358 .name = "control",
359 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
360 return ServiceHandle{TypeIdHelpers::uniqueId<ControlService>(),
361 new ControlService(services, state)};
362 },
363 .configure = noConfiguration(),
364 .kind = ServiceKind::Serial};
365}
366
367ServiceSpec CommonServices::rootFileSpec()
368{
369 return ServiceSpec{
370 .name = "localrootfile",
371 .init = simpleServiceInit<LocalRootFileService, LocalRootFileService>(),
372 .configure = noConfiguration(),
373 .kind = ServiceKind::Serial};
374}
375
376ServiceSpec CommonServices::parallelSpec()
377{
378 return ServiceSpec{
379 .name = "parallel",
380 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
381 auto& spec = services.get<DeviceSpec const>();
382 return ServiceHandle{TypeIdHelpers::uniqueId<ParallelContext>(),
383 new ParallelContext(spec.rank, spec.nSlots)};
384 },
385 .configure = noConfiguration(),
386 .kind = ServiceKind::Serial};
387}
388
389ServiceSpec CommonServices::timesliceIndex()
390{
391 return ServiceSpec{
392 .name = "timesliceindex",
393 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
394 auto& spec = services.get<DeviceSpec const>();
395 return ServiceHandle{TypeIdHelpers::uniqueId<TimesliceIndex>(),
396 new TimesliceIndex(InputRouteHelpers::maxLanes(spec.inputs), state.inputChannelInfos)};
397 },
398 .configure = noConfiguration(),
399 .kind = ServiceKind::Serial};
400}
401
402ServiceSpec CommonServices::callbacksSpec()
403{
404 return ServiceSpec{
405 .name = "callbacks",
406 .init = simpleServiceInit<CallbackService, CallbackService>(),
407 .configure = noConfiguration(),
408 .kind = ServiceKind::Serial};
409}
410
411ServiceSpec CommonServices::dataRelayer()
412{
413 return ServiceSpec{
414 .name = "datarelayer",
415 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
416 auto& spec = services.get<DeviceSpec const>();
417 return ServiceHandle{TypeIdHelpers::uniqueId<DataRelayer>(),
418 new DataRelayer(spec.completionPolicy,
419 spec.inputs,
420 services.get<TimesliceIndex>(),
421 services)};
422 },
423 .configure = noConfiguration(),
424 .kind = ServiceKind::Serial};
425}
426
427ServiceSpec CommonServices::dataSender()
428{
429 return ServiceSpec{
430 .name = "datasender",
431 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
432 return ServiceHandle{TypeIdHelpers::uniqueId<DataSender>(),
433 new DataSender(services)};
434 },
435 .configure = noConfiguration(),
436 .preProcessing = [](ProcessingContext&, void* service) {
437 auto& dataSender = *reinterpret_cast<DataSender*>(service);
438 dataSender.reset(); },
439 .postDispatching = [](ProcessingContext& ctx, void* service) {
440 auto& dataSender = *reinterpret_cast<DataSender*>(service);
441 // If the quit was requested, the post dispatching can still happen
442 // but with an empty set of data.
443 if (ctx.services().get<DeviceState>().quitRequested == false) {
444 dataSender.verifyMissingSporadic();
445 } },
 446 .kind = ServiceKind::Serial};
447}
448
449struct TracingInfrastructure {
450 int processingCount;
451};
 452
453ServiceSpec CommonServices::tracingSpec()
454{
455 return ServiceSpec{
456 .name = "tracing",
457 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
458 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<TracingInfrastructure>(),
459 .instance = new TracingInfrastructure(),
460 .kind = ServiceKind::Serial};
461 },
462 .configure = noConfiguration(),
463 .preProcessing = [](ProcessingContext&, void* service) {
464 auto* t = reinterpret_cast<TracingInfrastructure*>(service);
465 t->processingCount += 1; },
466 .postProcessing = [](ProcessingContext&, void* service) {
467 auto* t = reinterpret_cast<TracingInfrastructure*>(service);
468 t->processingCount += 1; },
469 .kind = ServiceKind::Serial};
470}
471
472struct CCDBSupport {
473};
474
475// CCDB Support service
476ServiceSpec CommonServices::ccdbSupportSpec()
477{
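  // Only devices which declare an FLP/DISTSUBTIMEFRAME output get a real
  // instance; for those, finaliseOutputs synthesises the per-subspec
  // DISTSUBTIMEFRAME headers and marks the corresponding routes as created by
  // DPL rather than by the user.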
478 return ServiceSpec{
479 .name = "ccdb-support",
480 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
 481 // iterate over all the output matchers
482 auto& spec = services.get<DeviceSpec const>();
483 for (auto& output : spec.outputs) {
484 if (DataSpecUtils::match(output.matcher, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"})) {
485 LOGP(debug, "Optional inputs support enabled");
486 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<CCDBSupport>(), .instance = new CCDBSupport, .kind = ServiceKind::Serial};
487 }
488 }
489 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<CCDBSupport>(), .instance = nullptr, .kind = ServiceKind::Serial};
490 },
491 .configure = noConfiguration(),
492 .finaliseOutputs = [](ProcessingContext& pc, void* service) {
493 if (!service) {
494 return;
495 }
496 if (pc.outputs().countDeviceOutputs(true) == 0) {
497 LOGP(debug, "We are w/o outputs, do not automatically add DISTSUBTIMEFRAME to outgoing messages");
498 return;
499 }
500 auto& timingInfo = pc.services().get<TimingInfo>();
501
502 // For any output that is a FLP/DISTSUBTIMEFRAME with subspec != 0,
503 // we create a new message.
504 InputSpec matcher{"matcher", ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}};
505 auto& streamContext = pc.services().get<StreamContext>();
506 for (size_t oi = 0; oi < pc.services().get<DeviceSpec const>().outputs.size(); ++oi) {
507 OutputRoute const& output = pc.services().get<DeviceSpec const>().outputs[oi];
508 if ((output.timeslice % output.maxTimeslices) != 0) {
509 continue;
510 }
511 if (DataSpecUtils::match(output.matcher, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"})) {
512 auto concrete = DataSpecUtils::asConcreteDataMatcher(output.matcher);
513 if (concrete.subSpec == 0) {
514 continue;
515 }
516 auto& stfDist = pc.outputs().make<o2::header::STFHeader>(Output{concrete.origin, concrete.description, concrete.subSpec});
517 stfDist.id = timingInfo.timeslice;
518 stfDist.firstOrbit = timingInfo.firstTForbit;
519 stfDist.runNumber = timingInfo.runNumber;
 520 // We mark the route as created by DPL, so that it is not accounted for
 521 // when checking whether the user created all the data for a timeslice.
522 O2_SIGNPOST_ID_FROM_POINTER(sid, stream_context, &streamContext);
523 O2_SIGNPOST_EVENT_EMIT(stream_context, sid, "finaliseOutputs", "Route %" PRIu64 " (%{public}s) was created by DPL.", (uint64_t)oi,
524 DataSpecUtils::describe(output.matcher).c_str());
525 streamContext.routeDPLCreated[oi] = true;
526 }
527 } },
528 .kind = ServiceKind::Global};
529}
530struct DecongestionContext {
531 ServiceRegistryRef ref;
532 TimesliceIndex::OldestOutputInfo oldestPossibleOutput;
533};
 534
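// Broadcast the oldest possible timeslice to all DPL output and forward
// channels, skipping values which have already been propagated.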
535auto decongestionCallback = [](AsyncTask& task, size_t id) -> void {
536 auto& oldestPossibleOutput = task.user<DecongestionContext>().oldestPossibleOutput;
537 auto& ref = task.user<DecongestionContext>().ref;
538
539 auto& decongestion = ref.get<DecongestionService>();
540 auto& proxy = ref.get<FairMQDeviceProxy>();
541
542 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
543 cid.value = id;
544 if (decongestion.lastTimeslice >= oldestPossibleOutput.timeslice.value) {
545 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", "Not sending already sent value: %" PRIu64 "> %" PRIu64,
546 decongestion.lastTimeslice, (uint64_t)oldestPossibleOutput.timeslice.value);
547 return;
548 }
549 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", "Running oldest possible timeslice %" PRIu64 " propagation.",
550 (uint64_t)oldestPossibleOutput.timeslice.value);
551 DataProcessingHelpers::broadcastOldestPossibleTimeslice(ref, oldestPossibleOutput.timeslice.value);
552
553 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
554 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
555 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
 556 // TODO: we could cache this in the proxy at bind time.
557 if (info.channelType != ChannelAccountingType::DPL) {
558 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", "Skipping channel %{public}s", info.name.c_str());
559 continue;
560 }
561 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestPossibleOutput.timeslice.value)) {
562 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice",
563 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64 ", priority %d",
564 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
565 }
566 }
567 decongestion.lastTimeslice = oldestPossibleOutput.timeslice.value;
568};
569
570auto decongestionCallbackOrdered = [](AsyncTask& task, size_t id) -> void {
571 auto& oldestPossibleOutput = task.user<DecongestionContext>().oldestPossibleOutput;
572 auto& ref = task.user<DecongestionContext>().ref;
573
574 auto& decongestion = ref.get<DecongestionService>();
575 auto& state = ref.get<DeviceState>();
576 auto& timesliceIndex = ref.get<TimesliceIndex>();
577 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
578 int64_t oldNextTimeslice = decongestion.nextTimeslice;
579 decongestion.nextTimeslice = std::max(decongestion.nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
580 if (oldNextTimeslice != decongestion.nextTimeslice) {
581 if (state.transitionHandling != TransitionHandlingState::NoTransition && DefaultsHelpers::onlineDeploymentMode()) {
582 O2_SIGNPOST_EVENT_EMIT_WARN(async_queue, cid, "oldest_possible_timeslice", "Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
583 } else {
584 O2_SIGNPOST_EVENT_EMIT_CRITICAL(async_queue, cid, "oldest_possible_timeslice", "Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
585 }
586 timesliceIndex.rescan();
587 }
588};
589
590// Decongestion service
591// If we do not have any Timeframe input, it means we must be creating timeslices
592// in order and that we should propagate the oldest possible timeslice at the end
593// of each processing step.
595ServiceSpec CommonServices::decongestionSpec()
596{
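  // At init time we detect whether this device is the first one in the topology
  // (i.e. it has no data-driven inputs): only in that case is it responsible for
  // generating the oldest-possible-timeslice updates itself.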
597 return ServiceSpec{
598 .name = "decongestion",
599 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
600 auto* decongestion = new DecongestionService();
601 for (auto& input : services.get<DeviceSpec const>().inputs) {
602 if (input.matcher.lifetime == Lifetime::Timeframe || input.matcher.lifetime == Lifetime::QA || input.matcher.lifetime == Lifetime::Sporadic || input.matcher.lifetime == Lifetime::Optional) {
603 LOGP(detail, "Found a real data input, we cannot update the oldest possible timeslice when sending messages");
604 decongestion->isFirstInTopology = false;
605 break;
606 }
607 }
608 for (const auto& label : services.get<DeviceSpec const>().labels) {
 609 if (label == suppressDomainInfoLabel) {
 610 decongestion->suppressDomainInfo = true;
611 break;
612 }
613 }
614 auto& queue = services.get<AsyncQueue>();
615 decongestion->oldestPossibleTimesliceTask = AsyncQueueHelpers::create(queue, {.name = "oldest-possible-timeslice", .score = 100});
616 return ServiceHandle{TypeIdHelpers::uniqueId<DecongestionService>(), decongestion, ServiceKind::Serial};
617 },
618 .postForwarding = [](ProcessingContext& ctx, void* service) {
619 auto* decongestion = reinterpret_cast<DecongestionService*>(service);
620 if (O2_BUILTIN_LIKELY(decongestion->isFirstInTopology == false)) {
621 return;
622 }
623 O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, service);
624 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "postForwardingCallbacks", "We are the first one in the topology, we need to update the oldest possible timeslice");
625 auto& timesliceIndex = ctx.services().get<TimesliceIndex>();
626 auto& relayer = ctx.services().get<DataRelayer>();
627 timesliceIndex.updateOldestPossibleOutput(decongestion->nextEnumerationTimesliceRewinded);
628 auto& proxy = ctx.services().get<FairMQDeviceProxy>();
629 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
630 if (decongestion->nextEnumerationTimesliceRewinded && decongestion->nextEnumerationTimeslice < oldestPossibleOutput.timeslice.value) {
631 LOGP(detail, "Not sending oldestPossible if nextEnumerationTimeslice was rewinded");
632 return;
633 }
634
635 if (decongestion->lastTimeslice && oldestPossibleOutput.timeslice.value == decongestion->lastTimeslice) {
636 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice",
637 "Not sending already sent value for oldest possible timeslice: %" PRIu64,
638 (uint64_t)oldestPossibleOutput.timeslice.value);
639 return;
640 }
641 if (oldestPossibleOutput.timeslice.value < decongestion->lastTimeslice) {
642 LOGP(error, "We are trying to send an oldest possible timeslice {} that is older than the last one we already sent {}",
643 oldestPossibleOutput.timeslice.value, decongestion->lastTimeslice);
644 return;
645 }
646
 647 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Broadcasting oldest possible output %" PRIu64 " due to %{public}s (%" PRIu64 ")",
648 (uint64_t)oldestPossibleOutput.timeslice.value,
649 oldestPossibleOutput.slot.index == -1 ? "channel" : "slot",
650 (uint64_t)(oldestPossibleOutput.slot.index == -1 ? oldestPossibleOutput.channel.value : oldestPossibleOutput.slot.index));
651 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Ordered active %d", decongestion->orderedCompletionPolicyActive);
652 if (decongestion->orderedCompletionPolicyActive) {
653 auto oldNextTimeslice = decongestion->nextTimeslice;
654 decongestion->nextTimeslice = std::max(decongestion->nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
655 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Next timeslice %" PRIi64, decongestion->nextTimeslice);
656 if (oldNextTimeslice != decongestion->nextTimeslice) {
657 auto& state = ctx.services().get<DeviceState>();
 658 if (state.transitionHandling != TransitionHandlingState::NoTransition && DefaultsHelpers::onlineDeploymentMode()) {
 659 O2_SIGNPOST_EVENT_EMIT_WARN(data_processor_context, cid, "oldest_possible_timeslice", "Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
660 } else {
661 O2_SIGNPOST_EVENT_EMIT_CRITICAL(data_processor_context, cid, "oldest_possible_timeslice", "Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
662 }
663 timesliceIndex.rescan();
664 }
665 }
666 DataProcessingHelpers::broadcastOldestPossibleTimeslice(ctx.services(), oldestPossibleOutput.timeslice.value);
667
668 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
669 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
670 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
 671 // TODO: we could cache this in the proxy at bind time.
672 if (info.channelType != ChannelAccountingType::DPL) {
673 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Skipping channel %{public}s", info.name.c_str());
674 continue;
675 }
676 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ctx.services(), info, state, oldestPossibleOutput.timeslice.value)) {
677 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice",
678 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64 ", priority %d",
679 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
680 }
681 }
682 decongestion->lastTimeslice = oldestPossibleOutput.timeslice.value; },
683 .stop = [](ServiceRegistryRef services, void* service) {
684 auto* decongestion = (DecongestionService*)service;
685 services.get<TimesliceIndex>().reset();
686 decongestion->nextEnumerationTimeslice = 0;
687 decongestion->nextEnumerationTimesliceRewinded = false;
688 decongestion->lastTimeslice = 0;
689 decongestion->nextTimeslice = 0;
690 decongestion->oldestPossibleTimesliceTask = {0};
691 auto &state = services.get<DeviceState>();
692 for (auto &channel : state.inputChannelInfos) {
693 channel.oldestForChannel = {0};
694 } },
695 .domainInfoUpdated = [](ServiceRegistryRef services, size_t oldestPossibleTimeslice, ChannelIndex channel) {
696 auto& decongestion = services.get<DecongestionService>();
697 auto& relayer = services.get<DataRelayer>();
698 auto& timesliceIndex = services.get<TimesliceIndex>();
699 O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, &decongestion);
700 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Received oldest possible timeframe %" PRIu64 " from channel %d",
701 (uint64_t)oldestPossibleTimeslice, channel.value);
702 relayer.setOldestPossibleInput({oldestPossibleTimeslice}, channel);
703 timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
704 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
705
706 if (oldestPossibleOutput.timeslice.value == decongestion.lastTimeslice) {
707 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Synchronous: Not sending already sent value: %" PRIu64, (uint64_t)oldestPossibleOutput.timeslice.value);
708 return;
709 }
710 if (oldestPossibleOutput.timeslice.value < decongestion.lastTimeslice) {
711 LOGP(error, "We are trying to send an oldest possible timeslice {} that is older than the last one we sent {}",
712 oldestPossibleOutput.timeslice.value, decongestion.lastTimeslice);
713 return;
714 }
715 auto& queue = services.get<AsyncQueue>();
716 const auto& state = services.get<DeviceState>();
719 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Queueing oldest possible timeslice %" PRIu64 " propagation for execution.",
720 (uint64_t)oldestPossibleOutput.timeslice.value);
 721 AsyncQueueHelpers::post(
 722 queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleTimeslice},
723 .id = decongestion.oldestPossibleTimesliceTask,
724 .debounce = -1, .callback = decongestionCallback}
725 .user<DecongestionContext>(DecongestionContext{.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
726
727 if (decongestion.orderedCompletionPolicyActive) {
 728 AsyncQueueHelpers::post(
 729 queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleOutput.timeslice.value}, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1,
730 .callback = decongestionCallbackOrdered}
731 .user<DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
732 } },
733 .kind = ServiceKind::Serial};
734}
735
736// FIXME: allow configuring the default number of threads per device
737// This should probably be done by overriding the preFork
738// callback and using the boost program options there to
739// get the default number of threads.
740ServiceSpec CommonServices::threadPool(int numWorkers)
741{
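  // FIXME: numWorkers is currently ignored: the pool size is hard-coded to 1 and
  // the same value is exported via UV_THREADPOOL_SIZE for the libuv worker pool.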
742 return ServiceSpec{
743 .name = "threadpool",
744 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
745 auto* pool = new ThreadPool();
746 // FIXME: this will require some extra argument for the configuration context of a service
747 pool->poolSize = 1;
748 return ServiceHandle{TypeIdHelpers::uniqueId<ThreadPool>(), pool};
749 },
750 .configure = [](InitContext&, void* service) -> void* {
751 auto* t = reinterpret_cast<ThreadPool*>(service);
752 // FIXME: this will require some extra argument for the configuration context of a service
753 t->poolSize = 1;
754 return service;
755 },
756 .postForkParent = [](ServiceRegistryRef services) -> void {
757 // FIXME: this will require some extra argument for the configuration context of a service
758 auto numWorkersS = std::to_string(1);
759 setenv("UV_THREADPOOL_SIZE", numWorkersS.c_str(), 0);
760 },
761 .kind = ServiceKind::Serial};
762}
763
764namespace
765{
766auto sendRelayerMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) -> void
767{
768 // Update the timer to make sure we have the correct time when sending out the stats.
769 uv_update_time(registry.get<DeviceState>().loop);
770 // Derive the amount of shared memory used
771 auto& runningWorkflow = registry.get<RunningWorkflowInfo const>();
772 using namespace fair::mq::shmem;
773 auto& spec = registry.get<DeviceSpec const>();
774
775 // FIXME: Ugly, but we do it only every 5 seconds...
776 if (stats.hasAvailSHMMetric) {
777 auto device = registry.get<RawDeviceService>().device();
778 long freeMemory = -1;
779 try {
780 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(ShmId{makeShmIdStr(device->fConfig->GetProperty<uint64_t>("shmid"))}, runningWorkflow.shmSegmentId);
781 } catch (...) {
782 }
783 if (freeMemory == -1) {
784 try {
785 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(SessionId{device->fConfig->GetProperty<std::string>("session")}, runningWorkflow.shmSegmentId);
786 } catch (...) {
787 }
788 }
789 stats.updateStats({static_cast<unsigned short>(static_cast<int>(ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE) + (runningWorkflow.shmSegmentId % 512)), DataProcessingStats::Op::SetIfPositive, freeMemory});
790 }
791
792 auto device = registry.get<RawDeviceService>().device();
793
794 int64_t totalBytesIn = 0;
795 int64_t totalBytesOut = 0;
796
797 for (auto& channel : device->GetChannels()) {
798 totalBytesIn += channel.second[0].GetBytesRx();
799 totalBytesOut += channel.second[0].GetBytesTx();
800 }
801
802 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_BYTES_IN), DataProcessingStats::Op::Set, totalBytesIn / 1000000});
803 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_BYTES_OUT), DataProcessingStats::Op::Set, totalBytesOut / 1000000});
804
805 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_RATE_IN_MB_S), DataProcessingStats::Op::InstantaneousRate, totalBytesIn / 1000000});
806 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_RATE_OUT_MB_S), DataProcessingStats::Op::InstantaneousRate, totalBytesOut / 1000000});
807};
808
809auto flushStates(ServiceRegistryRef registry, DataProcessingStates& states) -> void
810{
811 if (!registry.get<DriverConfig const>().driverHasGUI) {
812 return;
813 }
814 states.flushChangedStates([&states, registry](std::string const& spec, int64_t timestamp, std::string_view value) mutable -> void {
815 auto& client = registry.get<ControlService>();
816 client.push(spec, value, timestamp);
817 });
818}
819
820O2_DECLARE_DYNAMIC_LOG(monitoring_service);
821
823auto flushMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) -> void
824{
825 // Flushing metrics should only happen on main thread to avoid
826 // having to have a mutex for the communication with the driver.
827 O2_SIGNPOST_ID_GENERATE(sid, monitoring_service);
828 O2_SIGNPOST_START(monitoring_service, sid, "flush", "flushing metrics");
829 if (registry.isMainThread() == false) {
830 LOGP(fatal, "Flushing metrics should only happen on the main thread.");
831 }
832 auto& monitoring = registry.get<Monitoring>();
833 auto& relayer = registry.get<DataRelayer>();
834
835 // Send all the relevant metrics for the relayer to update the GUI
836 stats.flushChangedMetrics([&monitoring, sid](DataProcessingStats::MetricSpec const& spec, int64_t timestamp, int64_t value) mutable -> void {
837 // convert timestamp to a time_point
838 auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(std::chrono::milliseconds(timestamp));
839 auto metric = o2::monitoring::Metric{spec.name, Metric::DefaultVerbosity, tp};
840 if (spec.kind == DataProcessingStats::Kind::UInt64) {
841 if (value < 0) {
842 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid, "flushChangedMetrics", "Value for %{public}s is negative, setting to 0",
843 spec.name.c_str());
844 value = 0;
845 }
846 metric.addValue((uint64_t)value, "value");
847 } else {
848 if (value > (int64_t)std::numeric_limits<int>::max()) {
849 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid, "flushChangedMetrics", "Value for %{public}s is too large, setting to INT_MAX",
850 spec.name.c_str());
851 value = (int64_t)std::numeric_limits<int>::max();
852 }
853 if (value < (int64_t)std::numeric_limits<int>::min()) {
854 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid, "flushChangedMetrics", "Value for %{public}s is too small, setting to INT_MIN",
855 spec.name.c_str());
856 value = (int64_t)std::numeric_limits<int>::min();
857 }
858 metric.addValue((int)value, "value");
859 }
860 if (spec.scope == DataProcessingStats::Scope::DPL) {
861 metric.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL);
862 }
863 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid, "flushChangedMetrics", "Flushing metric %{public}s", spec.name.c_str());
864 monitoring.send(std::move(metric));
865 });
866 relayer.sendContextState();
867 monitoring.flushBuffer();
868 O2_SIGNPOST_END(monitoring_service, sid, "flush", "done flushing metrics");
869};
870} // namespace
871
872ServiceSpec CommonServices::dataProcessingStats()
873{
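  // Registers the standard set of per-device metrics (traffic, relayer state,
  // shared-memory usage, Arrow accounting, ...) and flushes them to the
  // monitoring backend from the dangling / end-of-stream callbacks below.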
874 return ServiceSpec{
875 .name = "data-processing-stats",
876 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
877 timespec now;
878 clock_gettime(CLOCK_REALTIME, &now);
879 uv_update_time(state.loop);
880 uint64_t offset = now.tv_sec * 1000 - uv_now(state.loop);
882 .minOnlinePublishInterval = std::stoi(options.GetProperty<std::string>("dpl-stats-min-online-publishing-interval").c_str()) * 1000};
885 config);
886 auto& runningWorkflow = services.get<RunningWorkflowInfo const>();
887
888 // It makes no sense to update the stats more often than every 5s
889 int quickUpdateInterval = 5000;
890 uint64_t quickRefreshInterval = 7000;
891 uint64_t onlineRefreshLatency = 60000; // For metrics which are reported online, we flush them every 60s regardless of their state.
892 using MetricSpec = DataProcessingStats::MetricSpec;
893 using Kind = DataProcessingStats::Kind;
894 using Scope = DataProcessingStats::Scope;
895
896#ifdef NDEBUG
897 bool enableDebugMetrics = false;
898#else
899 bool enableDebugMetrics = true;
900#endif
901 bool arrowAndResourceLimitingMetrics = false;
903 arrowAndResourceLimitingMetrics = true;
904 }
905
906 int64_t consumedTimeframesPublishInterval = 0;
908 consumedTimeframesPublishInterval = 5000;
909 }
910 // Input proxies should not report cpu_usage_fraction,
911 // because of the rate limiting which biases the measurement.
912 auto& spec = services.get<DeviceSpec const>();
913 bool enableCPUUsageFraction = true;
914 auto isProxy = [](DataProcessorLabel const& label) -> bool { return label == DataProcessorLabel{"input-proxy"}; };
915 if (std::find_if(spec.labels.begin(), spec.labels.end(), isProxy) != spec.labels.end()) {
916 O2_SIGNPOST_ID_GENERATE(mid, policies);
917 O2_SIGNPOST_EVENT_EMIT(policies, mid, "metrics", "Disabling cpu_usage_fraction metric for proxy %{public}s", spec.name.c_str());
918 enableCPUUsageFraction = false;
919 }
920
921 std::vector<DataProcessingStats::MetricSpec> metrics = {
922 MetricSpec{.name = "errors",
924 .kind = Kind::UInt64,
925 .scope = Scope::Online,
926 .minPublishInterval = quickUpdateInterval,
927 .maxRefreshLatency = quickRefreshInterval},
928 MetricSpec{.name = "exceptions",
930 .kind = Kind::UInt64,
931 .scope = Scope::Online,
932 .minPublishInterval = quickUpdateInterval},
933 MetricSpec{.name = "inputs/relayed/pending",
935 .kind = Kind::UInt64,
936 .minPublishInterval = quickUpdateInterval},
937 MetricSpec{.name = "inputs/relayed/incomplete",
939 .kind = Kind::UInt64,
940 .minPublishInterval = quickUpdateInterval},
941 MetricSpec{.name = "inputs/relayed/total",
943 .kind = Kind::UInt64,
944 .minPublishInterval = quickUpdateInterval},
945 MetricSpec{.name = "elapsed_time_ms",
947 .kind = Kind::UInt64,
948 .minPublishInterval = quickUpdateInterval},
949 MetricSpec{.name = "total_wall_time_ms",
951 .kind = Kind::UInt64,
952 .minPublishInterval = quickUpdateInterval},
953 MetricSpec{.name = "last_processed_input_size_byte",
955 .kind = Kind::UInt64,
956 .minPublishInterval = quickUpdateInterval},
957 MetricSpec{.name = "total_processed_input_size_byte",
959 .kind = Kind::UInt64,
960 .scope = Scope::Online,
961 .minPublishInterval = quickUpdateInterval},
962 MetricSpec{.name = "total_sigusr1",
964 .kind = Kind::UInt64,
965 .minPublishInterval = quickUpdateInterval},
966 MetricSpec{.name = "consumed-timeframes",
968 .kind = Kind::UInt64,
969 .minPublishInterval = consumedTimeframesPublishInterval,
970 .maxRefreshLatency = quickRefreshInterval,
971 .sendInitialValue = true},
972 MetricSpec{.name = "min_input_latency_ms",
974 .kind = Kind::UInt64,
975 .scope = Scope::Online,
976 .minPublishInterval = quickUpdateInterval},
977 MetricSpec{.name = "max_input_latency_ms",
979 .kind = Kind::UInt64,
980 .minPublishInterval = quickUpdateInterval},
981 MetricSpec{.name = "total_rate_in_mb_s",
983 .kind = Kind::Rate,
984 .scope = Scope::Online,
985 .minPublishInterval = quickUpdateInterval,
986 .maxRefreshLatency = onlineRefreshLatency,
987 .sendInitialValue = true},
988 MetricSpec{.name = "total_rate_out_mb_s",
990 .kind = Kind::Rate,
991 .scope = Scope::Online,
992 .minPublishInterval = quickUpdateInterval,
993 .maxRefreshLatency = onlineRefreshLatency,
994 .sendInitialValue = true},
995 MetricSpec{.name = "processing_rate_hz",
997 .kind = Kind::Rate,
998 .scope = Scope::Online,
999 .minPublishInterval = quickUpdateInterval,
1000 .maxRefreshLatency = onlineRefreshLatency,
1001 .sendInitialValue = true},
1002 MetricSpec{.name = "cpu_usage_fraction",
1003 .enabled = enableCPUUsageFraction,
1005 .kind = Kind::Rate,
1006 .scope = Scope::Online,
1007 .minPublishInterval = quickUpdateInterval,
1008 .maxRefreshLatency = onlineRefreshLatency,
1009 .sendInitialValue = true},
1010 MetricSpec{.name = "performed_computations",
1012 .kind = Kind::UInt64,
1013 .scope = Scope::Online,
1014 .minPublishInterval = quickUpdateInterval,
1015 .maxRefreshLatency = onlineRefreshLatency,
1016 .sendInitialValue = true},
1017 MetricSpec{.name = "total_bytes_in",
1019 .kind = Kind::UInt64,
1020 .scope = Scope::Online,
1021 .minPublishInterval = quickUpdateInterval,
1022 .maxRefreshLatency = onlineRefreshLatency,
1023 .sendInitialValue = true},
1024 MetricSpec{.name = "total_bytes_out",
1026 .kind = Kind::UInt64,
1027 .scope = Scope::Online,
1028 .minPublishInterval = quickUpdateInterval,
1029 .maxRefreshLatency = onlineRefreshLatency,
1030 .sendInitialValue = true},
1031 MetricSpec{.name = fmt::format("available_managed_shm_{}", runningWorkflow.shmSegmentId),
1032 .metricId = (int)ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE + (runningWorkflow.shmSegmentId % 512),
1033 .kind = Kind::UInt64,
1034 .scope = Scope::Online,
1035 .minPublishInterval = 500,
1036 .maxRefreshLatency = onlineRefreshLatency,
1037 .sendInitialValue = true},
1038 MetricSpec{.name = "malformed_inputs", .metricId = static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1039 MetricSpec{.name = "dropped_computations", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1040 MetricSpec{.name = "dropped_incoming_messages", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1041 MetricSpec{.name = "relayed_messages", .metricId = static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1042 MetricSpec{.name = "arrow-bytes-destroyed",
1043 .enabled = arrowAndResourceLimitingMetrics,
1044 .metricId = static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED),
1045 .kind = Kind::UInt64,
1046 .scope = Scope::DPL,
1047 .minPublishInterval = 0,
1048 .maxRefreshLatency = 10000,
1049 .sendInitialValue = true},
1050 MetricSpec{.name = "arrow-messages-destroyed",
1051 .enabled = arrowAndResourceLimitingMetrics,
1052 .metricId = static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED),
1053 .kind = Kind::UInt64,
1054 .scope = Scope::DPL,
1055 .minPublishInterval = 0,
1056 .maxRefreshLatency = 10000,
1057 .sendInitialValue = true},
1058 MetricSpec{.name = "arrow-bytes-created",
1059 .enabled = arrowAndResourceLimitingMetrics,
1060 .metricId = static_cast<short>(ProcessingStatsId::ARROW_BYTES_CREATED),
1061 .kind = Kind::UInt64,
1062 .scope = Scope::DPL,
1063 .minPublishInterval = 0,
1064 .maxRefreshLatency = 10000,
1065 .sendInitialValue = true},
1066 MetricSpec{.name = "arrow-messages-created",
1067 .enabled = arrowAndResourceLimitingMetrics,
1068 .metricId = static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_CREATED),
1069 .kind = Kind::UInt64,
1070 .scope = Scope::DPL,
1071 .minPublishInterval = 0,
1072 .maxRefreshLatency = 10000,
1073 .sendInitialValue = true},
1074 MetricSpec{.name = "arrow-bytes-expired",
1075 .enabled = arrowAndResourceLimitingMetrics,
1076 .metricId = static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED),
1077 .kind = Kind::UInt64,
1078 .scope = Scope::DPL,
1079 .minPublishInterval = 0,
1080 .maxRefreshLatency = 10000,
1081 .sendInitialValue = true},
1082 MetricSpec{.name = "shm-offer-bytes-consumed",
1083 .enabled = arrowAndResourceLimitingMetrics,
1084 .metricId = static_cast<short>(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED),
1085 .kind = Kind::UInt64,
1086 .scope = Scope::DPL,
1087 .minPublishInterval = 0,
1088 .maxRefreshLatency = 10000,
1089 .sendInitialValue = true},
1090 MetricSpec{.name = "timeslice-offer-number-consumed",
1091 .enabled = arrowAndResourceLimitingMetrics,
1092 .metricId = static_cast<short>(ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED),
1093 .kind = Kind::UInt64,
1094 .scope = Scope::DPL,
1095 .minPublishInterval = 0,
1096 .maxRefreshLatency = 10000,
1097 .sendInitialValue = true},
1098 MetricSpec{.name = "timeslices-expired",
1099 .enabled = arrowAndResourceLimitingMetrics,
1100 .metricId = static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED),
1101 .kind = Kind::UInt64,
1102 .scope = Scope::DPL,
1103 .minPublishInterval = 0,
1104 .maxRefreshLatency = 10000,
1105 .sendInitialValue = true},
1106 MetricSpec{.name = "timeslices-started",
1107 .enabled = arrowAndResourceLimitingMetrics,
1108 .metricId = static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_STARTED),
1109 .kind = Kind::UInt64,
1110 .scope = Scope::DPL,
1111 .minPublishInterval = 0,
1112 .maxRefreshLatency = 10000,
1113 .sendInitialValue = true},
1114 MetricSpec{.name = "timeslices-done",
1115 .enabled = arrowAndResourceLimitingMetrics,
1116 .metricId = static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_DONE),
1117 .kind = Kind::UInt64,
1118 .scope = Scope::DPL,
1119 .minPublishInterval = 0,
1120 .maxRefreshLatency = 10000,
1121 .sendInitialValue = true},
1122 MetricSpec{.name = "resources-missing",
1123 .enabled = enableDebugMetrics,
1124 .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_MISSING),
1125 .kind = Kind::UInt64,
1126 .scope = Scope::DPL,
1127 .minPublishInterval = 1000,
1128 .maxRefreshLatency = 1000,
1129 .sendInitialValue = true},
1130 MetricSpec{.name = "resources-insufficient",
1131 .enabled = enableDebugMetrics,
1132 .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_INSUFFICIENT),
1133 .kind = Kind::UInt64,
1134 .scope = Scope::DPL,
1135 .minPublishInterval = 1000,
1136 .maxRefreshLatency = 1000,
1137 .sendInitialValue = true},
1138 MetricSpec{.name = "resources-satisfactory",
1139 .enabled = enableDebugMetrics,
1140 .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_SATISFACTORY),
1141 .kind = Kind::UInt64,
1142 .scope = Scope::DPL,
1143 .minPublishInterval = 1000,
1144 .maxRefreshLatency = 1000,
1145 .sendInitialValue = true},
1146 MetricSpec{.name = "resource-offer-expired",
1147 .enabled = arrowAndResourceLimitingMetrics,
1148 .metricId = static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED),
1149 .kind = Kind::UInt64,
1150 .scope = Scope::DPL,
1151 .minPublishInterval = 0,
1152 .maxRefreshLatency = 10000,
1153 .sendInitialValue = true}};
1154
1155 for (auto& metric : metrics) {
1156 if (metric.metricId == (int)ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE + (runningWorkflow.shmSegmentId % 512)) {
1157 if (spec.name.compare("readout-proxy") == 0) {
1158 stats->hasAvailSHMMetric = true;
1159 } else {
1160 continue;
1161 }
1162 }
1163 stats->registerMetric(metric);
1164 }
1165
1166 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStats>(), stats};
1167 },
1168 .configure = noConfiguration(),
1169 .postProcessing = [](ProcessingContext& context, void* service) {
1170 auto* stats = (DataProcessingStats*)service;
 1171 stats->processCommandQueue(); },
 1172 .preDangling = [](DanglingContext& context, void* service) {
1173 auto* stats = (DataProcessingStats*)service;
1174 sendRelayerMetrics(context.services(), *stats);
1175 flushMetrics(context.services(), *stats); },
1176 .postDangling = [](DanglingContext& context, void* service) {
1177 auto* stats = (DataProcessingStats*)service;
1178 sendRelayerMetrics(context.services(), *stats);
1179 flushMetrics(context.services(), *stats); },
1180 .preEOS = [](EndOfStreamContext& context, void* service) {
1181 auto* stats = (DataProcessingStats*)service;
1182 sendRelayerMetrics(context.services(), *stats);
1183 flushMetrics(context.services(), *stats); },
1184 .preLoop = [](ServiceRegistryRef ref, void* service) {
1185 auto* stats = (DataProcessingStats*)service;
1186 flushMetrics(ref, *stats); },
1187 .kind = ServiceKind::Serial};
1188}
1189
1190// This is similar to the dataProcessingStats service, but it is designed to
1191// synchronize history-less metrics, e.g. those used for the GUI.
1192ServiceSpec CommonServices::dataProcessingStates()
1193{
1194 return ServiceSpec{
1195 .name = "data-processing-states",
1196 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
1197 timespec now;
1198 clock_gettime(CLOCK_REALTIME, &now);
1199 uv_update_time(state.loop);
1200 uint64_t offset = now.tv_sec * 1000 - uv_now(state.loop);
1203 states->registerState({"dummy_state", (short)ProcessingStateId::DUMMY_STATE});
1204 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStates>(), states};
1205 },
1206 .configure = noConfiguration(),
1207 .postProcessing = [](ProcessingContext& context, void* service) {
1208 auto* states = (DataProcessingStates*)service;
1209 states->processCommandQueue(); },
1210 .preDangling = [](DanglingContext& context, void* service) {
1211 auto* states = (DataProcessingStates*)service;
1212 flushStates(context.services(), *states); },
1213 .postDangling = [](DanglingContext& context, void* service) {
1214 auto* states = (DataProcessingStates*)service;
1215 flushStates(context.services(), *states); },
1216 .preEOS = [](EndOfStreamContext& context, void* service) {
1217 auto* states = (DataProcessingStates*)service;
1218 flushStates(context.services(), *states); },
1219 .kind = ServiceKind::Global};
1220}
1221
1222struct GUIMetrics {
1223};
1224
1225ServiceSpec CommonServices::guiMetricsSpec()
1226{
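  // Publishes per-channel oldest-possible-timeslice information as debug metrics
  // for the GUI; note that the spec is declared with .active = false.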
1227 return ServiceSpec{
1228 .name = "gui-metrics",
1229 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
1230 auto* stats = new GUIMetrics();
1231 auto& monitoring = services.get<Monitoring>();
1232 auto& spec = services.get<DeviceSpec const>();
1233 monitoring.send({(int)spec.inputChannels.size(), fmt::format("oldest_possible_timeslice/h"), o2::monitoring::Verbosity::Debug});
1234 monitoring.send({(int)1, fmt::format("oldest_possible_timeslice/w"), o2::monitoring::Verbosity::Debug});
1235 monitoring.send({(int)spec.outputChannels.size(), fmt::format("oldest_possible_output/h"), o2::monitoring::Verbosity::Debug});
1236 monitoring.send({(int)1, fmt::format("oldest_possible_output/w"), o2::monitoring::Verbosity::Debug});
1237 return ServiceHandle{TypeIdHelpers::uniqueId<GUIMetrics>(), stats};
1238 },
1239 .configure = noConfiguration(),
1240 .postProcessing = [](ProcessingContext& context, void* service) {
1241 auto& relayer = context.services().get<DataRelayer>();
1242 auto& monitoring = context.services().get<Monitoring>();
1243 auto& spec = context.services().get<DeviceSpec const>();
1244 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
 1245 for (size_t ci = 0; ci < spec.outputChannels.size(); ++ci) {
1246 monitoring.send({(uint64_t)oldestPossibleOutput.timeslice.value, fmt::format("oldest_possible_output/{}", ci), o2::monitoring::Verbosity::Debug});
1247 } },
1248 .domainInfoUpdated = [](ServiceRegistryRef registry, size_t timeslice, ChannelIndex channel) {
1249 auto& monitoring = registry.get<Monitoring>();
1250 monitoring.send({(uint64_t)timeslice, fmt::format("oldest_possible_timeslice/{}", channel.value), o2::monitoring::Verbosity::Debug}); },
1251 .active = false,
1252 .kind = ServiceKind::Serial};
1253}
1254
1255ServiceSpec CommonServices::objectCache()
1256{
1257 return ServiceSpec{
1258 .name = "object-cache",
1259 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1260 auto* cache = new ObjectCache();
1261 return ServiceHandle{TypeIdHelpers::uniqueId<ObjectCache>(), cache};
1262 },
1263 .configure = noConfiguration(),
 1264 .kind = ServiceKind::Serial};
1265}
1266
1267ServiceSpec CommonServices::dataProcessorContextSpec()
1268{
1269 return ServiceSpec{
1270 .name = "data-processing-context",
1271 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1272 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessorContext>(), new DataProcessorContext()};
1273 },
1274 .configure = noConfiguration(),
1275 .exit = [](ServiceRegistryRef, void* service) { auto* context = (DataProcessorContext*)service; delete context; },
1276 .kind = ServiceKind::Serial};
1277}
1278
1279ServiceSpec CommonServices::dataAllocatorSpec()
1280{
1281 return ServiceSpec{
1282 .name = "data-allocator",
1283 .uniqueId = simpleServiceId<DataAllocator>(),
1284 .init = [](ServiceRegistryRef ref, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1285 return ServiceHandle{
1286 .hash = TypeIdHelpers::uniqueId<DataAllocator>(),
1287 .instance = new DataAllocator(ref),
1288 .kind = ServiceKind::Stream,
1289 .name = "data-allocator",
1290 };
1291 },
1292 .configure = noConfiguration(),
1293 .kind = ServiceKind::Stream};
1294}
1295
1297std::vector<ServiceSpec> CommonServices::defaultServices(std::string extraPlugins, int numThreads)
1298{
1299 std::vector<ServiceSpec> specs{
1303 asyncQueue(),
1310 controlSpec(),
1311 rootFileSpec(),
1312 parallelSpec(),
1313 callbacksSpec(),
1316 dataRelayer(),
1318 dataSender(),
1319 objectCache(),
1320 ccdbSupportSpec()};
1321
1323 specs.push_back(ArrowSupport::arrowBackendSpec());
1324 }
1327 specs.push_back(decongestionSpec());
1328
1329 std::string loadableServicesStr = extraPlugins;
1330 // Do not load InfoLogger by default if we are not at P2.
1332 if (loadableServicesStr.empty() == false) {
1333 loadableServicesStr += ",";
1334 }
1335 loadableServicesStr += "O2FrameworkDataTakingSupport:InfoLoggerContext,O2FrameworkDataTakingSupport:InfoLogger";
1336 }
1337 // Load plugins depending on the environment
1338 std::vector<LoadablePlugin> loadablePlugins = {};
1339 char* loadableServicesEnv = getenv("DPL_LOAD_SERVICES");
1340 // String to define the services to load is:
1341 //
1342 // library1:name1,library2:name2,...
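  // For example, to load the data-taking plugins mentioned above:
  //   DPL_LOAD_SERVICES=O2FrameworkDataTakingSupport:InfoLoggerContext,O2FrameworkDataTakingSupport:InfoLogger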
1343 if (loadableServicesEnv) {
1344 if (loadableServicesStr.empty() == false) {
1345 loadableServicesStr += ",";
1346 }
1347 loadableServicesStr += loadableServicesEnv;
1348 }
1349 loadablePlugins = PluginManager::parsePluginSpecString(loadableServicesStr.c_str());
1350 PluginManager::loadFromPlugin<ServiceSpec, ServicePlugin>(loadablePlugins, specs);
 1351 // I should make it optional depending on whether the GUI is there or not...
1352 specs.push_back(CommonServices::guiMetricsSpec());
1353 if (numThreads) {
1354 specs.push_back(threadPool(numThreads));
1355 }
1356 return specs;
1357}
1358
1359std::vector<ServiceSpec> CommonServices::arrowServices()
1360{
1361 return {
 1362 ArrowSupport::arrowTableSlicingCacheDefSpec(),
 1363 ArrowSupport::arrowTableSlicingCacheSpec(),
 1364 };
1365}
1366
1367} // namespace o2::framework