25#include "Framework/Signpost.h"
32#include "InputRouteHelpers.h"
36#include "Framework/Tracing.h"
46#include "Framework/Signpost.h"
48#include "TextDriverClient.h"
49#include "WSDriverClient.h"
50#include "HTTPParser.h"
51#include "../src/DataProcessingStatus.h"
52#include "DecongestionService.h"
53#include "ArrowSupport.h"
55#include "Headers/STFHeader.h"
56#include "Headers/DataHeader.h"
58#include <Configuration/ConfigurationInterface.h>
59#include <Configuration/ConfigurationFactory.h>
60#include <Monitoring/MonitoringFactory.h>
61#include "Framework/Signpost.h"
63#include <fairmq/Device.h>
64#include <fairmq/shmem/Monitor.h>
65#include <fairmq/shmem/Common.h>
66#include <fairmq/ProgOptions.h>
67#include <uv.h>
69#include <cstdlib>
70#include <cstring>
72using o2::configuration::ConfigurationFactory;
73using o2::configuration::ConfigurationInterface;
74using o2::monitoring::Monitoring;
75using o2::monitoring::MonitoringFactory;
76using Metric = o2::monitoring::Metric;
77using Key = o2::monitoring::tags::Key;
78using Value = o2::monitoring::tags::Value;
85namespace o2::framework
91 return ServiceSpec{
92 .name = "monitoring",
93 .init = [](ServiceRegistryRef registry, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
94 void* service = nullptr;
95 bool isWebsocket = strncmp(options.GetPropertyAsString("driver-client-backend").c_str(), "ws://", 4) == 0;
96 bool isDefault = options.GetPropertyAsString("monitoring-backend") == "default";
97 bool useDPL = (isWebsocket && isDefault) || options.GetPropertyAsString("monitoring-backend") == "dpl://";
98 o2::monitoring::Monitoring* monitoring;
99 if (useDPL) {
100 monitoring = new Monitoring();
101 auto dplBackend = std::make_unique<DPLMonitoringBackend>(registry);
102 (dynamic_cast<o2::monitoring::Backend*>(dplBackend.get()))->setVerbosity(o2::monitoring::Verbosity::Debug);
103 monitoring->addBackend(std::move(dplBackend));
104 } else {
105 auto backend = isDefault ? "infologger://" : options.GetPropertyAsString("monitoring-backend");
106 monitoring = MonitoringFactory::Get(backend).release();
107 }
108 service = monitoring;
109 monitoring->enableBuffering(MONITORING_QUEUE_SIZE);
110 assert(registry.get<DeviceSpec const>().name.empty() == false);
111 monitoring->addGlobalTag("pipeline_id", std::to_string(registry.get<DeviceSpec const>().inputTimesliceId));
112 monitoring->addGlobalTag("dataprocessor_name", registry.get<DeviceSpec const>().name);
113 monitoring->addGlobalTag("dpl_instance", options.GetPropertyAsString("shm-segment-id"));
114 return ServiceHandle{TypeIdHelpers::uniqueId<Monitoring>(), service};
115 },
116 .configure = noConfiguration(),
117 .start = [](ServiceRegistryRef services, void* service) {
118 auto* monitoring = (o2::monitoring::Monitoring*)service;
120 auto extRunNumber = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("runNumber", "unspecified");
121 if (extRunNumber == "unspecified") {
122 return;
123 }
124 try {
125 monitoring->setRunNumber(std::stoul(extRunNumber));
126 } catch (...) {
127 } },
128 .exit = [](ServiceRegistryRef registry, void* service) {
129 auto* monitoring = reinterpret_cast<Monitoring*>(service);
130 monitoring->flushBuffer();
131 delete monitoring; },
132 .kind = ServiceKind::Serial};
// An asynchronous service that executes actions at the end of the data processing
 // "async-queue": Serial service holding the device AsyncQueue, created via simpleServiceInit.
 return ServiceSpec{
 .name = "async-queue",
 .init = simpleServiceInit<AsyncQueue, AsyncQueue>(),
 .configure = noConfiguration(),
 // On stop the queue is only looked up from the registry.
 // NOTE(review): `queue` is unused, so this callback currently does nothing
 // beyond the registry lookup; it presumably was meant to reset the queue
 // on stop — confirm against AsyncQueueHelpers.
 .stop = [](ServiceRegistryRef services, void* service) {
 auto& queue = services.get<AsyncQueue>();
 },
 .kind = ServiceKind::Serial};
149// Make it a service so that it can be used easily from the analysis
// FIXME: Moreover, it would make sense to duplicate this on a per-thread
// basis when we get to it.
154 return ServiceSpec{
155 .name = "timing-info",
156 .uniqueId = simpleServiceId<TimingInfo>(),
157 .init = simpleServiceInit<TimingInfo, TimingInfo, ServiceKind::Stream>(),
158 .configure = noConfiguration(),
159 .kind = ServiceKind::Stream};
164 return ServiceSpec{
165 .name = "stream-context",
166 .uniqueId = simpleServiceId<StreamContext>(),
167 .init = simpleServiceInit<StreamContext, StreamContext, ServiceKind::Stream>(),
168 .configure = noConfiguration(),
169 .preProcessing = [](ProcessingContext& context, void* service) {
170 auto* stream = (StreamContext*)service;
171 auto& routes =<DeviceSpec const>().outputs;
172 // Notice I need to do this here, because different invocation for
173 // the same stream might be referring to different data processors.
174 // We should probably have a context which is per stream of a specific
175 // data processor.
176 stream->routeDPLCreated.resize(routes.size());
177 stream->routeCreated.resize(routes.size());
178 // Reset the routeDPLCreated at every processing step
179 std::fill(stream->routeDPLCreated.begin(), stream->routeDPLCreated.end(), false);
180 std::fill(stream->routeCreated.begin(), stream->routeCreated.end(), false); },
181 .postProcessing = [](ProcessingContext& processingContext, void* service) {
182 auto* stream = (StreamContext*)service;
183 auto& routes =<DeviceSpec const>().outputs;
184 auto& timeslice =<TimingInfo>().timeslice;
185 auto& messageContext =<MessageContext>();
186 // Check if we never created any data for this timeslice
187 // if we did not, but we still have didDispatched set to true
188 // it means it was created out of band.
189 bool userDidCreate = false;
190 O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, service);
191 for (size_t ri = 0; ri < routes.size(); ++ri) {
192 if (stream->routeCreated[ri] == true && stream->routeDPLCreated[ri] == false) {
193 userDidCreate = true;
194 break;
195 }
196 }
197 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "userDidCreate == %d && didDispatch == %d",
198 userDidCreate,
199 messageContext.didDispatch());
200 if (userDidCreate == false && messageContext.didDispatch() == true) {
201 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Data created out of band userDidCreate == %d && messageContext.didDispatch == %d",
202 userDidCreate,
203 messageContext.didDispatch());
204 return;
205 }
206 if (userDidCreate == false && messageContext.didDispatch() == false) {
207 O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, service);
208 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "No data created.");
209 return;
210 }
211 for (size_t ri = 0; ri < routes.size(); ++ri) {
212 auto &route = routes[ri];
213 auto &matcher = route.matcher;
214 if (stream->routeDPLCreated[ri] == true) {
215 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Data created by DPL. ri = %" PRIu64 ", %{public}s",
216 (uint64_t)ri, DataSpecUtils::describe(matcher).c_str());
217 continue;
218 }
219 if (stream->routeCreated[ri] == true) {
220 continue;
221 } if ((timeslice % route.maxTimeslices) != route.timeslice) {
222 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Route ri = %" PRIu64 ", skipped because of pipelining.",
223 (uint64_t)ri);
224 continue;
225 }
226 if (matcher.lifetime == Lifetime::Timeframe) {
227 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks",
228 "Expected Lifetime::Timeframe data %{public}s was not created for timeslice %" PRIu64 " and might result in dropped timeframes",
229 DataSpecUtils::describe(matcher).c_str(), (uint64_t)timeslice);
230 LOGP(error, "Expected Lifetime::Timeframe data {} was not created for timeslice {} and might result in dropped timeframes", DataSpecUtils::describe(matcher), timeslice);
231 }
232 } },
233 .preEOS = [](EndOfStreamContext& context, void* service) {
234 // We need to reset the routeDPLCreated / routeCreated because the end of stream
235 // uses a different context which does not know about the routes.
236 // FIXME: This should be fixed in a different way, but for now it will
237 // allow TPC IDC to work.
238 auto* stream = (StreamContext*)service;
239 auto& routes =<DeviceSpec const>().outputs;
240 // Notice I need to do this here, because different invocation for
241 // the same stream might be referring to different data processors.
242 // We should probably have a context which is per stream of a specific
243 // data processor.
244 stream->routeDPLCreated.resize(routes.size());
245 stream->routeCreated.resize(routes.size());
246 // Reset the routeCreated / routeDPLCreated at every processing step
247 std::fill(stream->routeCreated.begin(), stream->routeCreated.end(), false);
248 std::fill(stream->routeDPLCreated.begin(), stream->routeDPLCreated.end(), false); },
249 .kind = ServiceKind::Stream};
254 return ServiceSpec{
255 .name = "datataking-contex",
256 .uniqueId = simpleServiceId<DataTakingContext>(),
257 .init = simpleServiceInit<DataTakingContext, DataTakingContext, ServiceKind::Stream>(),
258 .configure = noConfiguration(),
259 .preProcessing = [](ProcessingContext& processingContext, void* service) {
260 auto& context =<DataTakingContext>();
261 for (auto const& ref : processingContext.inputs()) {
262 const o2::framework::DataProcessingHeader *dph = o2::header::get<DataProcessingHeader*>(ref.header);
263 const auto* dh = o2::header::get<o2::header::DataHeader*>(ref.header);
264 if (!dph || !dh) {
265 continue;
266 }
267 context.runNumber = fmt::format("{}", dh->runNumber);
268 break;
269 } },
270 // Notice this will be executed only once, because the service is declared upfront.
271 .start = [](ServiceRegistryRef services, void* service) {
272 auto& context = services.get<DataTakingContext>();
276 auto extRunNumber = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("runNumber", "unspecified");
277 if (extRunNumber != "unspecified" || context.runNumber == "0") {
278 context.runNumber = extRunNumber;
279 }
280 auto extLHCPeriod = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("lhc_period", "unspecified");
281 if (extLHCPeriod != "unspecified") {
282 context.lhcPeriod = extLHCPeriod;
283 } else {
284 static const char* months[12] = {"JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"};
285 time_t now = time(nullptr);
286 auto ltm = gmtime(&now);
287 context.lhcPeriod = months[ltm->tm_mon];
288 LOG(info) << "LHCPeriod is not available, using current month " << context.lhcPeriod;
289 }
291 auto extRunType = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("run_type", "unspecified");
292 if (extRunType != "unspecified") {
293 context.runType = extRunType;
294 }
295 auto extEnvId = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("environment_id", "unspecified");
296 if (extEnvId != "unspecified") {
297 context.envId = extEnvId;
298 }
299 auto extDetectors = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("detectors", "unspecified");
300 if (extDetectors != "unspecified") {
301 context.detectors = extDetectors;
302 }
303 auto forcedRaw = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("force_run_as_raw", "false");
304 context.forcedRaw = forcedRaw == "true"; },
305 .kind = ServiceKind::Stream};
313 return ServiceSpec{
314 .name = "configuration",
315 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
316 auto backend = options.GetPropertyAsString("configuration");
317 if (backend == "command-line") {
318 return ServiceHandle{0, nullptr};
319 }
320 return ServiceHandle{TypeIdHelpers::uniqueId<ConfigurationInterface>(),
321 ConfigurationFactory::getConfiguration(backend).release()};
322 },
323 .configure = noConfiguration(),
324 .driverStartup = [](ServiceRegistryRef registry, DeviceConfig const& dc) {
325 if (dc.options.count("configuration") == 0) {
326 registry.registerService(ServiceHandle{0, nullptr});
327 return;
328 }
329 auto backend = dc.options["configuration"].as<std::string>();
330 registry.registerService(ServiceHandle{TypeIdHelpers::uniqueId<ConfigurationInterface>(),
331 ConfigurationFactory::getConfiguration(backend).release()}); },
332 .kind = ServiceKind::Global};
337 return ServiceSpec{
338 .name = "driverClient",
339 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
340 auto backend = options.GetPropertyAsString("driver-client-backend");
341 if (backend == "stdout://") {
342 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
343 new TextDriverClient(services, state)};
344 }
345 auto [ip, port] = o2::framework::parse_websocket_url(backend.c_str());
346 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
347 new WSDriverClient(services, ip.c_str(), port)};
348 },
349 .configure = noConfiguration(),
350 .kind = ServiceKind::Global};
355 return ServiceSpec{
356 .name = "control",
357 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
358 return ServiceHandle{TypeIdHelpers::uniqueId<ControlService>(),
359 new ControlService(services, state)};
360 },
361 .configure = noConfiguration(),
362 .kind = ServiceKind::Serial};
367 return ServiceSpec{
368 .name = "localrootfile",
369 .init = simpleServiceInit<LocalRootFileService, LocalRootFileService>(),
370 .configure = noConfiguration(),
371 .kind = ServiceKind::Serial};
376 return ServiceSpec{
377 .name = "parallel",
378 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
379 auto& spec = services.get<DeviceSpec const>();
380 return ServiceHandle{TypeIdHelpers::uniqueId<ParallelContext>(),
381 new ParallelContext(spec.rank, spec.nSlots)};
382 },
383 .configure = noConfiguration(),
384 .kind = ServiceKind::Serial};
389 return ServiceSpec{
390 .name = "timesliceindex",
391 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
392 auto& spec = services.get<DeviceSpec const>();
393 return ServiceHandle{TypeIdHelpers::uniqueId<TimesliceIndex>(),
394 new TimesliceIndex(InputRouteHelpers::maxLanes(spec.inputs), state.inputChannelInfos)};
395 },
396 .configure = noConfiguration(),
397 .kind = ServiceKind::Serial};
402 return ServiceSpec{
403 .name = "callbacks",
404 .init = simpleServiceInit<CallbackService, CallbackService>(),
405 .configure = noConfiguration(),
406 .kind = ServiceKind::Serial};
411 return ServiceSpec{
412 .name = "datarelayer",
413 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
414 auto& spec = services.get<DeviceSpec const>();
415 return ServiceHandle{TypeIdHelpers::uniqueId<DataRelayer>(),
416 new DataRelayer(spec.completionPolicy,
417 spec.inputs,
418 services.get<TimesliceIndex>(),
419 services)};
420 },
421 .configure = noConfiguration(),
422 .kind = ServiceKind::Serial};
427 return ServiceSpec{
428 .name = "datasender",
429 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
430 return ServiceHandle{TypeIdHelpers::uniqueId<DataSender>(),
431 new DataSender(services)};
432 },
433 .configure = noConfiguration(),
434 .preProcessing = [](ProcessingContext&, void* service) {
435 auto& dataSender = *reinterpret_cast<DataSender*>(service);
436 dataSender.reset(); },
437 .postDispatching = [](ProcessingContext& ctx, void* service) {
438 auto& dataSender = *reinterpret_cast<DataSender*>(service);
439 // If the quit was requested, the post dispatching can still happen
440 // but with an empty set of data.
441 if (<DeviceState>().quitRequested == false) {
442 dataSender.verifyMissingSporadic();
443 } },
453 return ServiceSpec{
454 .name = "tracing",
455 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
456 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<TracingInfrastructure>(),
457 .instance = new TracingInfrastructure(),
458 .kind = ServiceKind::Serial};
459 },
460 .configure = noConfiguration(),
461 .preProcessing = [](ProcessingContext&, void* service) {
462 auto* t = reinterpret_cast<TracingInfrastructure*>(service);
463 t->processingCount += 1; },
464 .postProcessing = [](ProcessingContext&, void* service) {
465 auto* t = reinterpret_cast<TracingInfrastructure*>(service);
466 t->processingCount += 1; },
467 .kind = ServiceKind::Serial};
473// CCDB Support service
476 return ServiceSpec{
477 .name = "ccdb-support",
478 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
479 // iterate on all the outputs matchers
480 auto& spec = services.get<DeviceSpec const>();
481 for (auto& output : spec.outputs) {
482 if (DataSpecUtils::match(output.matcher, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"})) {
483 LOGP(debug, "Optional inputs support enabled");
484 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<CCDBSupport>(), .instance = new CCDBSupport, .kind = ServiceKind::Serial};
485 }
486 }
487 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<CCDBSupport>(), .instance = nullptr, .kind = ServiceKind::Serial};
488 },
489 .configure = noConfiguration(),
490 .finaliseOutputs = [](ProcessingContext& pc, void* service) {
491 if (!service) {
492 return;
493 }
494 if (pc.outputs().countDeviceOutputs(true) == 0) {
495 LOGP(debug, "We are w/o outputs, do not automatically add DISTSUBTIMEFRAME to outgoing messages");
496 return;
497 }
498 auto& timingInfo =<TimingInfo>();
500 // For any output that is a FLP/DISTSUBTIMEFRAME with subspec != 0,
501 // we create a new message.
502 InputSpec matcher{"matcher", ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}};
503 auto& streamContext =<StreamContext>();
504 for (size_t oi = 0; oi <<DeviceSpec const>().outputs.size(); ++oi) {
505 OutputRoute const& output =<DeviceSpec const>().outputs[oi];
506 if ((output.timeslice % output.maxTimeslices) != 0) {
507 continue;
508 }
509 if (DataSpecUtils::match(output.matcher, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"})) {
510 auto concrete = DataSpecUtils::asConcreteDataMatcher(output.matcher);
511 if (concrete.subSpec == 0) {
512 continue;
513 }
514 auto& stfDist = pc.outputs().make<o2::header::STFHeader>(Output{concrete.origin, concrete.description, concrete.subSpec});
515 = timingInfo.timeslice;
516 stfDist.firstOrbit = timingInfo.firstTForbit;
517 stfDist.runNumber = timingInfo.runNumber;
518 // We mark it as not created, because we do should not account for it when
519 // checking if we created all the data for a timeslice.
520 O2_SIGNPOST_ID_FROM_POINTER(sid, stream_context, &streamContext);
521 O2_SIGNPOST_EVENT_EMIT(stream_context, sid, "finaliseOutputs", "Route %" PRIu64 " (%{public}s) was created by DPL.", (uint64_t)oi,
522 DataSpecUtils::describe(output.matcher).c_str());
523 streamContext.routeDPLCreated[oi] = true;
524 }
525 } },
526 .kind = ServiceKind::Global};
533auto decongestionCallback = [](AsyncTask& task, size_t id) -> void {
534 auto& oldestPossibleOutput = task.user<DecongestionContext>().oldestPossibleOutput;
535 auto& ref = task.user<DecongestionContext>().ref;
537 auto& decongestion = ref.get<DecongestionService>();
538 auto& proxy = ref.get<FairMQDeviceProxy>();
540 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
541 cid.value = id;
542 if (decongestion.lastTimeslice >= oldestPossibleOutput.timeslice.value) {
543 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", "Not sending already sent value: %" PRIu64 "> %" PRIu64,
544 decongestion.lastTimeslice, (uint64_t)oldestPossibleOutput.timeslice.value);
545 return;
546 }
547 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", "Running oldest possible timeslice %" PRIu64 " propagation.",
548 (uint64_t)oldestPossibleOutput.timeslice.value);
549 DataProcessingHelpers::broadcastOldestPossibleTimeslice(ref, oldestPossibleOutput.timeslice.value);
551 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
552 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
553 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
554 // TODO: this we could cache in the proxy at the bind moment.
555 if (info.channelType != ChannelAccountingType::DPL) {
556 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", "Skipping channel %{public}s",;
557 continue;
558 }
559 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestPossibleOutput.timeslice.value)) {
560 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice",
561 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64 ", priority %d",
562, (uint64_t)oldestPossibleOutput.timeslice.value, 20);
563 }
564 }
565 decongestion.lastTimeslice = oldestPossibleOutput.timeslice.value;
568auto decongestionCallbackOrdered = [](AsyncTask& task, size_t id) -> void {
569 auto& oldestPossibleOutput = task.user<DecongestionContext>().oldestPossibleOutput;
570 auto& ref = task.user<DecongestionContext>().ref;
572 auto& decongestion = ref.get<DecongestionService>();
573 auto& state = ref.get<DeviceState>();
574 auto& timesliceIndex = ref.get<TimesliceIndex>();
575 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
576 int64_t oldNextTimeslice = decongestion.nextTimeslice;
577 decongestion.nextTimeslice = std::max(decongestion.nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
578 if (oldNextTimeslice != decongestion.nextTimeslice) {
579 if (state.transitionHandling != TransitionHandlingState::NoTransition && DefaultsHelpers::onlineDeploymentMode()) {
580 O2_SIGNPOST_EVENT_EMIT_WARN(async_queue, cid, "oldest_possible_timeslice", "Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
581 } else {
582 O2_SIGNPOST_EVENT_EMIT_ERROR(async_queue, cid, "oldest_possible_timeslice", "Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
583 }
584 timesliceIndex.rescan();
585 }
588// Decongestion service
589// If we do not have any Timeframe input, it means we must be creating timeslices
590// in order and that we should propagate the oldest possible timeslice at the end
591// of each processing step.
595 return ServiceSpec{
596 .name = "decongestion",
597 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
598 auto* decongestion = new DecongestionService();
599 for (auto& input : services.get<DeviceSpec const>().inputs) {
600 if (input.matcher.lifetime == Lifetime::Timeframe || input.matcher.lifetime == Lifetime::QA || input.matcher.lifetime == Lifetime::Sporadic || input.matcher.lifetime == Lifetime::Optional) {
601 LOGP(detail, "Found a real data input, we cannot update the oldest possible timeslice when sending messages");
602 decongestion->isFirstInTopology = false;
603 break;
604 }
605 }
606 auto& queue = services.get<AsyncQueue>();
607 decongestion->oldestPossibleTimesliceTask = AsyncQueueHelpers::create(queue, {.name = "oldest-possible-timeslice", .score = 100});
608 return ServiceHandle{TypeIdHelpers::uniqueId<DecongestionService>(), decongestion, ServiceKind::Serial};
609 },
610 .postForwarding = [](ProcessingContext& ctx, void* service) {
611 auto* decongestion = reinterpret_cast<DecongestionService*>(service);
612 if (O2_BUILTIN_LIKELY(decongestion->isFirstInTopology == false)) {
613 return;
614 }
615 O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, service);
616 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "postForwardingCallbacks", "We are the first one in the topology, we need to update the oldest possible timeslice");
617 auto& timesliceIndex =<TimesliceIndex>();
618 auto& relayer =<DataRelayer>();
619 timesliceIndex.updateOldestPossibleOutput(decongestion->nextEnumerationTimesliceRewinded);
620 auto& proxy =<FairMQDeviceProxy>();
621 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
622 if (decongestion->nextEnumerationTimesliceRewinded && decongestion->nextEnumerationTimeslice < oldestPossibleOutput.timeslice.value) {
623 LOGP(detail, "Not sending oldestPossible if nextEnumerationTimeslice was rewinded");
624 return;
625 }
627 if (decongestion->lastTimeslice && oldestPossibleOutput.timeslice.value == decongestion->lastTimeslice) {
628 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice",
629 "Not sending already sent value for oldest possible timeslice: %" PRIu64,
630 (uint64_t)oldestPossibleOutput.timeslice.value);
631 return;
632 }
633 if (oldestPossibleOutput.timeslice.value < decongestion->lastTimeslice) {
634 LOGP(error, "We are trying to send an oldest possible timeslice {} that is older than the last one we already sent {}",
635 oldestPossibleOutput.timeslice.value, decongestion->lastTimeslice);
636 return;
637 }
639 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Broadcasting oldest posssible output %" PRIu64 " due to %{public}s (%" PRIu64 ")",
640 (uint64_t)oldestPossibleOutput.timeslice.value,
641 oldestPossibleOutput.slot.index == -1 ? "channel" : "slot",
642 (uint64_t)(oldestPossibleOutput.slot.index == -1 ? : oldestPossibleOutput.slot.index));
643 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Ordered active %d", decongestion->orderedCompletionPolicyActive);
644 if (decongestion->orderedCompletionPolicyActive) {
645 auto oldNextTimeslice = decongestion->nextTimeslice;
646 decongestion->nextTimeslice = std::max(decongestion->nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
647 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Next timeslice %" PRIi64, decongestion->nextTimeslice);
648 if (oldNextTimeslice != decongestion->nextTimeslice) {
649 auto& state =<DeviceState>();
651 O2_SIGNPOST_EVENT_EMIT_WARN(data_processor_context, cid, "oldest_possible_timeslice", "Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
652 } else {
653 O2_SIGNPOST_EVENT_EMIT_ERROR(data_processor_context, cid, "oldest_possible_timeslice", "Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
654 }
655 timesliceIndex.rescan();
656 }
657 }
658 DataProcessingHelpers::broadcastOldestPossibleTimeslice(, oldestPossibleOutput.timeslice.value);
660 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
661 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
662 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
663 // TODO: this we could cache in the proxy at the bind moment.
664 if (info.channelType != ChannelAccountingType::DPL) {
665 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Skipping channel %{public}s",;
666 continue;
667 }
668 if (DataProcessingHelpers::sendOldestPossibleTimeframe(, info, state, oldestPossibleOutput.timeslice.value)) {
669 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice",
670 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64 ", priority %d",
671, (uint64_t)oldestPossibleOutput.timeslice.value, 20);
672 }
673 }
674 decongestion->lastTimeslice = oldestPossibleOutput.timeslice.value; },
675 .stop = [](ServiceRegistryRef services, void* service) {
676 auto* decongestion = (DecongestionService*)service;
677 services.get<TimesliceIndex>().reset();
678 decongestion->nextEnumerationTimeslice = 0;
679 decongestion->nextEnumerationTimesliceRewinded = false;
680 decongestion->lastTimeslice = 0;
681 decongestion->nextTimeslice = 0;
682 decongestion->oldestPossibleTimesliceTask = {0};
683 auto &state = services.get<DeviceState>();
684 for (auto &channel : state.inputChannelInfos) {
685 channel.oldestForChannel = {0};
686 } },
687 .domainInfoUpdated = [](ServiceRegistryRef services, size_t oldestPossibleTimeslice, ChannelIndex channel) {
688 auto& decongestion = services.get<DecongestionService>();
689 auto& relayer = services.get<DataRelayer>();
690 auto& timesliceIndex = services.get<TimesliceIndex>();
691 O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, &decongestion);
692 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Received oldest possible timeframe %" PRIu64 " from channel %d",
693 (uint64_t)oldestPossibleTimeslice, channel.value);
694 relayer.setOldestPossibleInput({oldestPossibleTimeslice}, channel);
695 timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
696 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
698 if (oldestPossibleOutput.timeslice.value == decongestion.lastTimeslice) {
699 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Synchronous: Not sending already sent value: %" PRIu64, (uint64_t)oldestPossibleOutput.timeslice.value);
700 return;
701 }
702 if (oldestPossibleOutput.timeslice.value < decongestion.lastTimeslice) {
703 LOGP(error, "We are trying to send an oldest possible timeslice {} that is older than the last one we sent {}",
704 oldestPossibleOutput.timeslice.value, decongestion.lastTimeslice);
705 return;
706 }
707 auto& queue = services.get<AsyncQueue>();
708 const auto& state = services.get<DeviceState>();
711 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Queueing oldest possible timeslice %" PRIu64 " propagation for execution.",
712 (uint64_t)oldestPossibleOutput.timeslice.value);
714 queue, AsyncTask{ .timeslice = TimesliceId{oldestPossibleTimeslice},
715 .id = decongestion.oldestPossibleTimesliceTask,
716 .debounce = -1, .callback = decongestionCallback}
717 .user<DecongestionContext>(DecongestionContext{.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
719 if (decongestion.orderedCompletionPolicyActive) {
721 queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleOutput.timeslice.value},.id = decongestion.oldestPossibleTimesliceTask, .debounce = -1,
722 .callback = decongestionCallbackOrdered}
723 .user<DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
724 } },
725 .kind = ServiceKind::Serial};
728// FIXME: allow configuring the default number of threads per device
729// This should probably be done by overriding the preFork
730// callback and using the boost program options there to
731// get the default number of threads.
734 return ServiceSpec{
735 .name = "threadpool",
736 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
737 auto* pool = new ThreadPool();
738 // FIXME: this will require some extra argument for the configuration context of a service
739 pool->poolSize = 1;
740 return ServiceHandle{TypeIdHelpers::uniqueId<ThreadPool>(), pool};
741 },
742 .configure = [](InitContext&, void* service) -> void* {
743 auto* t = reinterpret_cast<ThreadPool*>(service);
744 // FIXME: this will require some extra argument for the configuration context of a service
745 t->poolSize = 1;
746 return service;
747 },
748 .postForkParent = [](ServiceRegistryRef services) -> void {
749 // FIXME: this will require some extra argument for the configuration context of a service
750 auto numWorkersS = std::to_string(1);
751 setenv("UV_THREADPOOL_SIZE", numWorkersS.c_str(), 0);
752 },
753 .kind = ServiceKind::Serial};
758auto sendRelayerMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) -> void
760 // Update the timer to make sure we have the correct time when sending out the stats.
761 uv_update_time(registry.get<DeviceState>().loop);
762 // Derive the amount of shared memory used
763 auto& runningWorkflow = registry.get<RunningWorkflowInfo const>();
764 using namespace fair::mq::shmem;
765 auto& spec = registry.get<DeviceSpec const>();
767 auto hasMetric = [&runningWorkflow](const DataProcessingStats::MetricSpec& metric) -> bool {
768 return metric.metricId == static_cast<int>(ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE) + (runningWorkflow.shmSegmentId % 512);
769 };
770 // FIXME: Ugly, but we do it only every 5 seconds...
771 if (std::find_if(stats.metricSpecs.begin(), stats.metricSpecs.end(), hasMetric) != stats.metricSpecs.end()) {
772 auto device = registry.get<RawDeviceService>().device();
773 long freeMemory = -1;
774 try {
775 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(ShmId{makeShmIdStr(device->fConfig->GetProperty<uint64_t>("shmid"))}, runningWorkflow.shmSegmentId);
776 } catch (...) {
777 }
778 if (freeMemory == -1) {
779 try {
780 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(SessionId{device->fConfig->GetProperty<std::string>("session")}, runningWorkflow.shmSegmentId);
781 } catch (...) {
782 }
783 }
784 stats.updateStats({static_cast<unsigned short>(static_cast<int>(ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE) + (runningWorkflow.shmSegmentId % 512)), DataProcessingStats::Op::SetIfPositive, freeMemory});
785 }
787 auto device = registry.get<RawDeviceService>().device();
789 int64_t totalBytesIn = 0;
790 int64_t totalBytesOut = 0;
792 for (auto& channel : device->GetChannels()) {
793 totalBytesIn += channel.second[0].GetBytesRx();
794 totalBytesOut += channel.second[0].GetBytesTx();
795 }
797 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_BYTES_IN), DataProcessingStats::Op::Set, totalBytesIn / 1000000});
798 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_BYTES_OUT), DataProcessingStats::Op::Set, totalBytesOut / 1000000});
800 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_RATE_IN_MB_S), DataProcessingStats::Op::InstantaneousRate, totalBytesIn / 1000000});
801 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_RATE_OUT_MB_S), DataProcessingStats::Op::InstantaneousRate, totalBytesOut / 1000000});
804auto flushStates(ServiceRegistryRef registry, DataProcessingStates& states) -> void
806 states.flushChangedStates([&states, registry](std::string const& spec, int64_t timestamp, std::string_view value) mutable -> void {
807 auto& client = registry.get<ControlService>();
808 client.push(spec, value, timestamp);
809 });
815auto flushMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) -> void
817 // Flushing metrics should only happen on main thread to avoid
818 // having to have a mutex for the communication with the driver.
819 O2_SIGNPOST_ID_GENERATE(sid, monitoring_service);
820 O2_SIGNPOST_START(monitoring_service, sid, "flush", "flushing metrics");
821 if (registry.isMainThread() == false) {
822 LOGP(fatal, "Flushing metrics should only happen on the main thread.");
823 }
824 auto& monitoring = registry.get<Monitoring>();
825 auto& relayer = registry.get<DataRelayer>();
827 // Send all the relevant metrics for the relayer to update the GUI
828 stats.flushChangedMetrics([&monitoring](DataProcessingStats::MetricSpec const& spec, int64_t timestamp, int64_t value) mutable -> void {
829 // convert timestamp to a time_point
830 auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(std::chrono::milliseconds(timestamp));
831 auto metric = o2::monitoring::Metric{, Metric::DefaultVerbosity, tp};
832 if (spec.kind == DataProcessingStats::Kind::UInt64) {
833 if (value < 0) {
834 LOG(debug) << "Value for " << << " is negative, setting to 0";
835 value = 0;
836 }
837 metric.addValue((uint64_t)value, "value");
838 } else {
839 if (value > (int64_t)std::numeric_limits<int>::max()) {
840 LOG(warning) << "Value for " << << " is too large, setting to INT_MAX";
841 value = (int64_t)std::numeric_limits<int>::max();
842 }
843 if (value < (int64_t)std::numeric_limits<int>::min()) {
844 value = (int64_t)std::numeric_limits<int>::min();
845 LOG(warning) << "Value for " << << " is too small, setting to INT_MIN";
846 }
847 metric.addValue((int)value, "value");
848 }
849 if (spec.scope == DataProcessingStats::Scope::DPL) {
850 metric.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL);
851 }
852 monitoring.send(std::move(metric));
853 });
854 relayer.sendContextState();
855 monitoring.flushBuffer();
856 O2_SIGNPOST_END(monitoring_service, sid, "flush", "done flushing metrics");
858} // namespace
// ServiceSpec for the "data-processing-stats" service.
// .init creates the DataProcessingStats object and registers the metric table
// below. preDangling / postDangling / preEOS call sendRelayerMetrics and
// flushMetrics; preLoop calls flushMetrics only.
// NOTE(review): this copy of the block is damaged. Lines are missing (the
// factory signature, the declaration completed by `.minOnlinePublishInterval`,
// the construction of `stats`, the `#else`/`#endif` of the NDEBUG switch, the
// `if` guarding `arrowAndResourceLimitingMetrics = true;`, most `.metricId`
// entries and the tail of `.postProcessing`), and several call arguments are
// empty. Restore from upstream before building.
862 return ServiceSpec{
863 .name = "data-processing-stats",
864 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
// Offset between wall-clock time (whole seconds only, tv_nsec is ignored)
// scaled to ms and the libuv loop clock; no visible line uses it.
865 timespec now;
866 clock_gettime(CLOCK_REALTIME, &now);
867 uv_update_time(state.loop);
868 uint64_t offset = now.tv_sec * 1000 - uv_now(state.loop);
// NOTE(review): the declaration this initializer belongs to and the start of
// the call ending in `config);` are missing, so `stats` (used below) is never
// declared in this copy. The option is parsed with std::stoi, which throws on
// non-numeric input, and is multiplied by 1000, presumably converting s to ms.
870 .minOnlinePublishInterval = std::stoi(options.GetProperty<std::string>("dpl-stats-min-online-publishing-interval").c_str()) * 1000};
873 config);
874 auto& runningWorkflow = services.get<RunningWorkflowInfo const>();
// Intervals and latencies below are in milliseconds (cf. 60000 -> "every 60s").
876 // It makes no sense to update the stats more often than every 5s
877 int quickUpdateInterval = 5000;
878 uint64_t quickRefreshInterval = 7000;
879 uint64_t onlineRefreshLatency = 60000; // For metrics which are reported online, we flush them every 60s regardless of their state.
880 using MetricSpec = DataProcessingStats::MetricSpec;
881 using Kind = DataProcessingStats::Kind;
882 using Scope = DataProcessingStats::Scope;
// Debug metrics are meant to be off in NDEBUG builds and on otherwise.
// NOTE(review): the `#else` and `#endif` of this switch are missing in this copy.
884#ifdef NDEBUG
885 bool enableDebugMetrics = false;
887 bool enableDebugMetrics = true;
// NOTE(review): the condition opening the block closed below is missing, so
// the circumstances under which the arrow/resource metrics are enabled are not visible.
889 bool arrowAndResourceLimitingMetrics = false;
891 arrowAndResourceLimitingMetrics = true;
892 }
893 // Input proxies should not report cpu_usage_fraction,
894 // because of the rate limiting which biases the measurement.
895 auto& spec = services.get<DeviceSpec const>();
896 bool enableCPUUsageFraction = true;
897 auto isProxy = [](DataProcessorLabel const& label) -> bool { return label == DataProcessorLabel{"input-proxy"}; };
898 if (std::find_if(spec.labels.begin(), spec.labels.end(), isProxy) != spec.labels.end()) {
899 O2_SIGNPOST_ID_GENERATE(mid, policies);
// NOTE(review): the format string expects a string argument (%{public}s),
// presumably the device name, but the argument is missing in this copy.
900 O2_SIGNPOST_EVENT_EMIT(policies, mid, "metrics", "Disabling cpu_usage_fraction metric for proxy %{public}s",;
901 enableCPUUsageFraction = false;
902 }
// Metric table. `.enabled` flags gate the proxy-, arrow/resource- and debug-only metrics.
// NOTE(review): the specs from "errors" through "total_bytes_out" carry no
// `.metricId` in this copy. Each needs the ProcessingStatsId it is updated with.
904 std::vector<DataProcessingStats::MetricSpec> metrics = {
905 MetricSpec{.name = "errors",
907 .kind = Kind::UInt64,
908 .scope = Scope::Online,
909 .minPublishInterval = quickUpdateInterval,
910 .maxRefreshLatency = quickRefreshInterval},
911 MetricSpec{.name = "exceptions",
913 .kind = Kind::UInt64,
914 .scope = Scope::Online,
915 .minPublishInterval = quickUpdateInterval},
916 MetricSpec{.name = "inputs/relayed/pending",
918 .kind = Kind::UInt64,
919 .minPublishInterval = quickUpdateInterval},
920 MetricSpec{.name = "inputs/relayed/incomplete",
922 .kind = Kind::UInt64,
923 .minPublishInterval = quickUpdateInterval},
924 MetricSpec{.name = "inputs/relayed/total",
926 .kind = Kind::UInt64,
927 .minPublishInterval = quickUpdateInterval},
928 MetricSpec{.name = "elapsed_time_ms",
930 .kind = Kind::UInt64,
931 .minPublishInterval = quickUpdateInterval},
932 MetricSpec{.name = "total_wall_time_ms",
934 .kind = Kind::UInt64,
935 .minPublishInterval = quickUpdateInterval},
936 MetricSpec{.name = "last_processed_input_size_byte",
938 .kind = Kind::UInt64,
939 .minPublishInterval = quickUpdateInterval},
940 MetricSpec{.name = "total_processed_input_size_byte",
942 .kind = Kind::UInt64,
943 .scope = Scope::Online,
944 .minPublishInterval = quickUpdateInterval},
945 MetricSpec{.name = "total_sigusr1",
947 .kind = Kind::UInt64,
948 .minPublishInterval = quickUpdateInterval},
949 MetricSpec{.name = "consumed-timeframes",
951 .kind = Kind::UInt64,
952 .minPublishInterval = 0,
953 .maxRefreshLatency = quickRefreshInterval,
954 .sendInitialValue = true},
955 MetricSpec{.name = "min_input_latency_ms",
957 .kind = Kind::UInt64,
958 .scope = Scope::Online,
959 .minPublishInterval = quickUpdateInterval},
960 MetricSpec{.name = "max_input_latency_ms",
962 .kind = Kind::UInt64,
963 .minPublishInterval = quickUpdateInterval},
964 MetricSpec{.name = "total_rate_in_mb_s",
966 .kind = Kind::Rate,
967 .scope = Scope::Online,
968 .minPublishInterval = quickUpdateInterval,
969 .maxRefreshLatency = onlineRefreshLatency,
970 .sendInitialValue = true},
971 MetricSpec{.name = "total_rate_out_mb_s",
973 .kind = Kind::Rate,
974 .scope = Scope::Online,
975 .minPublishInterval = quickUpdateInterval,
976 .maxRefreshLatency = onlineRefreshLatency,
977 .sendInitialValue = true},
978 MetricSpec{.name = "processing_rate_hz",
980 .kind = Kind::Rate,
981 .scope = Scope::Online,
982 .minPublishInterval = quickUpdateInterval,
983 .maxRefreshLatency = onlineRefreshLatency,
984 .sendInitialValue = true},
985 MetricSpec{.name = "cpu_usage_fraction",
986 .enabled = enableCPUUsageFraction,
988 .kind = Kind::Rate,
989 .scope = Scope::Online,
990 .minPublishInterval = quickUpdateInterval,
991 .maxRefreshLatency = onlineRefreshLatency,
992 .sendInitialValue = true},
993 MetricSpec{.name = "performed_computations",
995 .kind = Kind::UInt64,
996 .scope = Scope::Online,
997 .minPublishInterval = quickUpdateInterval,
998 .maxRefreshLatency = onlineRefreshLatency,
999 .sendInitialValue = true},
1000 MetricSpec{.name = "total_bytes_in",
1002 .kind = Kind::UInt64,
1003 .scope = Scope::Online,
1004 .minPublishInterval = quickUpdateInterval,
1005 .maxRefreshLatency = onlineRefreshLatency,
1006 .sendInitialValue = true},
1007 MetricSpec{.name = "total_bytes_out",
1009 .kind = Kind::UInt64,
1010 .scope = Scope::Online,
1011 .minPublishInterval = quickUpdateInterval,
1012 .maxRefreshLatency = onlineRefreshLatency,
1013 .sendInitialValue = true},
// Per-segment free shared memory; the id is the SHM base id offset by segment id (mod 512).
1014 MetricSpec{.name = fmt::format("available_managed_shm_{}", runningWorkflow.shmSegmentId),
1015 .metricId = (int)ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE + (runningWorkflow.shmSegmentId % 512),
1016 .kind = Kind::UInt64,
1017 .scope = Scope::Online,
1018 .minPublishInterval = 500,
1019 .maxRefreshLatency = onlineRefreshLatency,
1020 .sendInitialValue = true},
1021 MetricSpec{.name = "malformed_inputs", .metricId = static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1022 MetricSpec{.name = "dropped_computations", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1023 MetricSpec{.name = "dropped_incoming_messages", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1024 MetricSpec{.name = "relayed_messages", .metricId = static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1025 MetricSpec{.name = "arrow-bytes-destroyed",
1026 .enabled = arrowAndResourceLimitingMetrics,
1027 .metricId = static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED),
1028 .kind = Kind::UInt64,
1029 .scope = Scope::DPL,
1030 .minPublishInterval = 0,
1031 .maxRefreshLatency = 10000,
1032 .sendInitialValue = true},
1033 MetricSpec{.name = "arrow-messages-destroyed",
1034 .enabled = arrowAndResourceLimitingMetrics,
1035 .metricId = static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED),
1036 .kind = Kind::UInt64,
1037 .scope = Scope::DPL,
1038 .minPublishInterval = 0,
1039 .maxRefreshLatency = 10000,
1040 .sendInitialValue = true},
1041 MetricSpec{.name = "arrow-bytes-created",
1042 .enabled = arrowAndResourceLimitingMetrics,
1043 .metricId = static_cast<short>(ProcessingStatsId::ARROW_BYTES_CREATED),
1044 .kind = Kind::UInt64,
1045 .scope = Scope::DPL,
1046 .minPublishInterval = 0,
1047 .maxRefreshLatency = 10000,
1048 .sendInitialValue = true},
1049 MetricSpec{.name = "arrow-messages-created",
1050 .enabled = arrowAndResourceLimitingMetrics,
1051 .metricId = static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_CREATED),
1052 .kind = Kind::UInt64,
1053 .scope = Scope::DPL,
1054 .minPublishInterval = 0,
1055 .maxRefreshLatency = 10000,
1056 .sendInitialValue = true},
1057 MetricSpec{.name = "arrow-bytes-expired",
1058 .enabled = arrowAndResourceLimitingMetrics,
1059 .metricId = static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED),
1060 .kind = Kind::UInt64,
1061 .scope = Scope::DPL,
1062 .minPublishInterval = 0,
1063 .maxRefreshLatency = 10000,
1064 .sendInitialValue = true},
1065 MetricSpec{.name = "shm-offer-bytes-consumed",
1066 .enabled = arrowAndResourceLimitingMetrics,
1067 .metricId = static_cast<short>(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED),
1068 .kind = Kind::UInt64,
1069 .scope = Scope::DPL,
1070 .minPublishInterval = 0,
1071 .maxRefreshLatency = 10000,
1072 .sendInitialValue = true},
1073 MetricSpec{.name = "resources-missing",
1074 .enabled = enableDebugMetrics,
1075 .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_MISSING),
1076 .kind = Kind::UInt64,
1077 .scope = Scope::DPL,
1078 .minPublishInterval = 1000,
1079 .maxRefreshLatency = 1000,
1080 .sendInitialValue = true},
1081 MetricSpec{.name = "resources-insufficient",
1082 .enabled = enableDebugMetrics,
1083 .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_INSUFFICIENT),
1084 .kind = Kind::UInt64,
1085 .scope = Scope::DPL,
1086 .minPublishInterval = 1000,
1087 .maxRefreshLatency = 1000,
1088 .sendInitialValue = true},
1089 MetricSpec{.name = "resources-satisfactory",
1090 .enabled = enableDebugMetrics,
1091 .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_SATISFACTORY),
1092 .kind = Kind::UInt64,
1093 .scope = Scope::DPL,
1094 .minPublishInterval = 1000,
1095 .maxRefreshLatency = 1000,
1096 .sendInitialValue = true},
1097 MetricSpec{.name = "resource-offer-expired",
1098 .enabled = arrowAndResourceLimitingMetrics,
1099 .metricId = static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED),
1100 .kind = Kind::UInt64,
1101 .scope = Scope::DPL,
1102 .minPublishInterval = 0,
1103 .maxRefreshLatency = 10000,
1104 .sendInitialValue = true}};
// Register every spec, except that the per-segment shm metric is skipped
// under a second condition.
// NOTE(review): that condition is truncated in this copy; only
// `"readout-proxy") != 0` survives. It is presumably a comparison of the
// device name against "readout-proxy" — TODO restore.
1106 for (auto& metric : metrics) {
1107 if (metric.metricId == (int)ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE + (runningWorkflow.shmSegmentId % 512) &&"readout-proxy") != 0) {
1108 continue;
1109 }
1110 stats->registerMetric(metric);
1111 }
1113 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStats>(), stats};
1114 },
1115 .configure = noConfiguration(),
// NOTE(review): the .postProcessing lambda is cut off after its first statement in this copy.
1116 .postProcessing = [](ProcessingContext& context, void* service) {
1117 auto* stats = (DataProcessingStats*)service;
// NOTE(review): the first argument (a ServiceRegistryRef, presumably obtained
// from `context`) of the sendRelayerMetrics/flushMetrics calls below is missing in this copy.
1119 .preDangling = [](DanglingContext& context, void* service) {
1120 auto* stats = (DataProcessingStats*)service;
1121 sendRelayerMetrics(, *stats);
1122 flushMetrics(, *stats); },
1123 .postDangling = [](DanglingContext& context, void* service) {
1124 auto* stats = (DataProcessingStats*)service;
1125 sendRelayerMetrics(, *stats);
1126 flushMetrics(, *stats); },
1127 .preEOS = [](EndOfStreamContext& context, void* service) {
1128 auto* stats = (DataProcessingStats*)service;
1129 sendRelayerMetrics(, *stats);
1130 flushMetrics(, *stats); },
// Before the loop only the metrics are flushed; relayer metrics are not refreshed.
1131 .preLoop = [](ServiceRegistryRef ref, void* service) {
1132 auto* stats = (DataProcessingStats*)service;
1133 flushMetrics(ref, *stats); },
1134 .kind = ServiceKind::Serial};
1137// This is similar to dataProcessingStats, but it is designed to synchronize
1138// history-less metrics, e.g. those used by the GUI.
// .init registers a single state ("dummy_state"). .postProcessing drains the
// command queue, and the dangling / end-of-stream hooks call flushStates.
// NOTE(review): the factory signature for this spec is missing from this copy.
1141 return ServiceSpec{
1142 .name = "data-processing-states",
1143 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
1144 timespec now;
1145 clock_gettime(CLOCK_REALTIME, &now);
1146 uv_update_time(state.loop);
1147 uint64_t offset = now.tv_sec * 1000 - uv_now(state.loop);
// NOTE(review): the lines constructing `states` are missing, so `states` is
// undeclared in this copy. The construction presumably consumed `offset`,
// which no visible line uses.
1150 states->registerState({"dummy_state", (short)ProcessingStateId::DUMMY_STATE});
1151 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStates>(), states};
1152 },
1153 .configure = noConfiguration(),
1154 .postProcessing = [](ProcessingContext& context, void* service) {
1155 auto* states = (DataProcessingStates*)service;
1156 states->processCommandQueue(); },
// NOTE(review): the first argument of the flushStates calls below (a
// ServiceRegistryRef, presumably obtained from `context`) is missing in this copy.
1157 .preDangling = [](DanglingContext& context, void* service) {
1158 auto* states = (DataProcessingStates*)service;
1159 flushStates(, *states); },
1160 .postDangling = [](DanglingContext& context, void* service) {
1161 auto* states = (DataProcessingStates*)service;
1162 flushStates(, *states); },
1163 .preEOS = [](EndOfStreamContext& context, void* service) {
1164 auto* states = (DataProcessingStates*)service;
1165 flushStates(, *states); },
1166 .kind = ServiceKind::Global};
1174 return ServiceSpec{
1175 .name = "gui-metrics",
1176 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
1177 auto* stats = new GUIMetrics();
1178 auto& monitoring = services.get<Monitoring>();
1179 auto& spec = services.get<DeviceSpec const>();
1180 monitoring.send({(int)spec.inputChannels.size(), fmt::format("oldest_possible_timeslice/h"), o2::monitoring::Verbosity::Debug});
1181 monitoring.send({(int)1, fmt::format("oldest_possible_timeslice/w"), o2::monitoring::Verbosity::Debug});
1182 monitoring.send({(int)spec.outputChannels.size(), fmt::format("oldest_possible_output/h"), o2::monitoring::Verbosity::Debug});
1183 monitoring.send({(int)1, fmt::format("oldest_possible_output/w"), o2::monitoring::Verbosity::Debug});
1184 return ServiceHandle{TypeIdHelpers::uniqueId<GUIMetrics>(), stats};
1185 },
1186 .configure = noConfiguration(),
1187 .postProcessing = [](ProcessingContext& context, void* service) {
1188 auto& relayer =<DataRelayer>();
1189 auto& monitoring =<Monitoring>();
1190 auto& spec =<DeviceSpec const>();
1191 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
1192 for (size_t ci; ci < spec.outputChannels.size(); ++ci) {
1193 monitoring.send({(uint64_t)oldestPossibleOutput.timeslice.value, fmt::format("oldest_possible_output/{}", ci), o2::monitoring::Verbosity::Debug});
1194 } },
1195 .domainInfoUpdated = [](ServiceRegistryRef registry, size_t timeslice, ChannelIndex channel) {
1196 auto& monitoring = registry.get<Monitoring>();
1197 monitoring.send({(uint64_t)timeslice, fmt::format("oldest_possible_timeslice/{}", channel.value), o2::monitoring::Verbosity::Debug}); },
1198 .active = false,
1199 .kind = ServiceKind::Serial};
// ServiceSpec for the "object-cache" service. .init allocates a fresh
// ObjectCache and registers it under the ObjectCache type id.
// NOTE(review): this copy has no `.kind = ...};` terminator for this
// initializer before the next `return ServiceSpec{`, so the spec appears
// truncated. Restore it from upstream.
1204 return ServiceSpec{
1205 .name = "object-cache",
1206 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1207 auto* cache = new ObjectCache();
1208 return ServiceHandle{TypeIdHelpers::uniqueId<ObjectCache>(), cache};
1209 },
1210 .configure = noConfiguration(),
1216 return ServiceSpec{
1217 .name = "data-processing-context",
1218 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1219 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessorContext>(), new DataProcessorContext()};
1220 },
1221 .configure = noConfiguration(),
1222 .exit = [](ServiceRegistryRef, void* service) { auto* context = (DataProcessorContext*)service; delete context; },
1223 .kind = ServiceKind::Serial};
1228 return ServiceSpec{
1229 .name = "device-context",
1230 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1231 return ServiceHandle{TypeIdHelpers::uniqueId<DeviceContext>(), new DeviceContext()};
1232 },
1233 .configure = noConfiguration(),
1234 .kind = ServiceKind::Serial};
1239 return ServiceSpec{
1240 .name = "data-allocator",
1241 .uniqueId = simpleServiceId<DataAllocator>(),
1242 .init = [](ServiceRegistryRef ref, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1243 return ServiceHandle{
1244 .hash = TypeIdHelpers::uniqueId<DataAllocator>(),
1245 .instance = new DataAllocator(ref),
1246 .kind = ServiceKind::Stream,
1247 .name = "data-allocator",
1248 };
1249 },
1250 .configure = noConfiguration(),
1251 .kind = ServiceKind::Stream};
// Build the default list of service specs for a device:
// - start from a fixed list of built-in specs (asyncQueue(), controlSpec(), ...);
// - append ArrowSupport::arrowBackendSpec() under a condition, and
//   decongestionSpec() (possibly also conditional; the surrounding lines are missing);
// - build a comma-separated "library:name" plugin string from `extraPlugins`,
//   the InfoLogger plugins (under a condition) and the DPL_LOAD_SERVICES
//   environment variable, then load the ServiceSpecs those plugins provide;
// - always append guiMetricsSpec(), and threadPool(numThreads) when numThreads != 0.
// NOTE(review): this copy is missing lines. These include the opening brace,
// several entries of the initial `specs` list (the embedded numbering jumps,
// e.g. 1261 -> 1268), and the `if` conditions that open the blocks closed by
// the unmatched `}` lines below. Restore from upstream.
1255std::vector<ServiceSpec> CommonServices::defaultServices(std::string extraPlugins, int numThreads)
1257 std::vector<ServiceSpec> specs{
1261 asyncQueue(),
1268 controlSpec(),
1269 rootFileSpec(),
1270 parallelSpec(),
1271 callbacksSpec(),
1274 dataRelayer(),
1276 dataSender(),
1277 objectCache(),
1278 ccdbSupportSpec()};
1281 specs.push_back(ArrowSupport::arrowBackendSpec());
1282 }
1285 specs.push_back(decongestionSpec());
1287 std::string loadableServicesStr = extraPlugins;
1288 // Do not load InfoLogger by default if we are not at P2.
// NOTE(review): the `if` that opens the block closed after the InfoLogger append is missing here.
1290 if (loadableServicesStr.empty() == false) {
1291 loadableServicesStr += ",";
1292 }
1293 loadableServicesStr += "O2FrameworkDataTakingSupport:InfoLoggerContext,O2FrameworkDataTakingSupport:InfoLogger";
1294 }
1295 // Load plugins depending on the environment
1296 std::vector<LoadablePlugin> loadablePlugins = {};
1297 char* loadableServicesEnv = getenv("DPL_LOAD_SERVICES");
1298 // String to define the services to load is:
1299 //
1300 // library1:name1,library2:name2,...
1301 if (loadableServicesEnv) {
1302 if (loadableServicesStr.empty() == false) {
1303 loadableServicesStr += ",";
1304 }
1305 loadableServicesStr += loadableServicesEnv;
1306 }
1307 loadablePlugins = PluginManager::parsePluginSpecString(loadableServicesStr.c_str());
1308 PluginManager::loadFromPlugin<ServiceSpec, ServicePlugin>(loadablePlugins, specs);
1309 // I should make it optional depending on whether the GUI is there or not...
1310 specs.push_back(CommonServices::guiMetricsSpec());
1311 if (numThreads) {
1312 specs.push_back(threadPool(numThreads));
1313 }
1314 return specs;
// Returns a braced list of ServiceSpecs, presumably the Arrow-related ones
// (judging by the name).
// NOTE(review): no entries are visible between `return {` and `};`, and the
// function's braces are missing in this copy. Restore from upstream.
1317std::vector<ServiceSpec> CommonServices::arrowServices()
1319 return {
1322 };
1325} // namespace o2::framework
