CommonServices.cxx
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
25#include "Framework/Signpost.h"
32#include "InputRouteHelpers.h"
36#include "Framework/Tracing.h"
46#include "Framework/Signpost.h"
48
49#include "TextDriverClient.h"
50#include "WSDriverClient.h"
51#include "HTTPParser.h"
52#include "../src/DataProcessingStatus.h"
53#include "DecongestionService.h"
54#include "ArrowSupport.h"
56#include "Headers/STFHeader.h"
57#include "Headers/DataHeader.h"
58
59#include <Configuration/ConfigurationInterface.h>
60#include <Configuration/ConfigurationFactory.h>
61#include <Monitoring/MonitoringFactory.h>
62#include "Framework/Signpost.h"
63
64#include <fairmq/Device.h>
65#include <fairmq/shmem/Monitor.h>
66#include <fairmq/shmem/Common.h>
67#include <fairmq/ProgOptions.h>
68#include <uv.h>
69
70#include <cstdlib>
71#include <cstring>
72
73using o2::configuration::ConfigurationFactory;
74using o2::configuration::ConfigurationInterface;
75using o2::monitoring::Monitoring;
76using o2::monitoring::MonitoringFactory;
77using Metric = o2::monitoring::Metric;
78using Key = o2::monitoring::tags::Key;
79using Value = o2::monitoring::tags::Value;
80
81O2_DECLARE_DYNAMIC_LOG(data_processor_context);
82O2_DECLARE_DYNAMIC_LOG(stream_context);
83O2_DECLARE_DYNAMIC_LOG(async_queue);
84O2_DECLARE_DYNAMIC_LOG(policies);
85
86namespace o2::framework
87{
88
89#define MONITORING_QUEUE_SIZE 100
90o2::framework::ServiceSpec CommonServices::monitoringSpec()
91{
92 return ServiceSpec{
93 .name = "monitoring",
94 .init = [](ServiceRegistryRef registry, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
95 void* service = nullptr;
96 bool isWebsocket = strncmp(options.GetPropertyAsString("driver-client-backend").c_str(), "ws://", 4) == 0;
97 bool isDefault = options.GetPropertyAsString("monitoring-backend") == "default";
98 bool useDPL = (isWebsocket && isDefault) || options.GetPropertyAsString("monitoring-backend") == "dpl://";
99 o2::monitoring::Monitoring* monitoring;
100 if (useDPL) {
101 monitoring = new Monitoring();
102 auto dplBackend = std::make_unique<DPLMonitoringBackend>(registry);
103 (dynamic_cast<o2::monitoring::Backend*>(dplBackend.get()))->setVerbosity(o2::monitoring::Verbosity::Debug);
104 monitoring->addBackend(std::move(dplBackend));
105 } else {
106 auto backend = isDefault ? "infologger://" : options.GetPropertyAsString("monitoring-backend");
107 monitoring = MonitoringFactory::Get(backend).release();
108 }
109 service = monitoring;
110 monitoring->enableBuffering(MONITORING_QUEUE_SIZE);
111 assert(registry.get<DeviceSpec const>().name.empty() == false);
112 monitoring->addGlobalTag("pipeline_id", std::to_string(registry.get<DeviceSpec const>().inputTimesliceId));
113 monitoring->addGlobalTag("dataprocessor_name", registry.get<DeviceSpec const>().name);
114 monitoring->addGlobalTag("dpl_instance", options.GetPropertyAsString("shm-segment-id"));
115 return ServiceHandle{TypeIdHelpers::uniqueId<Monitoring>(), service};
116 },
117 .configure = noConfiguration(),
118 .start = [](ServiceRegistryRef services, void* service) {
119 auto* monitoring = (o2::monitoring::Monitoring*)service;
120
121 auto extRunNumber = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("runNumber", "unspecified");
122 if (extRunNumber == "unspecified") {
123 return;
124 }
125 try {
126 monitoring->setRunNumber(std::stoul(extRunNumber));
127 } catch (...) {
128 } },
129 .exit = [](ServiceRegistryRef registry, void* service) {
130 auto* monitoring = reinterpret_cast<Monitoring*>(service);
131 monitoring->flushBuffer();
132 delete monitoring; },
133 .kind = ServiceKind::Serial};
134}
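// Illustration (not part of the original file): a minimal sketch of how user code could
// obtain the Monitoring service registered above and send a metric through it, assuming a
// ProcessingContext& ctx inside a process callback ("my-custom-counter" is a made-up name):
//
//   auto& monitoring = ctx.services().get<o2::monitoring::Monitoring>();
//   monitoring.send({1, "my-custom-counter", o2::monitoring::Verbosity::Debug});
//   monitoring.flushBuffer(); // only needed to force buffered metrics out early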
135
136// An asynchronous service that executes actions at the end of the data processing
137o2::framework::ServiceSpec CommonServices::asyncQueue()
138{
139 return ServiceSpec{
140 .name = "async-queue",
141 .init = simpleServiceInit<AsyncQueue, AsyncQueue>(),
142 .configure = noConfiguration(),
143 .stop = [](ServiceRegistryRef services, void* service) {
144 auto& queue = services.get<AsyncQueue>();
146 },
147 .kind = ServiceKind::Serial};
148}
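// Illustration (not part of the original file): a minimal sketch of how a task could be
// queued on the AsyncQueue service, following the create/post pattern used by the
// decongestion code below ("my-task" and the callback body are hypothetical):
//
//   auto& queue = services.get<AsyncQueue>();
//   auto taskId = AsyncQueueHelpers::create(queue, {.name = "my-task", .score = 10});
//   AsyncQueueHelpers::post(queue, AsyncTask{.id = taskId,
//                                            .callback = [](AsyncTask&, size_t) { /* work */ }});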
149
150// Make it a service so that it can be used easily from the analysis
151// FIXME: Moreover, it makes sense that this will be duplicated on a per thread
152// basis when we get to it.
153o2::framework::ServiceSpec CommonServices::timingInfoSpec()
154{
155 return ServiceSpec{
156 .name = "timing-info",
157 .uniqueId = simpleServiceId<TimingInfo>(),
158 .init = simpleServiceInit<TimingInfo, TimingInfo, ServiceKind::Stream>(),
159 .configure = noConfiguration(),
160 .kind = ServiceKind::Stream};
161}
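// Illustration (not part of the original file): the TimingInfo service registered above is
// what user code reads to learn about the current timeslice, e.g.:
//
//   auto& timingInfo = ctx.services().get<TimingInfo>();
//   LOGP(info, "processing timeslice {} of run {}", timingInfo.timeslice, timingInfo.runNumber);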
162
163o2::framework::ServiceSpec CommonServices::streamContextSpec()
164{
165 return ServiceSpec{
166 .name = "stream-context",
167 .uniqueId = simpleServiceId<StreamContext>(),
168 .init = simpleServiceInit<StreamContext, StreamContext, ServiceKind::Stream>(),
169 .configure = noConfiguration(),
170 .preProcessing = [](ProcessingContext& context, void* service) {
171 auto* stream = (StreamContext*)service;
172 auto& routes = context.services().get<DeviceSpec const>().outputs;
173 // Notice I need to do this here, because different invocations for
174 // the same stream might refer to different data processors.
175 // We should probably have a context which is per stream of a specific
176 // data processor.
177 stream->routeDPLCreated.resize(routes.size());
178 stream->routeCreated.resize(routes.size());
179 // Reset the routeDPLCreated at every processing step
180 std::fill(stream->routeDPLCreated.begin(), stream->routeDPLCreated.end(), false);
181 std::fill(stream->routeCreated.begin(), stream->routeCreated.end(), false); },
182 .postProcessing = [](ProcessingContext& processingContext, void* service) {
183 auto* stream = (StreamContext*)service;
184 auto& routes = processingContext.services().get<DeviceSpec const>().outputs;
185 auto& timeslice = processingContext.services().get<TimingInfo>().timeslice;
186 auto& messageContext = processingContext.services().get<MessageContext>();
187 // Check whether we ever created any data for this timeslice.
188 // If we did not, but didDispatch is still set to true,
189 // it means the data was created out of band.
190 bool userDidCreate = false;
191 O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, service);
192 for (size_t ri = 0; ri < routes.size(); ++ri) {
193 if (stream->routeCreated[ri] == true && stream->routeDPLCreated[ri] == false) {
194 userDidCreate = true;
195 break;
196 }
197 }
198 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "userDidCreate == %d && didDispatch == %d",
199 userDidCreate,
200 messageContext.didDispatch());
201 if (userDidCreate == false && messageContext.didDispatch() == true) {
202 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Data created out of band userDidCreate == %d && messageContext.didDispatch == %d",
203 userDidCreate,
204 messageContext.didDispatch());
205 return;
206 }
207 if (userDidCreate == false && messageContext.didDispatch() == false) {
208 O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, service);
209 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "No data created.");
210 return;
211 }
212 for (size_t ri = 0; ri < routes.size(); ++ri) {
213 auto &route = routes[ri];
214 auto &matcher = route.matcher;
215 if (stream->routeDPLCreated[ri] == true) {
216 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Data created by DPL. ri = %" PRIu64 ", %{public}s",
217 (uint64_t)ri, DataSpecUtils::describe(matcher).c_str());
218 continue;
219 }
220 if (stream->routeCreated[ri] == true) {
221 continue;
222 } if ((timeslice % route.maxTimeslices) != route.timeslice) {
223 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Route ri = %" PRIu64 ", skipped because of pipelining.",
224 (uint64_t)ri);
225 continue;
226 }
227 if (matcher.lifetime == Lifetime::Timeframe) {
228 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks",
229 "Expected Lifetime::Timeframe data %{public}s was not created for timeslice %" PRIu64 " and might result in dropped timeframes",
230 DataSpecUtils::describe(matcher).c_str(), (uint64_t)timeslice);
231 LOGP(error, "Expected Lifetime::Timeframe data {} was not created for timeslice {} and might result in dropped timeframes", DataSpecUtils::describe(matcher), timeslice);
232 }
233 } },
234 .preEOS = [](EndOfStreamContext& context, void* service) {
235 // We need to reset the routeDPLCreated / routeCreated because the end of stream
236 // uses a different context which does not know about the routes.
237 // FIXME: This should be fixed in a different way, but for now it will
238 // allow TPC IDC to work.
239 auto* stream = (StreamContext*)service;
240 auto& routes = context.services().get<DeviceSpec const>().outputs;
241 // Notice I need to do this here, because different invocations for
242 // the same stream might refer to different data processors.
243 // We should probably have a context which is per stream of a specific
244 // data processor.
245 stream->routeDPLCreated.resize(routes.size());
246 stream->routeCreated.resize(routes.size());
247 // Reset the routeCreated / routeDPLCreated at every processing step
248 std::fill(stream->routeCreated.begin(), stream->routeCreated.end(), false);
249 std::fill(stream->routeDPLCreated.begin(), stream->routeDPLCreated.end(), false); },
250 .kind = ServiceKind::Stream};
251}
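// Illustration (not part of the original file): the postProcessing hook above emits an error
// when a declared Lifetime::Timeframe output was not created for a timeslice. An output which
// is legitimately produced only sometimes should instead be declared Lifetime::Sporadic in the
// DataProcessorSpec, e.g. (origin/description are made up):
//
//   OutputSpec{"TST", "OPTIONALDATA", 0, Lifetime::Sporadic}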
252
253o2::framework::ServiceSpec CommonServices::datatakingContextSpec()
254{
255 return ServiceSpec{
256 .name = "datataking-contex",
257 .uniqueId = simpleServiceId<DataTakingContext>(),
258 .init = simpleServiceInit<DataTakingContext, DataTakingContext, ServiceKind::Stream>(),
259 .configure = noConfiguration(),
260 .preProcessing = [](ProcessingContext& processingContext, void* service) {
261 auto& context = processingContext.services().get<DataTakingContext>();
262 for (auto const& ref : processingContext.inputs()) {
263 const o2::framework::DataProcessingHeader *dph = o2::header::get<DataProcessingHeader*>(ref.header);
264 const auto* dh = o2::header::get<o2::header::DataHeader*>(ref.header);
265 if (!dph || !dh) {
266 continue;
267 }
268 context.runNumber = fmt::format("{}", dh->runNumber);
269 break;
270 } },
271 // Notice this will be executed only once, because the service is declared upfront.
272 .start = [](ServiceRegistryRef services, void* service) {
273 auto& context = services.get<DataTakingContext>();
274
276
277 auto extRunNumber = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("runNumber", "unspecified");
278 if (extRunNumber != "unspecified" || context.runNumber == "0") {
279 context.runNumber = extRunNumber;
280 }
281 auto extLHCPeriod = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("lhc_period", "unspecified");
282 if (extLHCPeriod != "unspecified") {
283 context.lhcPeriod = extLHCPeriod;
284 } else {
285 static const char* months[12] = {"JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"};
286 time_t now = time(nullptr);
287 auto ltm = gmtime(&now);
288 context.lhcPeriod = months[ltm->tm_mon];
289 LOG(info) << "LHCPeriod is not available, using current month " << context.lhcPeriod;
290 }
291
292 auto extRunType = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("run_type", "unspecified");
293 if (extRunType != "unspecified") {
294 context.runType = extRunType;
295 }
296 auto extEnvId = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("environment_id", "unspecified");
297 if (extEnvId != "unspecified") {
298 context.envId = extEnvId;
299 }
300 auto extDetectors = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("detectors", "unspecified");
301 if (extDetectors != "unspecified") {
302 context.detectors = extDetectors;
303 }
304 auto forcedRaw = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("force_run_as_raw", "false");
305 context.forcedRaw = forcedRaw == "true"; },
306 .kind = ServiceKind::Stream};
307}
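// Illustration (not part of the original file): downstream code can read the values filled in
// above through the DataTakingContext service, e.g.:
//
//   auto& dtc = ctx.services().get<DataTakingContext>();
//   LOGP(info, "run {} ({}), LHC period {}", dtc.runNumber, dtc.runType, dtc.lhcPeriod);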
308
310};
311
312o2::framework::ServiceSpec CommonServices::configurationSpec()
313{
314 return ServiceSpec{
315 .name = "configuration",
316 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
317 auto backend = options.GetPropertyAsString("configuration");
318 if (backend == "command-line") {
319 return ServiceHandle{0, nullptr};
320 }
321 return ServiceHandle{TypeIdHelpers::uniqueId<ConfigurationInterface>(),
322 ConfigurationFactory::getConfiguration(backend).release()};
323 },
324 .configure = noConfiguration(),
325 .driverStartup = [](ServiceRegistryRef registry, DeviceConfig const& dc) {
326 if (dc.options.count("configuration") == 0) {
327 registry.registerService(ServiceHandle{0, nullptr});
328 return;
329 }
330 auto backend = dc.options["configuration"].as<std::string>();
331 registry.registerService(ServiceHandle{TypeIdHelpers::uniqueId<ConfigurationInterface>(),
332 ConfigurationFactory::getConfiguration(backend).release()}); },
333 .kind = ServiceKind::Global};
334}
335
336o2::framework::ServiceSpec CommonServices::driverClientSpec()
337{
338 return ServiceSpec{
339 .name = "driverClient",
340 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
341 auto backend = options.GetPropertyAsString("driver-client-backend");
342 if (backend == "stdout://") {
343 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
344 new TextDriverClient(services, state)};
345 }
346 auto [ip, port] = o2::framework::parse_websocket_url(backend.c_str());
347 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
348 new WSDriverClient(services, ip.c_str(), port)};
349 },
350 .configure = noConfiguration(),
351 .kind = ServiceKind::Global};
352}
353
354o2::framework::ServiceSpec CommonServices::controlSpec()
355{
356 return ServiceSpec{
357 .name = "control",
358 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
359 return ServiceHandle{TypeIdHelpers::uniqueId<ControlService>(),
360 new ControlService(services, state)};
361 },
362 .configure = noConfiguration(),
363 .kind = ServiceKind::Serial};
364}
365
366o2::framework::ServiceSpec CommonServices::rootFileSpec()
367{
368 return ServiceSpec{
369 .name = "localrootfile",
370 .init = simpleServiceInit<LocalRootFileService, LocalRootFileService>(),
371 .configure = noConfiguration(),
372 .kind = ServiceKind::Serial};
373}
374
375o2::framework::ServiceSpec CommonServices::parallelSpec()
376{
377 return ServiceSpec{
378 .name = "parallel",
379 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
380 auto& spec = services.get<DeviceSpec const>();
381 return ServiceHandle{TypeIdHelpers::uniqueId<ParallelContext>(),
382 new ParallelContext(spec.rank, spec.nSlots)};
383 },
384 .configure = noConfiguration(),
385 .kind = ServiceKind::Serial};
386}
387
388o2::framework::ServiceSpec CommonServices::timesliceIndex()
389{
390 return ServiceSpec{
391 .name = "timesliceindex",
392 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
393 auto& spec = services.get<DeviceSpec const>();
394 return ServiceHandle{TypeIdHelpers::uniqueId<TimesliceIndex>(),
395 new TimesliceIndex(InputRouteHelpers::maxLanes(spec.inputs), state.inputChannelInfos)};
396 },
397 .configure = noConfiguration(),
398 .kind = ServiceKind::Serial};
399}
400
401o2::framework::ServiceSpec CommonServices::callbacksSpec()
402{
403 return ServiceSpec{
404 .name = "callbacks",
405 .init = simpleServiceInit<CallbackService, CallbackService>(),
406 .configure = noConfiguration(),
407 .kind = ServiceKind::Serial};
408}
409
410o2::framework::ServiceSpec CommonServices::dataRelayer()
411{
412 return ServiceSpec{
413 .name = "datarelayer",
414 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
415 auto& spec = services.get<DeviceSpec const>();
416 return ServiceHandle{TypeIdHelpers::uniqueId<DataRelayer>(),
417 new DataRelayer(spec.completionPolicy,
418 spec.inputs,
419 services.get<TimesliceIndex>(),
420 services)};
421 },
422 .configure = noConfiguration(),
423 .kind = ServiceKind::Serial};
424}
425
426o2::framework::ServiceSpec CommonServices::dataSender()
427{
428 return ServiceSpec{
429 .name = "datasender",
430 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
431 return ServiceHandle{TypeIdHelpers::uniqueId<DataSender>(),
432 new DataSender(services)};
433 },
434 .configure = noConfiguration(),
435 .preProcessing = [](ProcessingContext&, void* service) {
436 auto& dataSender = *reinterpret_cast<DataSender*>(service);
437 dataSender.reset(); },
438 .postDispatching = [](ProcessingContext& ctx, void* service) {
439 auto& dataSender = *reinterpret_cast<DataSender*>(service);
440 // If the quit was requested, the post dispatching can still happen
441 // but with an empty set of data.
442 if (ctx.services().get<DeviceState>().quitRequested == false) {
443 dataSender.verifyMissingSporadic();
444 } },
445 .kind = ServiceKind::Serial};
446}
447
448struct TracingInfrastructure {
449  int processingCount;
450};
451
452o2::framework::ServiceSpec CommonServices::tracingSpec()
453{
454 return ServiceSpec{
455 .name = "tracing",
456 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
457 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<TracingInfrastructure>(),
458 .instance = new TracingInfrastructure(),
459 .kind = ServiceKind::Serial};
460 },
461 .configure = noConfiguration(),
462 .preProcessing = [](ProcessingContext&, void* service) {
463 auto* t = reinterpret_cast<TracingInfrastructure*>(service);
464 t->processingCount += 1; },
465 .postProcessing = [](ProcessingContext&, void* service) {
466 auto* t = reinterpret_cast<TracingInfrastructure*>(service);
467 t->processingCount += 1; },
468 .kind = ServiceKind::Serial};
469}
470
471struct CCDBSupport {
472};
473
474// CCDB Support service
475o2::framework::ServiceSpec CommonServices::ccdbSupportSpec()
476{
477 return ServiceSpec{
478 .name = "ccdb-support",
479 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
480 // iterate over all the output matchers
481 auto& spec = services.get<DeviceSpec const>();
482 for (auto& output : spec.outputs) {
483 if (DataSpecUtils::match(output.matcher, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"})) {
484 LOGP(debug, "Optional inputs support enabled");
485 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<CCDBSupport>(), .instance = new CCDBSupport, .kind = ServiceKind::Serial};
486 }
487 }
488 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<CCDBSupport>(), .instance = nullptr, .kind = ServiceKind::Serial};
489 },
490 .configure = noConfiguration(),
491 .finaliseOutputs = [](ProcessingContext& pc, void* service) {
492 if (!service) {
493 return;
494 }
495 if (pc.outputs().countDeviceOutputs(true) == 0) {
496 LOGP(debug, "We are w/o outputs, do not automatically add DISTSUBTIMEFRAME to outgoing messages");
497 return;
498 }
499 auto& timingInfo = pc.services().get<TimingInfo>();
500
501 // For any output that is a FLP/DISTSUBTIMEFRAME with subspec != 0,
502 // we create a new message.
503 InputSpec matcher{"matcher", ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}};
504 auto& streamContext = pc.services().get<StreamContext>();
505 for (size_t oi = 0; oi < pc.services().get<DeviceSpec const>().outputs.size(); ++oi) {
506 OutputRoute const& output = pc.services().get<DeviceSpec const>().outputs[oi];
507 if ((output.timeslice % output.maxTimeslices) != 0) {
508 continue;
509 }
510 if (DataSpecUtils::match(output.matcher, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"})) {
511 auto concrete = DataSpecUtils::asConcreteDataMatcher(output.matcher);
512 if (concrete.subSpec == 0) {
513 continue;
514 }
515 auto& stfDist = pc.outputs().make<o2::header::STFHeader>(Output{concrete.origin, concrete.description, concrete.subSpec});
516 stfDist.id = timingInfo.timeslice;
517 stfDist.firstOrbit = timingInfo.firstTForbit;
518 stfDist.runNumber = timingInfo.runNumber;
519 // We mark it as not created, because we should not account for it when
520 // checking if we created all the data for a timeslice.
521 O2_SIGNPOST_ID_FROM_POINTER(sid, stream_context, &streamContext);
522 O2_SIGNPOST_EVENT_EMIT(stream_context, sid, "finaliseOutputs", "Route %" PRIu64 " (%{public}s) was created by DPL.", (uint64_t)oi,
523 DataSpecUtils::describe(output.matcher).c_str());
524 streamContext.routeDPLCreated[oi] = true;
525 }
526 } },
527 .kind = ServiceKind::Global};
528}
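// Illustration (not part of the original file): the service above only becomes active for
// devices whose DataProcessorSpec declares an FLP/DISTSUBTIMEFRAME output. Such a declaration
// would look roughly like the following (subspec 1 is an arbitrary example):
//
//   OutputSpec{"FLP", "DISTSUBTIMEFRAME", 1, Lifetime::Timeframe}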
529struct DecongestionContext {
530  ServiceRegistryRef ref;
531  TimesliceIndex::OldestOutputInfo oldestPossibleOutput;
532};
533
534auto decongestionCallback = [](AsyncTask& task, size_t id) -> void {
535 auto& oldestPossibleOutput = task.user<DecongestionContext>().oldestPossibleOutput;
536 auto& ref = task.user<DecongestionContext>().ref;
537
538 auto& decongestion = ref.get<DecongestionService>();
539 auto& proxy = ref.get<FairMQDeviceProxy>();
540
541 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
542 cid.value = id;
543 if (decongestion.lastTimeslice >= oldestPossibleOutput.timeslice.value) {
544 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", "Not sending already sent value: %" PRIu64 "> %" PRIu64,
545 decongestion.lastTimeslice, (uint64_t)oldestPossibleOutput.timeslice.value);
546 return;
547 }
548 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", "Running oldest possible timeslice %" PRIu64 " propagation.",
549 (uint64_t)oldestPossibleOutput.timeslice.value);
550 DataProcessingHelpers::broadcastOldestPossibleTimeslice(ref, oldestPossibleOutput.timeslice.value);
551
552 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
553 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
554 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
555 // TODO: we could cache this in the proxy at bind time.
556 if (info.channelType != ChannelAccountingType::DPL) {
557 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", "Skipping channel %{public}s", info.name.c_str());
558 continue;
559 }
560 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestPossibleOutput.timeslice.value)) {
561 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice",
562 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64 ", priority %d",
563 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
564 }
565 }
566 decongestion.lastTimeslice = oldestPossibleOutput.timeslice.value;
567};
568
569auto decongestionCallbackOrdered = [](AsyncTask& task, size_t id) -> void {
570 auto& oldestPossibleOutput = task.user<DecongestionContext>().oldestPossibleOutput;
571 auto& ref = task.user<DecongestionContext>().ref;
572
573 auto& decongestion = ref.get<DecongestionService>();
574 auto& state = ref.get<DeviceState>();
575 auto& timesliceIndex = ref.get<TimesliceIndex>();
576 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
577 int64_t oldNextTimeslice = decongestion.nextTimeslice;
578 decongestion.nextTimeslice = std::max(decongestion.nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
579 if (oldNextTimeslice != decongestion.nextTimeslice) {
580 if (state.transitionHandling != TransitionHandlingState::NoTransition && DefaultsHelpers::onlineDeploymentMode()) {
581 O2_SIGNPOST_EVENT_EMIT_WARN(async_queue, cid, "oldest_possible_timeslice", "Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
582 } else {
583 O2_SIGNPOST_EVENT_EMIT_CRITICAL(async_queue, cid, "oldest_possible_timeslice", "Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
584 }
585 timesliceIndex.rescan();
586 }
587};
588
589// Decongestion service
590// If we do not have any Timeframe input, it means we must be creating timeslices
591// in order and that we should propagate the oldest possible timeslice at the end
592// of each processing step.
594o2::framework::ServiceSpec CommonServices::decongestionSpec()
595{
596 return ServiceSpec{
597 .name = "decongestion",
598 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
599 auto* decongestion = new DecongestionService();
600 for (auto& input : services.get<DeviceSpec const>().inputs) {
601 if (input.matcher.lifetime == Lifetime::Timeframe || input.matcher.lifetime == Lifetime::QA || input.matcher.lifetime == Lifetime::Sporadic || input.matcher.lifetime == Lifetime::Optional) {
602 LOGP(detail, "Found a real data input, we cannot update the oldest possible timeslice when sending messages");
603 decongestion->isFirstInTopology = false;
604 break;
605 }
606 }
607 auto& queue = services.get<AsyncQueue>();
608 decongestion->oldestPossibleTimesliceTask = AsyncQueueHelpers::create(queue, {.name = "oldest-possible-timeslice", .score = 100});
609 return ServiceHandle{TypeIdHelpers::uniqueId<DecongestionService>(), decongestion, ServiceKind::Serial};
610 },
611 .postForwarding = [](ProcessingContext& ctx, void* service) {
612 auto* decongestion = reinterpret_cast<DecongestionService*>(service);
613 if (O2_BUILTIN_LIKELY(decongestion->isFirstInTopology == false)) {
614 return;
615 }
616 O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, service);
617 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "postForwardingCallbacks", "We are the first one in the topology, we need to update the oldest possible timeslice");
618 auto& timesliceIndex = ctx.services().get<TimesliceIndex>();
619 auto& relayer = ctx.services().get<DataRelayer>();
620 timesliceIndex.updateOldestPossibleOutput(decongestion->nextEnumerationTimesliceRewinded);
621 auto& proxy = ctx.services().get<FairMQDeviceProxy>();
622 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
623 if (decongestion->nextEnumerationTimesliceRewinded && decongestion->nextEnumerationTimeslice < oldestPossibleOutput.timeslice.value) {
624 LOGP(detail, "Not sending oldestPossible if nextEnumerationTimeslice was rewinded");
625 return;
626 }
627
628 if (decongestion->lastTimeslice && oldestPossibleOutput.timeslice.value == decongestion->lastTimeslice) {
629 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice",
630 "Not sending already sent value for oldest possible timeslice: %" PRIu64,
631 (uint64_t)oldestPossibleOutput.timeslice.value);
632 return;
633 }
634 if (oldestPossibleOutput.timeslice.value < decongestion->lastTimeslice) {
635 LOGP(error, "We are trying to send an oldest possible timeslice {} that is older than the last one we already sent {}",
636 oldestPossibleOutput.timeslice.value, decongestion->lastTimeslice);
637 return;
638 }
639
640 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Broadcasting oldest possible output %" PRIu64 " due to %{public}s (%" PRIu64 ")",
641 (uint64_t)oldestPossibleOutput.timeslice.value,
642 oldestPossibleOutput.slot.index == -1 ? "channel" : "slot",
643 (uint64_t)(oldestPossibleOutput.slot.index == -1 ? oldestPossibleOutput.channel.value : oldestPossibleOutput.slot.index));
644 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Ordered active %d", decongestion->orderedCompletionPolicyActive);
645 if (decongestion->orderedCompletionPolicyActive) {
646 auto oldNextTimeslice = decongestion->nextTimeslice;
647 decongestion->nextTimeslice = std::max(decongestion->nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
648 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Next timeslice %" PRIi64, decongestion->nextTimeslice);
649 if (oldNextTimeslice != decongestion->nextTimeslice) {
650 auto& state = ctx.services().get<DeviceState>();
651 if (state.transitionHandling != TransitionHandlingState::NoTransition && DefaultsHelpers::onlineDeploymentMode()) {
652 O2_SIGNPOST_EVENT_EMIT_WARN(data_processor_context, cid, "oldest_possible_timeslice", "Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
653 } else {
654 O2_SIGNPOST_EVENT_EMIT_CRITICAL(data_processor_context, cid, "oldest_possible_timeslice", "Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
655 }
656 timesliceIndex.rescan();
657 }
658 }
659 DataProcessingHelpers::broadcastOldestPossibleTimeslice(ctx.services(), oldestPossibleOutput.timeslice.value);
660
661 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
662 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
663 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
664 // TODO: we could cache this in the proxy at bind time.
665 if (info.channelType != ChannelAccountingType::DPL) {
666 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Skipping channel %{public}s", info.name.c_str());
667 continue;
668 }
669 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ctx.services(), info, state, oldestPossibleOutput.timeslice.value)) {
670 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice",
671 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64 ", priority %d",
672 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
673 }
674 }
675 decongestion->lastTimeslice = oldestPossibleOutput.timeslice.value; },
676 .stop = [](ServiceRegistryRef services, void* service) {
677 auto* decongestion = (DecongestionService*)service;
678 services.get<TimesliceIndex>().reset();
679 decongestion->nextEnumerationTimeslice = 0;
680 decongestion->nextEnumerationTimesliceRewinded = false;
681 decongestion->lastTimeslice = 0;
682 decongestion->nextTimeslice = 0;
683 decongestion->oldestPossibleTimesliceTask = {0};
684 auto &state = services.get<DeviceState>();
685 for (auto &channel : state.inputChannelInfos) {
686 channel.oldestForChannel = {0};
687 } },
688 .domainInfoUpdated = [](ServiceRegistryRef services, size_t oldestPossibleTimeslice, ChannelIndex channel) {
689 auto& decongestion = services.get<DecongestionService>();
690 auto& relayer = services.get<DataRelayer>();
691 auto& timesliceIndex = services.get<TimesliceIndex>();
692 O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, &decongestion);
693 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Received oldest possible timeframe %" PRIu64 " from channel %d",
694 (uint64_t)oldestPossibleTimeslice, channel.value);
695 relayer.setOldestPossibleInput({oldestPossibleTimeslice}, channel);
696 timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
697 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
698
699 if (oldestPossibleOutput.timeslice.value == decongestion.lastTimeslice) {
700 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Synchronous: Not sending already sent value: %" PRIu64, (uint64_t)oldestPossibleOutput.timeslice.value);
701 return;
702 }
703 if (oldestPossibleOutput.timeslice.value < decongestion.lastTimeslice) {
704 LOGP(error, "We are trying to send an oldest possible timeslice {} that is older than the last one we sent {}",
705 oldestPossibleOutput.timeslice.value, decongestion.lastTimeslice);
706 return;
707 }
708 auto& queue = services.get<AsyncQueue>();
709 const auto& state = services.get<DeviceState>();
712 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Queueing oldest possible timeslice %" PRIu64 " propagation for execution.",
713 (uint64_t)oldestPossibleOutput.timeslice.value);
714 AsyncQueueHelpers::post(
715 queue, AsyncTask{ .timeslice = TimesliceId{oldestPossibleTimeslice},
716 .id = decongestion.oldestPossibleTimesliceTask,
717 .debounce = -1, .callback = decongestionCallback}
718 .user<DecongestionContext>(DecongestionContext{.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
719
720 if (decongestion.orderedCompletionPolicyActive) {
721 AsyncQueueHelpers::post(
722 queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleOutput.timeslice.value},.id = decongestion.oldestPossibleTimesliceTask, .debounce = -1,
723 .callback = decongestionCallbackOrdered}
724 .user<DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
725 } },
726 .kind = ServiceKind::Serial};
727}
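// Illustration (not part of the original file): isFirstInTopology remains true only for devices
// without any "real data" input (Timeframe, QA, Sporadic, Optional), e.g. a timer-driven source
// sketched here with made-up names:
//
//   DataProcessorSpec{"my-source",
//                     {InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}},
//                     {OutputSpec{"TST", "DATA", 0, Lifetime::Timeframe}},
//                     AlgorithmSpec{/* ... */}};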
728
729// FIXME: allow configuring the default number of threads per device
730// This should probably be done by overriding the preFork
731// callback and using the boost program options there to
732// get the default number of threads.
733o2::framework::ServiceSpec CommonServices::threadPool(int numWorkers)
734{
735 return ServiceSpec{
736 .name = "threadpool",
737 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
738 auto* pool = new ThreadPool();
739 // FIXME: this will require some extra argument for the configuration context of a service
740 pool->poolSize = 1;
741 return ServiceHandle{TypeIdHelpers::uniqueId<ThreadPool>(), pool};
742 },
743 .configure = [](InitContext&, void* service) -> void* {
744 auto* t = reinterpret_cast<ThreadPool*>(service);
745 // FIXME: this will require some extra argument for the configuration context of a service
746 t->poolSize = 1;
747 return service;
748 },
749 .postForkParent = [](ServiceRegistryRef services) -> void {
750 // FIXME: this will require some extra argument for the configuration context of a service
751 auto numWorkersS = std::to_string(1);
752 setenv("UV_THREADPOOL_SIZE", numWorkersS.c_str(), 0);
753 },
754 .kind = ServiceKind::Serial};
755}
756
757namespace
758{
759auto sendRelayerMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) -> void
760{
761 // Update the timer to make sure we have the correct time when sending out the stats.
762 uv_update_time(registry.get<DeviceState>().loop);
763 // Derive the amount of shared memory used
764 auto& runningWorkflow = registry.get<RunningWorkflowInfo const>();
765 using namespace fair::mq::shmem;
766 auto& spec = registry.get<DeviceSpec const>();
767
768 // FIXME: Ugly, but we do it only every 5 seconds...
769 if (stats.hasAvailSHMMetric) {
770 auto device = registry.get<RawDeviceService>().device();
771 long freeMemory = -1;
772 try {
773 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(ShmId{makeShmIdStr(device->fConfig->GetProperty<uint64_t>("shmid"))}, runningWorkflow.shmSegmentId);
774 } catch (...) {
775 }
776 if (freeMemory == -1) {
777 try {
778 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(SessionId{device->fConfig->GetProperty<std::string>("session")}, runningWorkflow.shmSegmentId);
779 } catch (...) {
780 }
781 }
782 stats.updateStats({static_cast<unsigned short>(static_cast<int>(ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE) + (runningWorkflow.shmSegmentId % 512)), DataProcessingStats::Op::SetIfPositive, freeMemory});
783 }
784
785 auto device = registry.get<RawDeviceService>().device();
786
787 int64_t totalBytesIn = 0;
788 int64_t totalBytesOut = 0;
789
790 for (auto& channel : device->GetChannels()) {
791 totalBytesIn += channel.second[0].GetBytesRx();
792 totalBytesOut += channel.second[0].GetBytesTx();
793 }
794
795 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_BYTES_IN), DataProcessingStats::Op::Set, totalBytesIn / 1000000});
796 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_BYTES_OUT), DataProcessingStats::Op::Set, totalBytesOut / 1000000});
797
798 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_RATE_IN_MB_S), DataProcessingStats::Op::InstantaneousRate, totalBytesIn / 1000000});
799 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_RATE_OUT_MB_S), DataProcessingStats::Op::InstantaneousRate, totalBytesOut / 1000000});
800};
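// Illustration (not part of the original file): other framework code updates these counters
// through the same DataProcessingStats API, e.g. incrementing a counter by one (here reusing
// the malformed-inputs metric registered further below):
//
//   stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS),
//                      DataProcessingStats::Op::Add, 1});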
801
802auto flushStates(ServiceRegistryRef registry, DataProcessingStates& states) -> void
803{
804 if (!registry.get<DriverConfig const>().driverHasGUI) {
805 return;
806 }
807 states.flushChangedStates([&states, registry](std::string const& spec, int64_t timestamp, std::string_view value) mutable -> void {
808 auto& client = registry.get<ControlService>();
809 client.push(spec, value, timestamp);
810 });
811}
812
813O2_DECLARE_DYNAMIC_LOG(monitoring_service);
814
816auto flushMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) -> void
817{
818 // Flushing metrics should only happen on the main thread to avoid
819 // needing a mutex for the communication with the driver.
820 O2_SIGNPOST_ID_GENERATE(sid, monitoring_service);
821 O2_SIGNPOST_START(monitoring_service, sid, "flush", "flushing metrics");
822 if (registry.isMainThread() == false) {
823 LOGP(fatal, "Flushing metrics should only happen on the main thread.");
824 }
825 auto& monitoring = registry.get<Monitoring>();
826 auto& relayer = registry.get<DataRelayer>();
827
828 // Send all the relevant metrics for the relayer to update the GUI
829 stats.flushChangedMetrics([&monitoring](DataProcessingStats::MetricSpec const& spec, int64_t timestamp, int64_t value) mutable -> void {
830 // convert timestamp to a time_point
831 auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(std::chrono::milliseconds(timestamp));
832 auto metric = o2::monitoring::Metric{spec.name, Metric::DefaultVerbosity, tp};
833 if (spec.kind == DataProcessingStats::Kind::UInt64) {
834 if (value < 0) {
835 LOG(debug) << "Value for " << spec.name << " is negative, setting to 0";
836 value = 0;
837 }
838 metric.addValue((uint64_t)value, "value");
839 } else {
840 if (value > (int64_t)std::numeric_limits<int>::max()) {
841 LOG(warning) << "Value for " << spec.name << " is too large, setting to INT_MAX";
842 value = (int64_t)std::numeric_limits<int>::max();
843 }
844 if (value < (int64_t)std::numeric_limits<int>::min()) {
845 value = (int64_t)std::numeric_limits<int>::min();
846 LOG(warning) << "Value for " << spec.name << " is too small, setting to INT_MIN";
847 }
848 metric.addValue((int)value, "value");
849 }
850 if (spec.scope == DataProcessingStats::Scope::DPL) {
851 metric.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL);
852 }
853 monitoring.send(std::move(metric));
854 });
855 relayer.sendContextState();
856 monitoring.flushBuffer();
857 O2_SIGNPOST_END(monitoring_service, sid, "flush", "done flushing metrics");
858};
859} // namespace
860
861o2::framework::ServiceSpec CommonServices::dataProcessingStats()
862{
863 return ServiceSpec{
864 .name = "data-processing-stats",
865 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
866 timespec now;
867 clock_gettime(CLOCK_REALTIME, &now);
868 uv_update_time(state.loop);
869 uint64_t offset = now.tv_sec * 1000 - uv_now(state.loop);
871 .minOnlinePublishInterval = std::stoi(options.GetProperty<std::string>("dpl-stats-min-online-publishing-interval").c_str()) * 1000};
874 config);
875 auto& runningWorkflow = services.get<RunningWorkflowInfo const>();
876
877 // It makes no sense to update the stats more often than every 5s
878 int quickUpdateInterval = 5000;
879 uint64_t quickRefreshInterval = 7000;
880 uint64_t onlineRefreshLatency = 60000; // For metrics which are reported online, we flush them every 60s regardless of their state.
881 using MetricSpec = DataProcessingStats::MetricSpec;
882 using Kind = DataProcessingStats::Kind;
883 using Scope = DataProcessingStats::Scope;
884
885#ifdef NDEBUG
886 bool enableDebugMetrics = false;
887#else
888 bool enableDebugMetrics = true;
889#endif
890 bool arrowAndResourceLimitingMetrics = false;
892 arrowAndResourceLimitingMetrics = true;
893 }
894
895 int64_t consumedTimeframesPublishInterval = 0;
897 consumedTimeframesPublishInterval = 5000;
898 }
899 // Input proxies should not report cpu_usage_fraction,
900 // because of the rate limiting which biases the measurement.
901 auto& spec = services.get<DeviceSpec const>();
902 bool enableCPUUsageFraction = true;
903 auto isProxy = [](DataProcessorLabel const& label) -> bool { return label == DataProcessorLabel{"input-proxy"}; };
904 if (std::find_if(spec.labels.begin(), spec.labels.end(), isProxy) != spec.labels.end()) {
905 O2_SIGNPOST_ID_GENERATE(mid, policies);
906 O2_SIGNPOST_EVENT_EMIT(policies, mid, "metrics", "Disabling cpu_usage_fraction metric for proxy %{public}s", spec.name.c_str());
907 enableCPUUsageFraction = false;
908 }
909
910 std::vector<DataProcessingStats::MetricSpec> metrics = {
911 MetricSpec{.name = "errors",
913 .kind = Kind::UInt64,
914 .scope = Scope::Online,
915 .minPublishInterval = quickUpdateInterval,
916 .maxRefreshLatency = quickRefreshInterval},
917 MetricSpec{.name = "exceptions",
919 .kind = Kind::UInt64,
920 .scope = Scope::Online,
921 .minPublishInterval = quickUpdateInterval},
922 MetricSpec{.name = "inputs/relayed/pending",
924 .kind = Kind::UInt64,
925 .minPublishInterval = quickUpdateInterval},
926 MetricSpec{.name = "inputs/relayed/incomplete",
928 .kind = Kind::UInt64,
929 .minPublishInterval = quickUpdateInterval},
930 MetricSpec{.name = "inputs/relayed/total",
932 .kind = Kind::UInt64,
933 .minPublishInterval = quickUpdateInterval},
934 MetricSpec{.name = "elapsed_time_ms",
936 .kind = Kind::UInt64,
937 .minPublishInterval = quickUpdateInterval},
938 MetricSpec{.name = "total_wall_time_ms",
940 .kind = Kind::UInt64,
941 .minPublishInterval = quickUpdateInterval},
942 MetricSpec{.name = "last_processed_input_size_byte",
944 .kind = Kind::UInt64,
945 .minPublishInterval = quickUpdateInterval},
946 MetricSpec{.name = "total_processed_input_size_byte",
948 .kind = Kind::UInt64,
949 .scope = Scope::Online,
950 .minPublishInterval = quickUpdateInterval},
951 MetricSpec{.name = "total_sigusr1",
953 .kind = Kind::UInt64,
954 .minPublishInterval = quickUpdateInterval},
955 MetricSpec{.name = "consumed-timeframes",
957 .kind = Kind::UInt64,
958 .minPublishInterval = consumedTimeframesPublishInterval,
959 .maxRefreshLatency = quickRefreshInterval,
960 .sendInitialValue = true},
961 MetricSpec{.name = "min_input_latency_ms",
963 .kind = Kind::UInt64,
964 .scope = Scope::Online,
965 .minPublishInterval = quickUpdateInterval},
966 MetricSpec{.name = "max_input_latency_ms",
968 .kind = Kind::UInt64,
969 .minPublishInterval = quickUpdateInterval},
970 MetricSpec{.name = "total_rate_in_mb_s",
972 .kind = Kind::Rate,
973 .scope = Scope::Online,
974 .minPublishInterval = quickUpdateInterval,
975 .maxRefreshLatency = onlineRefreshLatency,
976 .sendInitialValue = true},
977 MetricSpec{.name = "total_rate_out_mb_s",
979 .kind = Kind::Rate,
980 .scope = Scope::Online,
981 .minPublishInterval = quickUpdateInterval,
982 .maxRefreshLatency = onlineRefreshLatency,
983 .sendInitialValue = true},
984 MetricSpec{.name = "processing_rate_hz",
986 .kind = Kind::Rate,
987 .scope = Scope::Online,
988 .minPublishInterval = quickUpdateInterval,
989 .maxRefreshLatency = onlineRefreshLatency,
990 .sendInitialValue = true},
991 MetricSpec{.name = "cpu_usage_fraction",
992 .enabled = enableCPUUsageFraction,
994 .kind = Kind::Rate,
995 .scope = Scope::Online,
996 .minPublishInterval = quickUpdateInterval,
997 .maxRefreshLatency = onlineRefreshLatency,
998 .sendInitialValue = true},
999 MetricSpec{.name = "performed_computations",
1001 .kind = Kind::UInt64,
1002 .scope = Scope::Online,
1003 .minPublishInterval = quickUpdateInterval,
1004 .maxRefreshLatency = onlineRefreshLatency,
1005 .sendInitialValue = true},
1006 MetricSpec{.name = "total_bytes_in",
1008 .kind = Kind::UInt64,
1009 .scope = Scope::Online,
1010 .minPublishInterval = quickUpdateInterval,
1011 .maxRefreshLatency = onlineRefreshLatency,
1012 .sendInitialValue = true},
1013 MetricSpec{.name = "total_bytes_out",
1015 .kind = Kind::UInt64,
1016 .scope = Scope::Online,
1017 .minPublishInterval = quickUpdateInterval,
1018 .maxRefreshLatency = onlineRefreshLatency,
1019 .sendInitialValue = true},
1020 MetricSpec{.name = fmt::format("available_managed_shm_{}", runningWorkflow.shmSegmentId),
1021 .metricId = (int)ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE + (runningWorkflow.shmSegmentId % 512),
1022 .kind = Kind::UInt64,
1023 .scope = Scope::Online,
1024 .minPublishInterval = 500,
1025 .maxRefreshLatency = onlineRefreshLatency,
1026 .sendInitialValue = true},
1027 MetricSpec{.name = "malformed_inputs", .metricId = static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1028 MetricSpec{.name = "dropped_computations", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1029 MetricSpec{.name = "dropped_incoming_messages", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1030 MetricSpec{.name = "relayed_messages", .metricId = static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1031 MetricSpec{.name = "arrow-bytes-destroyed",
1032 .enabled = arrowAndResourceLimitingMetrics,
1033 .metricId = static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED),
1034 .kind = Kind::UInt64,
1035 .scope = Scope::DPL,
1036 .minPublishInterval = 0,
1037 .maxRefreshLatency = 10000,
1038 .sendInitialValue = true},
1039 MetricSpec{.name = "arrow-messages-destroyed",
1040 .enabled = arrowAndResourceLimitingMetrics,
1041 .metricId = static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED),
1042 .kind = Kind::UInt64,
1043 .scope = Scope::DPL,
1044 .minPublishInterval = 0,
1045 .maxRefreshLatency = 10000,
1046 .sendInitialValue = true},
1047 MetricSpec{.name = "arrow-bytes-created",
1048 .enabled = arrowAndResourceLimitingMetrics,
1049 .metricId = static_cast<short>(ProcessingStatsId::ARROW_BYTES_CREATED),
1050 .kind = Kind::UInt64,
1051 .scope = Scope::DPL,
1052 .minPublishInterval = 0,
1053 .maxRefreshLatency = 10000,
1054 .sendInitialValue = true},
1055 MetricSpec{.name = "arrow-messages-created",
1056 .enabled = arrowAndResourceLimitingMetrics,
1057 .metricId = static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_CREATED),
1058 .kind = Kind::UInt64,
1059 .scope = Scope::DPL,
1060 .minPublishInterval = 0,
1061 .maxRefreshLatency = 10000,
1062 .sendInitialValue = true},
1063 MetricSpec{.name = "arrow-bytes-expired",
1064 .enabled = arrowAndResourceLimitingMetrics,
1065 .metricId = static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED),
1066 .kind = Kind::UInt64,
1067 .scope = Scope::DPL,
1068 .minPublishInterval = 0,
1069 .maxRefreshLatency = 10000,
1070 .sendInitialValue = true},
1071 MetricSpec{.name = "shm-offer-bytes-consumed",
1072 .enabled = arrowAndResourceLimitingMetrics,
1073 .metricId = static_cast<short>(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED),
1074 .kind = Kind::UInt64,
1075 .scope = Scope::DPL,
1076 .minPublishInterval = 0,
1077 .maxRefreshLatency = 10000,
1078 .sendInitialValue = true},
1079 MetricSpec{.name = "resources-missing",
1080 .enabled = enableDebugMetrics,
1081 .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_MISSING),
1082 .kind = Kind::UInt64,
1083 .scope = Scope::DPL,
1084 .minPublishInterval = 1000,
1085 .maxRefreshLatency = 1000,
1086 .sendInitialValue = true},
1087 MetricSpec{.name = "resources-insufficient",
1088 .enabled = enableDebugMetrics,
1089 .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_INSUFFICIENT),
1090 .kind = Kind::UInt64,
1091 .scope = Scope::DPL,
1092 .minPublishInterval = 1000,
1093 .maxRefreshLatency = 1000,
1094 .sendInitialValue = true},
1095 MetricSpec{.name = "resources-satisfactory",
1096 .enabled = enableDebugMetrics,
1097 .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_SATISFACTORY),
1098 .kind = Kind::UInt64,
1099 .scope = Scope::DPL,
1100 .minPublishInterval = 1000,
1101 .maxRefreshLatency = 1000,
1102 .sendInitialValue = true},
1103 MetricSpec{.name = "resource-offer-expired",
1104 .enabled = arrowAndResourceLimitingMetrics,
1105 .metricId = static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED),
1106 .kind = Kind::UInt64,
1107 .scope = Scope::DPL,
1108 .minPublishInterval = 0,
1109 .maxRefreshLatency = 10000,
1110 .sendInitialValue = true}};
1111
1112 for (auto& metric : metrics) {
1113 if (metric.metricId == (int)ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE + (runningWorkflow.shmSegmentId % 512)) {
1114 if (spec.name.compare("readout-proxy") == 0) {
1115 stats->hasAvailSHMMetric = true;
1116 } else {
1117 continue;
1118 }
1119 }
1120 stats->registerMetric(metric);
1121 }
1122
1123 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStats>(), stats};
1124 },
1125 .configure = noConfiguration(),
1126 .postProcessing = [](ProcessingContext& context, void* service) {
1127 auto* stats = (DataProcessingStats*)service;
1128 stats->processCommandQueue(); },
1129 .preDangling = [](DanglingContext& context, void* service) {
1130 auto* stats = (DataProcessingStats*)service;
1131 sendRelayerMetrics(context.services(), *stats);
1132 flushMetrics(context.services(), *stats); },
1133 .postDangling = [](DanglingContext& context, void* service) {
1134 auto* stats = (DataProcessingStats*)service;
1135 sendRelayerMetrics(context.services(), *stats);
1136 flushMetrics(context.services(), *stats); },
1137 .preEOS = [](EndOfStreamContext& context, void* service) {
1138 auto* stats = (DataProcessingStats*)service;
1139 sendRelayerMetrics(context.services(), *stats);
1140 flushMetrics(context.services(), *stats); },
1141 .preLoop = [](ServiceRegistryRef ref, void* service) {
1142 auto* stats = (DataProcessingStats*)service;
1143 flushMetrics(ref, *stats); },
1144 .kind = ServiceKind::Serial};
1145}
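// Illustration (not part of the original file): further metrics can be registered with the same
// pattern used in the init callback above; the name and publish interval here are made up and
// the metric id would have to be a free ProcessingStatsId entry:
//
//   stats->registerMetric(DataProcessingStats::MetricSpec{.name = "my_counter",
//                                                         .metricId = /* free ProcessingStatsId */,
//                                                         .kind = DataProcessingStats::Kind::UInt64,
//                                                         .minPublishInterval = 5000});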
1146
1147// This is similar to dataProcessingStats, but it is designed to synchronize
1148// history-less metrics which are e.g. used for the GUI.
1149o2::framework::ServiceSpec CommonServices::dataProcessingStates()
1150{
1151 return ServiceSpec{
1152 .name = "data-processing-states",
1153 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
1154 timespec now;
1155 clock_gettime(CLOCK_REALTIME, &now);
1156 uv_update_time(state.loop);
1157 uint64_t offset = now.tv_sec * 1000 - uv_now(state.loop);
1160 states->registerState({"dummy_state", (short)ProcessingStateId::DUMMY_STATE});
1161 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStates>(), states};
1162 },
1163 .configure = noConfiguration(),
1164 .postProcessing = [](ProcessingContext& context, void* service) {
1165 auto* states = (DataProcessingStates*)service;
1166 states->processCommandQueue(); },
1167 .preDangling = [](DanglingContext& context, void* service) {
1168 auto* states = (DataProcessingStates*)service;
1169 flushStates(context.services(), *states); },
1170 .postDangling = [](DanglingContext& context, void* service) {
1171 auto* states = (DataProcessingStates*)service;
1172 flushStates(context.services(), *states); },
1173 .preEOS = [](EndOfStreamContext& context, void* service) {
1174 auto* states = (DataProcessingStates*)service;
1175 flushStates(context.services(), *states); },
1176 .kind = ServiceKind::Global};
1177}
1178
1179struct GUIMetrics {
1180};
1181
1182o2::framework::ServiceSpec CommonServices::guiMetricsSpec()
1183{
1184 return ServiceSpec{
1185 .name = "gui-metrics",
1186 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
1187 auto* stats = new GUIMetrics();
1188 auto& monitoring = services.get<Monitoring>();
1189 auto& spec = services.get<DeviceSpec const>();
1190 monitoring.send({(int)spec.inputChannels.size(), fmt::format("oldest_possible_timeslice/h"), o2::monitoring::Verbosity::Debug});
1191 monitoring.send({(int)1, fmt::format("oldest_possible_timeslice/w"), o2::monitoring::Verbosity::Debug});
1192 monitoring.send({(int)spec.outputChannels.size(), fmt::format("oldest_possible_output/h"), o2::monitoring::Verbosity::Debug});
1193 monitoring.send({(int)1, fmt::format("oldest_possible_output/w"), o2::monitoring::Verbosity::Debug});
1194 return ServiceHandle{TypeIdHelpers::uniqueId<GUIMetrics>(), stats};
1195 },
1196 .configure = noConfiguration(),
1197 .postProcessing = [](ProcessingContext& context, void* service) {
1198 auto& relayer = context.services().get<DataRelayer>();
1199 auto& monitoring = context.services().get<Monitoring>();
1200 auto& spec = context.services().get<DeviceSpec const>();
1201 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
1202 for (size_t ci = 0; ci < spec.outputChannels.size(); ++ci) {
1203 monitoring.send({(uint64_t)oldestPossibleOutput.timeslice.value, fmt::format("oldest_possible_output/{}", ci), o2::monitoring::Verbosity::Debug});
1204 } },
1205 .domainInfoUpdated = [](ServiceRegistryRef registry, size_t timeslice, ChannelIndex channel) {
1206 auto& monitoring = registry.get<Monitoring>();
1207 monitoring.send({(uint64_t)timeslice, fmt::format("oldest_possible_timeslice/{}", channel.value), o2::monitoring::Verbosity::Debug}); },
1208 .active = false,
1209 .kind = ServiceKind::Serial};
1210}
1211
1212o2::framework::ServiceSpec CommonServices::objectCache()
1213{
1214 return ServiceSpec{
1215 .name = "object-cache",
1216 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1217 auto* cache = new ObjectCache();
1218 return ServiceHandle{TypeIdHelpers::uniqueId<ObjectCache>(), cache};
1219 },
1220 .configure = noConfiguration(),
1221 .kind = ServiceKind::Serial};
1222}
1223
1224o2::framework::ServiceSpec CommonServices::dataProcessorContextSpec()
1225{
1226 return ServiceSpec{
1227 .name = "data-processing-context",
1228 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1229 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessorContext>(), new DataProcessorContext()};
1230 },
1231 .configure = noConfiguration(),
1232 .exit = [](ServiceRegistryRef, void* service) { auto* context = (DataProcessorContext*)service; delete context; },
1233 .kind = ServiceKind::Serial};
1234}
1235
1236o2::framework::ServiceSpec CommonServices::deviceContextSpec()
1237{
1238 return ServiceSpec{
1239 .name = "device-context",
1240 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1241 return ServiceHandle{TypeIdHelpers::uniqueId<DeviceContext>(), new DeviceContext()};
1242 },
1243 .configure = noConfiguration(),
1244 .kind = ServiceKind::Serial};
1245}
1246
1247o2::framework::ServiceSpec CommonServices::dataAllocatorSpec()
1248{
1249 return ServiceSpec{
1250 .name = "data-allocator",
1251 .uniqueId = simpleServiceId<DataAllocator>(),
1252 .init = [](ServiceRegistryRef ref, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1253 return ServiceHandle{
1254 .hash = TypeIdHelpers::uniqueId<DataAllocator>(),
1255 .instance = new DataAllocator(ref),
1256 .kind = ServiceKind::Stream,
1257 .name = "data-allocator",
1258 };
1259 },
1260 .configure = noConfiguration(),
1261 .kind = ServiceKind::Stream};
1262}
1263
1265std::vector<ServiceSpec> CommonServices::defaultServices(std::string extraPlugins, int numThreads)
1266{
1267 std::vector<ServiceSpec> specs{
1271 asyncQueue(),
1278 controlSpec(),
1279 rootFileSpec(),
1280 parallelSpec(),
1281 callbacksSpec(),
1284 dataRelayer(),
1286 dataSender(),
1287 objectCache(),
1288 ccdbSupportSpec()};
1289
1291 specs.push_back(ArrowSupport::arrowBackendSpec());
1292 }
1295 specs.push_back(decongestionSpec());
1296
1297 std::string loadableServicesStr = extraPlugins;
1298 // Do not load InfoLogger by default if we are not at P2.
1300 if (loadableServicesStr.empty() == false) {
1301 loadableServicesStr += ",";
1302 }
1303 loadableServicesStr += "O2FrameworkDataTakingSupport:InfoLoggerContext,O2FrameworkDataTakingSupport:InfoLogger";
1304 }
1305 // Load plugins depending on the environment
1306 std::vector<LoadablePlugin> loadablePlugins = {};
1307 char* loadableServicesEnv = getenv("DPL_LOAD_SERVICES");
1308 // The string defining which services to load has the format:
1309 //
1310 // library1:name1,library2:name2,...
1311 if (loadableServicesEnv) {
1312 if (loadableServicesStr.empty() == false) {
1313 loadableServicesStr += ",";
1314 }
1315 loadableServicesStr += loadableServicesEnv;
1316 }
1317 loadablePlugins = PluginManager::parsePluginSpecString(loadableServicesStr.c_str());
1318 PluginManager::loadFromPlugin<ServiceSpec, ServicePlugin>(loadablePlugins, specs);
1319 // I should make it optional depending on whether the GUI is there or not...
1320 specs.push_back(CommonServices::guiMetricsSpec());
1321 if (numThreads) {
1322 specs.push_back(threadPool(numThreads));
1323 }
1324 return specs;
1325}
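// Illustration (not part of the original file): callers can extend the default service set either
// programmatically or via the DPL_LOAD_SERVICES environment variable parsed above
// ("libMyLib:MyService" is a hypothetical plugin declaration):
//
//   auto specs = CommonServices::defaultServices("libMyLib:MyService", /*numThreads=*/1);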
1326
1327std::vector<ServiceSpec> CommonServices::arrowServices()
1328{
1329 return {
1330 ArrowSupport::arrowTableSlicingCacheDefSpec(),
1331 ArrowSupport::arrowTableSlicingCacheSpec(),
1332 };
1333}
1334
1335} // namespace o2::framework