CommonServices.cxx
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
25#include "Framework/Signpost.h"
32#include "InputRouteHelpers.h"
36#include "Framework/Tracing.h"
46#include "Framework/Signpost.h"
47
48#include "TextDriverClient.h"
49#include "WSDriverClient.h"
50#include "HTTPParser.h"
51#include "../src/DataProcessingStatus.h"
52#include "DecongestionService.h"
53#include "ArrowSupport.h"
55#include "Headers/STFHeader.h"
56#include "Headers/DataHeader.h"
57
58#include <Configuration/ConfigurationInterface.h>
59#include <Configuration/ConfigurationFactory.h>
60#include <Monitoring/MonitoringFactory.h>
61#include "Framework/Signpost.h"
62
63#include <fairmq/Device.h>
64#include <fairmq/shmem/Monitor.h>
65#include <fairmq/shmem/Common.h>
66#include <fairmq/ProgOptions.h>
67#include <uv.h>
68
69#include <cstdlib>
70#include <cstring>
71
72using o2::configuration::ConfigurationFactory;
73using o2::configuration::ConfigurationInterface;
74using o2::monitoring::Monitoring;
75using o2::monitoring::MonitoringFactory;
76using Metric = o2::monitoring::Metric;
77using Key = o2::monitoring::tags::Key;
78using Value = o2::monitoring::tags::Value;
79
80O2_DECLARE_DYNAMIC_LOG(data_processor_context);
81O2_DECLARE_DYNAMIC_LOG(stream_context);
82O2_DECLARE_DYNAMIC_LOG(async_queue);
83O2_DECLARE_DYNAMIC_LOG(policies);
84
85namespace o2::framework
86{
87
88#define MONITORING_QUEUE_SIZE 100
89ServiceSpec CommonServices::monitoringSpec()
90{
91 return ServiceSpec{
92 .name = "monitoring",
93 .init = [](ServiceRegistryRef registry, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
94 void* service = nullptr;
95 bool isWebsocket = strncmp(options.GetPropertyAsString("driver-client-backend").c_str(), "ws://", 4) == 0;
96 bool isDefault = options.GetPropertyAsString("monitoring-backend") == "default";
97 bool useDPL = (isWebsocket && isDefault) || options.GetPropertyAsString("monitoring-backend") == "dpl://";
98 o2::monitoring::Monitoring* monitoring;
99 if (useDPL) {
100 monitoring = new Monitoring();
101 auto dplBackend = std::make_unique<DPLMonitoringBackend>(registry);
102 (dynamic_cast<o2::monitoring::Backend*>(dplBackend.get()))->setVerbosity(o2::monitoring::Verbosity::Debug);
103 monitoring->addBackend(std::move(dplBackend));
104 } else {
105 auto backend = isDefault ? "infologger://" : options.GetPropertyAsString("monitoring-backend");
106 monitoring = MonitoringFactory::Get(backend).release();
107 }
108 service = monitoring;
109 monitoring->enableBuffering(MONITORING_QUEUE_SIZE);
110 assert(registry.get<DeviceSpec const>().name.empty() == false);
111 monitoring->addGlobalTag("pipeline_id", std::to_string(registry.get<DeviceSpec const>().inputTimesliceId));
112 monitoring->addGlobalTag("dataprocessor_name", registry.get<DeviceSpec const>().name);
113 monitoring->addGlobalTag("dpl_instance", options.GetPropertyAsString("shm-segment-id"));
114 return ServiceHandle{TypeIdHelpers::uniqueId<Monitoring>(), service};
115 },
116 .configure = noConfiguration(),
117 .start = [](ServiceRegistryRef services, void* service) {
118 auto* monitoring = (o2::monitoring::Monitoring*)service;
119
120 auto extRunNumber = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("runNumber", "unspecified");
121 if (extRunNumber == "unspecified") {
122 return;
123 }
124 try {
125 monitoring->setRunNumber(std::stoul(extRunNumber));
126 } catch (...) {
127 } },
128 .exit = [](ServiceRegistryRef registry, void* service) {
129 auto* monitoring = reinterpret_cast<Monitoring*>(service);
130 monitoring->flushBuffer();
131 delete monitoring; },
132 .kind = ServiceKind::Serial};
133}
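// [Editor's note] Minimal usage sketch, not part of the original file: once this service is
// registered, any callback holding a ServiceRegistryRef can push a metric through it, e.g.
//
//   auto& monitoring = services.get<Monitoring>();
//   monitoring.send({(uint64_t)42, "my_custom_metric", o2::monitoring::Verbosity::Debug});
//
// The metric name "my_custom_metric" and the value are purely illustrative; the call shape
// mirrors the guiMetricsSpec() code further down in this file.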
134
135// An asynchronous service that executes actions at the end of the data processing
136ServiceSpec CommonServices::asyncQueue()
137{
138 return ServiceSpec{
139 .name = "async-queue",
140 .init = simpleServiceInit<AsyncQueue, AsyncQueue>(),
141 .configure = noConfiguration(),
142 .stop = [](ServiceRegistryRef services, void* service) {
143 auto& queue = services.get<AsyncQueue>();
144 AsyncQueueHelpers::reset(queue);
145 },
146 .kind = ServiceKind::Serial};
147}
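// [Editor's note] Hedged sketch of how the queue is used (it mirrors the decongestion code
// later in this file); the task name "my-task" and the score are illustrative:
//
//   auto& queue = services.get<AsyncQueue>();
//   auto taskId = AsyncQueueHelpers::create(queue, {.name = "my-task", .score = 10});
//   AsyncQueueHelpers::post(
//     queue, AsyncTask{.timeslice = TimesliceId{0}, .id = taskId, .debounce = -1,
//                      .callback = [](AsyncTask& task, size_t id) {}});
//
// Tasks posted this way are executed at the end of the data processing step.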
148
149// Make it a service so that it can be used easily from the analysis
150// FIXME: Moreover, it makes sense that this will be duplicated on a per thread
151// basis when we get to it.
152ServiceSpec CommonServices::timingInfoSpec()
153{
154 return ServiceSpec{
155 .name = "timing-info",
156 .uniqueId = simpleServiceId<TimingInfo>(),
157 .init = simpleServiceInit<TimingInfo, TimingInfo, ServiceKind::Stream>(),
158 .configure = noConfiguration(),
159 .kind = ServiceKind::Stream};
160}
161
162ServiceSpec CommonServices::streamContextSpec()
163{
164 return ServiceSpec{
165 .name = "stream-context",
166 .uniqueId = simpleServiceId<StreamContext>(),
167 .init = simpleServiceInit<StreamContext, StreamContext, ServiceKind::Stream>(),
168 .configure = noConfiguration(),
169 .preProcessing = [](ProcessingContext& context, void* service) {
170 auto* stream = (StreamContext*)service;
171 auto& routes = context.services().get<DeviceSpec const>().outputs;
172 // Notice I need to do this here, because different invocations for
173 // the same stream might refer to different data processors.
174 // We should probably have a context which is per stream of a specific
175 // data processor.
176 stream->routeDPLCreated.resize(routes.size());
177 stream->routeCreated.resize(routes.size());
178 // Reset the routeDPLCreated at every processing step
179 std::fill(stream->routeDPLCreated.begin(), stream->routeDPLCreated.end(), false);
180 std::fill(stream->routeCreated.begin(), stream->routeCreated.end(), false); },
181 .postProcessing = [](ProcessingContext& processingContext, void* service) {
182 auto* stream = (StreamContext*)service;
183 auto& routes = processingContext.services().get<DeviceSpec const>().outputs;
184 auto& timeslice = processingContext.services().get<TimingInfo>().timeslice;
185 auto& messageContext = processingContext.services().get<MessageContext>();
186 // Check whether we ever created any data for this timeslice:
187 // if we did not, but didDispatch is still set to true,
188 // it means the data was created out of band.
189 bool userDidCreate = false;
190 O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, service);
191 for (size_t ri = 0; ri < routes.size(); ++ri) {
192 if (stream->routeCreated[ri] == true && stream->routeDPLCreated[ri] == false) {
193 userDidCreate = true;
194 break;
195 }
196 }
197 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "userDidCreate == %d && didDispatch == %d",
198 userDidCreate,
199 messageContext.didDispatch());
200 if (userDidCreate == false && messageContext.didDispatch() == true) {
201 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Data created out of band userDidCreate == %d && messageContext.didDispatch == %d",
202 userDidCreate,
203 messageContext.didDispatch());
204 return;
205 }
206 if (userDidCreate == false && messageContext.didDispatch() == false) {
207 O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, service);
208 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "No data created.");
209 return;
210 }
211 for (size_t ri = 0; ri < routes.size(); ++ri) {
212 auto &route = routes[ri];
213 auto &matcher = route.matcher;
214 if (stream->routeDPLCreated[ri] == true) {
215 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Data created by DPL. ri = %" PRIu64 ", %{public}s",
216 (uint64_t)ri, DataSpecUtils::describe(matcher).c_str());
217 continue;
218 }
219 if (stream->routeCreated[ri] == true) {
220 continue;
221 } if ((timeslice % route.maxTimeslices) != route.timeslice) {
222 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Route ri = %" PRIu64 ", skipped because of pipelining.",
223 (uint64_t)ri);
224 continue;
225 }
226 if (matcher.lifetime == Lifetime::Timeframe) {
227 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks",
228 "Expected Lifetime::Timeframe data %{public}s was not created for timeslice %" PRIu64 " and might result in dropped timeframes",
229 DataSpecUtils::describe(matcher).c_str(), (uint64_t)timeslice);
230 LOGP(error, "Expected Lifetime::Timeframe data {} was not created for timeslice {} and might result in dropped timeframes", DataSpecUtils::describe(matcher), timeslice);
231 }
232 } },
233 .preEOS = [](EndOfStreamContext& context, void* service) {
234 // We need to reset the routeDPLCreated / routeCreated because the end of stream
235 // uses a different context which does not know about the routes.
236 // FIXME: This should be fixed in a different way, but for now it will
237 // allow TPC IDC to work.
238 auto* stream = (StreamContext*)service;
239 auto& routes = context.services().get<DeviceSpec const>().outputs;
240 // Notice I need to do this here, because different invocations for
241 // the same stream might refer to different data processors.
242 // We should probably have a context which is per stream of a specific
243 // data processor.
244 stream->routeDPLCreated.resize(routes.size());
245 stream->routeCreated.resize(routes.size());
246 // Reset the routeCreated / routeDPLCreated at every processing step
247 std::fill(stream->routeCreated.begin(), stream->routeCreated.end(), false);
248 std::fill(stream->routeDPLCreated.begin(), stream->routeDPLCreated.end(), false); },
249 .kind = ServiceKind::Stream};
250}
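// [Editor's note] Illustration of the bookkeeping above, with a purely hypothetical
// "TST"/"DATA" output: a data processor that fills its Lifetime::Timeframe output in its
// process() callback, e.g.
//
//   auto& header = pc.outputs().make<o2::header::STFHeader>(Output{"TST", "DATA", 0});
//
// gets the corresponding routeCreated[ri] flag set, so the postProcessing hook above does
// not report that route as a potentially dropped timeframe.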
251
252ServiceSpec CommonServices::datatakingContextSpec()
253{
254 return ServiceSpec{
255 .name = "datataking-contex",
256 .uniqueId = simpleServiceId<DataTakingContext>(),
257 .init = simpleServiceInit<DataTakingContext, DataTakingContext, ServiceKind::Stream>(),
258 .configure = noConfiguration(),
259 .preProcessing = [](ProcessingContext& processingContext, void* service) {
260 auto& context = processingContext.services().get<DataTakingContext>();
261 for (auto const& ref : processingContext.inputs()) {
262 const o2::framework::DataProcessingHeader *dph = o2::header::get<DataProcessingHeader*>(ref.header);
263 const auto* dh = o2::header::get<o2::header::DataHeader*>(ref.header);
264 if (!dph || !dh) {
265 continue;
266 }
267 context.runNumber = fmt::format("{}", dh->runNumber);
268 break;
269 } },
270 // Notice this will be executed only once, because the service is declared upfront.
271 .start = [](ServiceRegistryRef services, void* service) {
272 auto& context = services.get<DataTakingContext>();
273
274 context.deploymentMode = DefaultsHelpers::deploymentMode();
275
276 auto extRunNumber = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("runNumber", "unspecified");
277 if (extRunNumber != "unspecified" || context.runNumber == "0") {
278 context.runNumber = extRunNumber;
279 }
280 auto extLHCPeriod = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("lhc_period", "unspecified");
281 if (extLHCPeriod != "unspecified") {
282 context.lhcPeriod = extLHCPeriod;
283 } else {
284 static const char* months[12] = {"JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"};
285 time_t now = time(nullptr);
286 auto ltm = gmtime(&now);
287 context.lhcPeriod = months[ltm->tm_mon];
288 LOG(info) << "LHCPeriod is not available, using current month " << context.lhcPeriod;
289 }
290
291 auto extRunType = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("run_type", "unspecified");
292 if (extRunType != "unspecified") {
293 context.runType = extRunType;
294 }
295 auto extEnvId = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("environment_id", "unspecified");
296 if (extEnvId != "unspecified") {
297 context.envId = extEnvId;
298 }
299 auto extDetectors = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("detectors", "unspecified");
300 if (extDetectors != "unspecified") {
301 context.detectors = extDetectors;
302 }
303 auto forcedRaw = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("force_run_as_raw", "false");
304 context.forcedRaw = forcedRaw == "true"; },
305 .kind = ServiceKind::Stream};
306}
307
309};
310
311ServiceSpec CommonServices::configurationSpec()
312{
313 return ServiceSpec{
314 .name = "configuration",
315 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
316 auto backend = options.GetPropertyAsString("configuration");
317 if (backend == "command-line") {
318 return ServiceHandle{0, nullptr};
319 }
320 return ServiceHandle{TypeIdHelpers::uniqueId<ConfigurationInterface>(),
321 ConfigurationFactory::getConfiguration(backend).release()};
322 },
323 .configure = noConfiguration(),
324 .driverStartup = [](ServiceRegistryRef registry, DeviceConfig const& dc) {
325 if (dc.options.count("configuration") == 0) {
326 registry.registerService(ServiceHandle{0, nullptr});
327 return;
328 }
329 auto backend = dc.options["configuration"].as<std::string>();
330 registry.registerService(ServiceHandle{TypeIdHelpers::uniqueId<ConfigurationInterface>(),
331 ConfigurationFactory::getConfiguration(backend).release()}); },
332 .kind = ServiceKind::Global};
333}
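// [Editor's note] Usage sketch; the URI below is an assumption, any backend understood by
// o2::configuration::ConfigurationFactory::getConfiguration() works the same way:
//
//   o2-some-workflow --configuration json:///tmp/workflow-config.json
//
// (workflow name illustrative), whereas --configuration command-line makes the init
// callback above return an empty ServiceHandle and no ConfigurationInterface is created.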
334
335ServiceSpec CommonServices::driverClientSpec()
336{
337 return ServiceSpec{
338 .name = "driverClient",
339 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
340 auto backend = options.GetPropertyAsString("driver-client-backend");
341 if (backend == "stdout://") {
342 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
343 new TextDriverClient(services, state)};
344 }
345 auto [ip, port] = o2::framework::parse_websocket_url(backend.c_str());
346 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
347 new WSDriverClient(services, ip.c_str(), port)};
348 },
349 .configure = noConfiguration(),
350 .kind = ServiceKind::Global};
351}
352
353ServiceSpec CommonServices::controlSpec()
354{
355 return ServiceSpec{
356 .name = "control",
357 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
358 return ServiceHandle{TypeIdHelpers::uniqueId<ControlService>(),
359 new ControlService(services, state)};
360 },
361 .configure = noConfiguration(),
362 .kind = ServiceKind::Serial};
363}
364
365ServiceSpec CommonServices::rootFileSpec()
366{
367 return ServiceSpec{
368 .name = "localrootfile",
369 .init = simpleServiceInit<LocalRootFileService, LocalRootFileService>(),
370 .configure = noConfiguration(),
371 .kind = ServiceKind::Serial};
372}
373
374ServiceSpec CommonServices::parallelSpec()
375{
376 return ServiceSpec{
377 .name = "parallel",
378 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
379 auto& spec = services.get<DeviceSpec const>();
380 return ServiceHandle{TypeIdHelpers::uniqueId<ParallelContext>(),
381 new ParallelContext(spec.rank, spec.nSlots)};
382 },
383 .configure = noConfiguration(),
384 .kind = ServiceKind::Serial};
385}
386
387ServiceSpec CommonServices::timesliceIndex()
388{
389 return ServiceSpec{
390 .name = "timesliceindex",
391 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
392 auto& spec = services.get<DeviceSpec const>();
393 return ServiceHandle{TypeIdHelpers::uniqueId<TimesliceIndex>(),
394 new TimesliceIndex(InputRouteHelpers::maxLanes(spec.inputs), state.inputChannelInfos)};
395 },
396 .configure = noConfiguration(),
397 .kind = ServiceKind::Serial};
398}
399
400ServiceSpec CommonServices::callbacksSpec()
401{
402 return ServiceSpec{
403 .name = "callbacks",
404 .init = simpleServiceInit<CallbackService, CallbackService>(),
405 .configure = noConfiguration(),
406 .kind = ServiceKind::Serial};
407}
408
409ServiceSpec CommonServices::dataRelayer()
410{
411 return ServiceSpec{
412 .name = "datarelayer",
413 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
414 auto& spec = services.get<DeviceSpec const>();
415 return ServiceHandle{TypeIdHelpers::uniqueId<DataRelayer>(),
416 new DataRelayer(spec.completionPolicy,
417 spec.inputs,
418 services.get<TimesliceIndex>(),
419 services)};
420 },
421 .configure = noConfiguration(),
422 .kind = ServiceKind::Serial};
423}
424
425ServiceSpec CommonServices::dataSender()
426{
427 return ServiceSpec{
428 .name = "datasender",
429 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
430 return ServiceHandle{TypeIdHelpers::uniqueId<DataSender>(),
431 new DataSender(services)};
432 },
433 .configure = noConfiguration(),
434 .preProcessing = [](ProcessingContext&, void* service) {
435 auto& dataSender = *reinterpret_cast<DataSender*>(service);
436 dataSender.reset(); },
437 .postDispatching = [](ProcessingContext& ctx, void* service) {
438 auto& dataSender = *reinterpret_cast<DataSender*>(service);
439 // If the quit was requested, the post dispatching can still happen
440 // but with an empty set of data.
441 if (ctx.services().get<DeviceState>().quitRequested == false) {
442 dataSender.verifyMissingSporadic();
443 } },
444 .kind = ServiceKind::Serial};
445}
446
447struct TracingInfrastructure {
448 int processingCount = 0;
449};
450
451ServiceSpec CommonServices::tracingSpec()
452{
453 return ServiceSpec{
454 .name = "tracing",
455 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
456 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<TracingInfrastructure>(),
457 .instance = new TracingInfrastructure(),
458 .kind = ServiceKind::Serial};
459 },
460 .configure = noConfiguration(),
461 .preProcessing = [](ProcessingContext&, void* service) {
462 auto* t = reinterpret_cast<TracingInfrastructure*>(service);
463 t->processingCount += 1; },
464 .postProcessing = [](ProcessingContext&, void* service) {
465 auto* t = reinterpret_cast<TracingInfrastructure*>(service);
466 t->processingCount += 1; },
467 .kind = ServiceKind::Serial};
468}
469
470struct CCDBSupport {
471};
472
473// CCDB Support service
474ServiceSpec CommonServices::ccdbSupportSpec()
475{
476 return ServiceSpec{
477 .name = "ccdb-support",
478 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
479 // iterate on all the outputs matchers
480 auto& spec = services.get<DeviceSpec const>();
481 for (auto& output : spec.outputs) {
482 if (DataSpecUtils::match(output.matcher, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"})) {
483 LOGP(debug, "Optional inputs support enabled");
484 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<CCDBSupport>(), .instance = new CCDBSupport, .kind = ServiceKind::Serial};
485 }
486 }
487 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<CCDBSupport>(), .instance = nullptr, .kind = ServiceKind::Serial};
488 },
489 .configure = noConfiguration(),
490 .finaliseOutputs = [](ProcessingContext& pc, void* service) {
491 if (!service) {
492 return;
493 }
494 if (pc.outputs().countDeviceOutputs(true) == 0) {
495 LOGP(debug, "We are w/o outputs, do not automatically add DISTSUBTIMEFRAME to outgoing messages");
496 return;
497 }
498 auto& timingInfo = pc.services().get<TimingInfo>();
499
500 // For any output that is a FLP/DISTSUBTIMEFRAME with subspec != 0,
501 // we create a new message.
502 InputSpec matcher{"matcher", ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}};
503 auto& streamContext = pc.services().get<StreamContext>();
504 for (size_t oi = 0; oi < pc.services().get<DeviceSpec const>().outputs.size(); ++oi) {
505 OutputRoute const& output = pc.services().get<DeviceSpec const>().outputs[oi];
506 if ((output.timeslice % output.maxTimeslices) != 0) {
507 continue;
508 }
509 if (DataSpecUtils::match(output.matcher, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"})) {
510 auto concrete = DataSpecUtils::asConcreteDataMatcher(output.matcher);
511 if (concrete.subSpec == 0) {
512 continue;
513 }
514 auto& stfDist = pc.outputs().make<o2::header::STFHeader>(Output{concrete.origin, concrete.description, concrete.subSpec});
515 stfDist.id = timingInfo.timeslice;
516 stfDist.firstOrbit = timingInfo.firstTForbit;
517 stfDist.runNumber = timingInfo.runNumber;
518 // We mark it as created by DPL, because we should not account for it when
519 // checking whether the user created all the data for a timeslice.
520 O2_SIGNPOST_ID_FROM_POINTER(sid, stream_context, &streamContext);
521 O2_SIGNPOST_EVENT_EMIT(stream_context, sid, "finaliseOutputs", "Route %" PRIu64 " (%{public}s) was created by DPL.", (uint64_t)oi,
522 DataSpecUtils::describe(output.matcher).c_str());
523 streamContext.routeDPLCreated[oi] = true;
524 }
525 } },
526 .kind = ServiceKind::Global};
527}
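// [Editor's note] Context for the finaliseOutputs hook above, with an illustrative subSpec:
// the service only becomes active for workflows which declare an output such as
//
//   OutputSpec{"FLP", "DISTSUBTIMEFRAME", 0xabc}
//
// and, for every non-zero subSpec, it injects the matching o2::header::STFHeader message per
// timeslice so that downstream consumers of DISTSUBTIMEFRAME keep receiving it.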
528struct DecongestionContext {
529 ServiceRegistryRef ref;
530 TimesliceIndex::OldestOutputInfo oldestPossibleOutput;
531};
532
533auto decongestionCallback = [](AsyncTask& task, size_t id) -> void {
534 auto& oldestPossibleOutput = task.user<DecongestionContext>().oldestPossibleOutput;
535 auto& ref = task.user<DecongestionContext>().ref;
536
537 auto& decongestion = ref.get<DecongestionService>();
538 auto& proxy = ref.get<FairMQDeviceProxy>();
539
540 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
541 cid.value = id;
542 if (decongestion.lastTimeslice >= oldestPossibleOutput.timeslice.value) {
543 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", "Not sending already sent value: %" PRIu64 "> %" PRIu64,
544 decongestion.lastTimeslice, (uint64_t)oldestPossibleOutput.timeslice.value);
545 return;
546 }
547 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", "Running oldest possible timeslice %" PRIu64 " propagation.",
548 (uint64_t)oldestPossibleOutput.timeslice.value);
549 DataProcessingHelpers::broadcastOldestPossibleTimeslice(ref, oldestPossibleOutput.timeslice.value);
550
551 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
552 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
553 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
554 // TODO: we could cache this in the proxy at bind time.
555 if (info.channelType != ChannelAccountingType::DPL) {
556 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", "Skipping channel %{public}s", info.name.c_str());
557 continue;
558 }
559 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestPossibleOutput.timeslice.value)) {
560 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice",
561 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64 ", priority %d",
562 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
563 }
564 }
565 decongestion.lastTimeslice = oldestPossibleOutput.timeslice.value;
566};
567
568auto decongestionCallbackOrdered = [](AsyncTask& task, size_t id) -> void {
569 auto& oldestPossibleOutput = task.user<DecongestionContext>().oldestPossibleOutput;
570 auto& ref = task.user<DecongestionContext>().ref;
571
572 auto& decongestion = ref.get<DecongestionService>();
573 auto& state = ref.get<DeviceState>();
574 auto& timesliceIndex = ref.get<TimesliceIndex>();
575 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
576 int64_t oldNextTimeslice = decongestion.nextTimeslice;
577 decongestion.nextTimeslice = std::max(decongestion.nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
578 if (oldNextTimeslice != decongestion.nextTimeslice) {
579 if (state.transitionHandling != TransitionHandlingState::NoTransition && DefaultsHelpers::onlineDeploymentMode()) {
580 O2_SIGNPOST_EVENT_EMIT_WARN(async_queue, cid, "oldest_possible_timeslice", "Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
581 } else {
582 O2_SIGNPOST_EVENT_EMIT_ERROR(async_queue, cid, "oldest_possible_timeslice", "Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
583 }
584 timesliceIndex.rescan();
585 }
586};
587
588// Decongestion service
589// If we do not have any Timeframe input, it means we must be creating timeslices
590// in order and that we should propagate the oldest possible timeslice at the end
591// of each processing step.
593ServiceSpec CommonServices::decongestionSpec()
594{
595 return ServiceSpec{
596 .name = "decongestion",
597 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
598 auto* decongestion = new DecongestionService();
599 for (auto& input : services.get<DeviceSpec const>().inputs) {
600 if (input.matcher.lifetime == Lifetime::Timeframe || input.matcher.lifetime == Lifetime::QA || input.matcher.lifetime == Lifetime::Sporadic || input.matcher.lifetime == Lifetime::Optional) {
601 LOGP(detail, "Found a real data input, we cannot update the oldest possible timeslice when sending messages");
602 decongestion->isFirstInTopology = false;
603 break;
604 }
605 }
606 auto& queue = services.get<AsyncQueue>();
607 decongestion->oldestPossibleTimesliceTask = AsyncQueueHelpers::create(queue, {.name = "oldest-possible-timeslice", .score = 100});
608 return ServiceHandle{TypeIdHelpers::uniqueId<DecongestionService>(), decongestion, ServiceKind::Serial};
609 },
610 .postForwarding = [](ProcessingContext& ctx, void* service) {
611 auto* decongestion = reinterpret_cast<DecongestionService*>(service);
612 if (O2_BUILTIN_LIKELY(decongestion->isFirstInTopology == false)) {
613 return;
614 }
615 O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, service);
616 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "postForwardingCallbacks", "We are the first one in the topology, we need to update the oldest possible timeslice");
617 auto& timesliceIndex = ctx.services().get<TimesliceIndex>();
618 auto& relayer = ctx.services().get<DataRelayer>();
619 timesliceIndex.updateOldestPossibleOutput(decongestion->nextEnumerationTimesliceRewinded);
620 auto& proxy = ctx.services().get<FairMQDeviceProxy>();
621 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
622 if (decongestion->nextEnumerationTimesliceRewinded && decongestion->nextEnumerationTimeslice < oldestPossibleOutput.timeslice.value) {
623 LOGP(detail, "Not sending oldestPossible if nextEnumerationTimeslice was rewinded");
624 return;
625 }
626
627 if (decongestion->lastTimeslice && oldestPossibleOutput.timeslice.value == decongestion->lastTimeslice) {
628 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice",
629 "Not sending already sent value for oldest possible timeslice: %" PRIu64,
630 (uint64_t)oldestPossibleOutput.timeslice.value);
631 return;
632 }
633 if (oldestPossibleOutput.timeslice.value < decongestion->lastTimeslice) {
634 LOGP(error, "We are trying to send an oldest possible timeslice {} that is older than the last one we already sent {}",
635 oldestPossibleOutput.timeslice.value, decongestion->lastTimeslice);
636 return;
637 }
638
639 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Broadcasting oldest possible output %" PRIu64 " due to %{public}s (%" PRIu64 ")",
640 (uint64_t)oldestPossibleOutput.timeslice.value,
641 oldestPossibleOutput.slot.index == -1 ? "channel" : "slot",
642 (uint64_t)(oldestPossibleOutput.slot.index == -1 ? oldestPossibleOutput.channel.value : oldestPossibleOutput.slot.index));
643 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Ordered active %d", decongestion->orderedCompletionPolicyActive);
644 if (decongestion->orderedCompletionPolicyActive) {
645 auto oldNextTimeslice = decongestion->nextTimeslice;
646 decongestion->nextTimeslice = std::max(decongestion->nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
647 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Next timeslice %" PRIi64, decongestion->nextTimeslice);
648 if (oldNextTimeslice != decongestion->nextTimeslice) {
649 auto& state = ctx.services().get<DeviceState>();
650 if (state.transitionHandling != TransitionHandlingState::NoTransition && DefaultsHelpers::onlineDeploymentMode()) {
651 O2_SIGNPOST_EVENT_EMIT_WARN(data_processor_context, cid, "oldest_possible_timeslice", "Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
652 } else {
653 O2_SIGNPOST_EVENT_EMIT_ERROR(data_processor_context, cid, "oldest_possible_timeslice", "Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
654 }
655 timesliceIndex.rescan();
656 }
657 }
658 DataProcessingHelpers::broadcastOldestPossibleTimeslice(ctx.services(), oldestPossibleOutput.timeslice.value);
659
660 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
661 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
662 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
664 // TODO: we could cache this in the proxy at bind time.
664 if (info.channelType != ChannelAccountingType::DPL) {
665 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Skipping channel %{public}s", info.name.c_str());
666 continue;
667 }
668 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ctx.services(), info, state, oldestPossibleOutput.timeslice.value)) {
669 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice",
670 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64 ", priority %d",
671 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
672 }
673 }
674 decongestion->lastTimeslice = oldestPossibleOutput.timeslice.value; },
675 .stop = [](ServiceRegistryRef services, void* service) {
676 auto* decongestion = (DecongestionService*)service;
677 services.get<TimesliceIndex>().reset();
678 decongestion->nextEnumerationTimeslice = 0;
679 decongestion->nextEnumerationTimesliceRewinded = false;
680 decongestion->lastTimeslice = 0;
681 decongestion->nextTimeslice = 0;
682 decongestion->oldestPossibleTimesliceTask = {0};
683 auto &state = services.get<DeviceState>();
684 for (auto &channel : state.inputChannelInfos) {
685 channel.oldestForChannel = {0};
686 } },
687 .domainInfoUpdated = [](ServiceRegistryRef services, size_t oldestPossibleTimeslice, ChannelIndex channel) {
688 auto& decongestion = services.get<DecongestionService>();
689 auto& relayer = services.get<DataRelayer>();
690 auto& timesliceIndex = services.get<TimesliceIndex>();
691 O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, &decongestion);
692 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Received oldest possible timeframe %" PRIu64 " from channel %d",
693 (uint64_t)oldestPossibleTimeslice, channel.value);
694 relayer.setOldestPossibleInput({oldestPossibleTimeslice}, channel);
695 timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
696 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
697
698 if (oldestPossibleOutput.timeslice.value == decongestion.lastTimeslice) {
699 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Synchronous: Not sending already sent value: %" PRIu64, (uint64_t)oldestPossibleOutput.timeslice.value);
700 return;
701 }
702 if (oldestPossibleOutput.timeslice.value < decongestion.lastTimeslice) {
703 LOGP(error, "We are trying to send an oldest possible timeslice {} that is older than the last one we sent {}",
704 oldestPossibleOutput.timeslice.value, decongestion.lastTimeslice);
705 return;
706 }
707 auto& queue = services.get<AsyncQueue>();
708 const auto& state = services.get<DeviceState>();
711 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Queueing oldest possible timeslice %" PRIu64 " propagation for execution.",
712 (uint64_t)oldestPossibleOutput.timeslice.value);
713 AsyncQueueHelpers::post(
714 queue, AsyncTask{ .timeslice = TimesliceId{oldestPossibleTimeslice},
715 .id = decongestion.oldestPossibleTimesliceTask,
716 .debounce = -1, .callback = decongestionCallback}
717 .user<DecongestionContext>(DecongestionContext{.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
718
719 if (decongestion.orderedCompletionPolicyActive) {
720 AsyncQueueHelpers::post(
721 queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleOutput.timeslice.value},.id = decongestion.oldestPossibleTimesliceTask, .debounce = -1,
722 .callback = decongestionCallbackOrdered}
723 .user<DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
724 } },
725 .kind = ServiceKind::Serial};
726}
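// [Editor's note] Hedged example of the "first in topology" rule coded in init above: a
// source device whose only input is e.g.
//
//   InputSpec{"clock", "TST", "CLOCK", 0, Lifetime::Timer}
//
// has no Timeframe/QA/Sporadic/Optional input, so isFirstInTopology stays true and this
// service broadcasts the oldest possible timeslice after every processing step. The
// "TST"/"CLOCK" spec is illustrative.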
727
728// FIXME: allow configuring the default number of threads per device
729// This should probably be done by overriding the preFork
730// callback and using the boost program options there to
731// get the default number of threads.
732ServiceSpec CommonServices::threadPool(int numWorkers)
733{
734 return ServiceSpec{
735 .name = "threadpool",
736 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
737 auto* pool = new ThreadPool();
738 // FIXME: this will require some extra argument for the configuration context of a service
739 pool->poolSize = 1;
740 return ServiceHandle{TypeIdHelpers::uniqueId<ThreadPool>(), pool};
741 },
742 .configure = [](InitContext&, void* service) -> void* {
743 auto* t = reinterpret_cast<ThreadPool*>(service);
744 // FIXME: this will require some extra argument for the configuration context of a service
745 t->poolSize = 1;
746 return service;
747 },
748 .postForkParent = [](ServiceRegistryRef services) -> void {
749 // FIXME: this will require some extra argument for the configuration context of a service
750 auto numWorkersS = std::to_string(1);
751 setenv("UV_THREADPOOL_SIZE", numWorkersS.c_str(), 0);
752 },
753 .kind = ServiceKind::Serial};
754}
755
756namespace
757{
758auto sendRelayerMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) -> void
759{
760 // Update the timer to make sure we have the correct time when sending out the stats.
761 uv_update_time(registry.get<DeviceState>().loop);
762 // Derive the amount of shared memory used
763 auto& runningWorkflow = registry.get<RunningWorkflowInfo const>();
764 using namespace fair::mq::shmem;
765 auto& spec = registry.get<DeviceSpec const>();
766
767 auto hasMetric = [&runningWorkflow](const DataProcessingStats::MetricSpec& metric) -> bool {
768 return metric.metricId == static_cast<int>(ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE) + (runningWorkflow.shmSegmentId % 512);
769 };
770 // FIXME: Ugly, but we do it only every 5 seconds...
771 if (std::find_if(stats.metricSpecs.begin(), stats.metricSpecs.end(), hasMetric) != stats.metricSpecs.end()) {
772 auto device = registry.get<RawDeviceService>().device();
773 long freeMemory = -1;
774 try {
775 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(ShmId{makeShmIdStr(device->fConfig->GetProperty<uint64_t>("shmid"))}, runningWorkflow.shmSegmentId);
776 } catch (...) {
777 }
778 if (freeMemory == -1) {
779 try {
780 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(SessionId{device->fConfig->GetProperty<std::string>("session")}, runningWorkflow.shmSegmentId);
781 } catch (...) {
782 }
783 }
784 stats.updateStats({static_cast<unsigned short>(static_cast<int>(ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE) + (runningWorkflow.shmSegmentId % 512)), DataProcessingStats::Op::SetIfPositive, freeMemory});
785 }
786
787 auto device = registry.get<RawDeviceService>().device();
788
789 int64_t totalBytesIn = 0;
790 int64_t totalBytesOut = 0;
791
792 for (auto& channel : device->GetChannels()) {
793 totalBytesIn += channel.second[0].GetBytesRx();
794 totalBytesOut += channel.second[0].GetBytesTx();
795 }
796
797 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_BYTES_IN), DataProcessingStats::Op::Set, totalBytesIn / 1000000});
798 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_BYTES_OUT), DataProcessingStats::Op::Set, totalBytesOut / 1000000});
799
800 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_RATE_IN_MB_S), DataProcessingStats::Op::InstantaneousRate, totalBytesIn / 1000000});
801 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_RATE_OUT_MB_S), DataProcessingStats::Op::InstantaneousRate, totalBytesOut / 1000000});
802};
803
804auto flushStates(ServiceRegistryRef registry, DataProcessingStates& states) -> void
805{
806 states.flushChangedStates([&states, registry](std::string const& spec, int64_t timestamp, std::string_view value) mutable -> void {
807 auto& client = registry.get<ControlService>();
808 client.push(spec, value, timestamp);
809 });
810}
811
812O2_DECLARE_DYNAMIC_LOG(monitoring_service);
813
815auto flushMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) -> void
816{
817 // Flushing metrics should only happen on the main thread, to avoid
818 // needing a mutex for the communication with the driver.
819 O2_SIGNPOST_ID_GENERATE(sid, monitoring_service);
820 O2_SIGNPOST_START(monitoring_service, sid, "flush", "flushing metrics");
821 if (registry.isMainThread() == false) {
822 LOGP(fatal, "Flushing metrics should only happen on the main thread.");
823 }
824 auto& monitoring = registry.get<Monitoring>();
825 auto& relayer = registry.get<DataRelayer>();
826
827 // Send all the relevant metrics for the relayer to update the GUI
828 stats.flushChangedMetrics([&monitoring](DataProcessingStats::MetricSpec const& spec, int64_t timestamp, int64_t value) mutable -> void {
829 // convert timestamp to a time_point
830 auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(std::chrono::milliseconds(timestamp));
831 auto metric = o2::monitoring::Metric{spec.name, Metric::DefaultVerbosity, tp};
832 if (spec.kind == DataProcessingStats::Kind::UInt64) {
833 if (value < 0) {
834 LOG(debug) << "Value for " << spec.name << " is negative, setting to 0";
835 value = 0;
836 }
837 metric.addValue((uint64_t)value, "value");
838 } else {
839 if (value > (int64_t)std::numeric_limits<int>::max()) {
840 LOG(warning) << "Value for " << spec.name << " is too large, setting to INT_MAX";
841 value = (int64_t)std::numeric_limits<int>::max();
842 }
843 if (value < (int64_t)std::numeric_limits<int>::min()) {
844 value = (int64_t)std::numeric_limits<int>::min();
845 LOG(warning) << "Value for " << spec.name << " is too small, setting to INT_MIN";
846 }
847 metric.addValue((int)value, "value");
848 }
849 if (spec.scope == DataProcessingStats::Scope::DPL) {
850 metric.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL);
851 }
852 monitoring.send(std::move(metric));
853 });
854 relayer.sendContextState();
855 monitoring.flushBuffer();
856 O2_SIGNPOST_END(monitoring_service, sid, "flush", "done flushing metrics");
857};
858} // namespace
859
860ServiceSpec CommonServices::dataProcessingStats()
861{
862 return ServiceSpec{
863 .name = "data-processing-stats",
864 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
865 timespec now;
866 clock_gettime(CLOCK_REALTIME, &now);
867 uv_update_time(state.loop);
868 uint64_t offset = now.tv_sec * 1000 - uv_now(state.loop);
870 .minOnlinePublishInterval = std::stoi(options.GetProperty<std::string>("dpl-stats-min-online-publishing-interval").c_str()) * 1000};
871 auto* stats = new DataProcessingStats(DataProcessingStatsHelpers::defaultRealtimeBaseConfigurator(offset, state.loop),
872 DataProcessingStatsHelpers::defaultCPUTimeConfigurator(state.loop),
873 config);
874 auto& runningWorkflow = services.get<RunningWorkflowInfo const>();
875
876 // It makes no sense to update the stats more often than every 5s
877 int quickUpdateInterval = 5000;
878 uint64_t quickRefreshInterval = 7000;
879 uint64_t onlineRefreshLatency = 60000; // For metrics which are reported online, we flush them every 60s regardless of their state.
880 using MetricSpec = DataProcessingStats::MetricSpec;
881 using Kind = DataProcessingStats::Kind;
882 using Scope = DataProcessingStats::Scope;
883
884#ifdef NDEBUG
885 bool enableDebugMetrics = false;
886#else
887 bool enableDebugMetrics = true;
888#endif
889 bool arrowAndResourceLimitingMetrics = false;
891 arrowAndResourceLimitingMetrics = true;
892 }
893 // Input proxies should not report cpu_usage_fraction,
894 // because of the rate limiting which biases the measurement.
895 auto& spec = services.get<DeviceSpec const>();
896 bool enableCPUUsageFraction = true;
897 auto isProxy = [](DataProcessorLabel const& label) -> bool { return label == DataProcessorLabel{"input-proxy"}; };
898 if (std::find_if(spec.labels.begin(), spec.labels.end(), isProxy) != spec.labels.end()) {
899 O2_SIGNPOST_ID_GENERATE(mid, policies);
900 O2_SIGNPOST_EVENT_EMIT(policies, mid, "metrics", "Disabling cpu_usage_fraction metric for proxy %{public}s", spec.name.c_str());
901 enableCPUUsageFraction = false;
902 }
903
904 std::vector<DataProcessingStats::MetricSpec> metrics = {
905 MetricSpec{.name = "errors",
907 .kind = Kind::UInt64,
908 .scope = Scope::Online,
909 .minPublishInterval = quickUpdateInterval,
910 .maxRefreshLatency = quickRefreshInterval},
911 MetricSpec{.name = "exceptions",
913 .kind = Kind::UInt64,
914 .scope = Scope::Online,
915 .minPublishInterval = quickUpdateInterval},
916 MetricSpec{.name = "inputs/relayed/pending",
918 .kind = Kind::UInt64,
919 .minPublishInterval = quickUpdateInterval},
920 MetricSpec{.name = "inputs/relayed/incomplete",
922 .kind = Kind::UInt64,
923 .minPublishInterval = quickUpdateInterval},
924 MetricSpec{.name = "inputs/relayed/total",
926 .kind = Kind::UInt64,
927 .minPublishInterval = quickUpdateInterval},
928 MetricSpec{.name = "elapsed_time_ms",
930 .kind = Kind::UInt64,
931 .minPublishInterval = quickUpdateInterval},
932 MetricSpec{.name = "total_wall_time_ms",
934 .kind = Kind::UInt64,
935 .minPublishInterval = quickUpdateInterval},
936 MetricSpec{.name = "last_processed_input_size_byte",
938 .kind = Kind::UInt64,
939 .minPublishInterval = quickUpdateInterval},
940 MetricSpec{.name = "total_processed_input_size_byte",
942 .kind = Kind::UInt64,
943 .scope = Scope::Online,
944 .minPublishInterval = quickUpdateInterval},
945 MetricSpec{.name = "total_sigusr1",
947 .kind = Kind::UInt64,
948 .minPublishInterval = quickUpdateInterval},
949 MetricSpec{.name = "consumed-timeframes",
951 .kind = Kind::UInt64,
952 .minPublishInterval = 0,
953 .maxRefreshLatency = quickRefreshInterval,
954 .sendInitialValue = true},
955 MetricSpec{.name = "min_input_latency_ms",
957 .kind = Kind::UInt64,
958 .scope = Scope::Online,
959 .minPublishInterval = quickUpdateInterval},
960 MetricSpec{.name = "max_input_latency_ms",
962 .kind = Kind::UInt64,
963 .minPublishInterval = quickUpdateInterval},
964 MetricSpec{.name = "total_rate_in_mb_s",
966 .kind = Kind::Rate,
967 .scope = Scope::Online,
968 .minPublishInterval = quickUpdateInterval,
969 .maxRefreshLatency = onlineRefreshLatency,
970 .sendInitialValue = true},
971 MetricSpec{.name = "total_rate_out_mb_s",
973 .kind = Kind::Rate,
974 .scope = Scope::Online,
975 .minPublishInterval = quickUpdateInterval,
976 .maxRefreshLatency = onlineRefreshLatency,
977 .sendInitialValue = true},
978 MetricSpec{.name = "processing_rate_hz",
980 .kind = Kind::Rate,
981 .scope = Scope::Online,
982 .minPublishInterval = quickUpdateInterval,
983 .maxRefreshLatency = onlineRefreshLatency,
984 .sendInitialValue = true},
985 MetricSpec{.name = "cpu_usage_fraction",
986 .enabled = enableCPUUsageFraction,
988 .kind = Kind::Rate,
989 .scope = Scope::Online,
990 .minPublishInterval = quickUpdateInterval,
991 .maxRefreshLatency = onlineRefreshLatency,
992 .sendInitialValue = true},
993 MetricSpec{.name = "performed_computations",
995 .kind = Kind::UInt64,
996 .scope = Scope::Online,
997 .minPublishInterval = quickUpdateInterval,
998 .maxRefreshLatency = onlineRefreshLatency,
999 .sendInitialValue = true},
1000 MetricSpec{.name = "total_bytes_in",
1002 .kind = Kind::UInt64,
1003 .scope = Scope::Online,
1004 .minPublishInterval = quickUpdateInterval,
1005 .maxRefreshLatency = onlineRefreshLatency,
1006 .sendInitialValue = true},
1007 MetricSpec{.name = "total_bytes_out",
1009 .kind = Kind::UInt64,
1010 .scope = Scope::Online,
1011 .minPublishInterval = quickUpdateInterval,
1012 .maxRefreshLatency = onlineRefreshLatency,
1013 .sendInitialValue = true},
1014 MetricSpec{.name = fmt::format("available_managed_shm_{}", runningWorkflow.shmSegmentId),
1015 .metricId = (int)ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE + (runningWorkflow.shmSegmentId % 512),
1016 .kind = Kind::UInt64,
1017 .scope = Scope::Online,
1018 .minPublishInterval = 500,
1019 .maxRefreshLatency = onlineRefreshLatency,
1020 .sendInitialValue = true},
1021 MetricSpec{.name = "malformed_inputs", .metricId = static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1022 MetricSpec{.name = "dropped_computations", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1023 MetricSpec{.name = "dropped_incoming_messages", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1024 MetricSpec{.name = "relayed_messages", .metricId = static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1025 MetricSpec{.name = "arrow-bytes-destroyed",
1026 .enabled = arrowAndResourceLimitingMetrics,
1027 .metricId = static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED),
1028 .kind = Kind::UInt64,
1029 .scope = Scope::DPL,
1030 .minPublishInterval = 0,
1031 .maxRefreshLatency = 10000,
1032 .sendInitialValue = true},
1033 MetricSpec{.name = "arrow-messages-destroyed",
1034 .enabled = arrowAndResourceLimitingMetrics,
1035 .metricId = static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED),
1036 .kind = Kind::UInt64,
1037 .scope = Scope::DPL,
1038 .minPublishInterval = 0,
1039 .maxRefreshLatency = 10000,
1040 .sendInitialValue = true},
1041 MetricSpec{.name = "arrow-bytes-created",
1042 .enabled = arrowAndResourceLimitingMetrics,
1043 .metricId = static_cast<short>(ProcessingStatsId::ARROW_BYTES_CREATED),
1044 .kind = Kind::UInt64,
1045 .scope = Scope::DPL,
1046 .minPublishInterval = 0,
1047 .maxRefreshLatency = 10000,
1048 .sendInitialValue = true},
1049 MetricSpec{.name = "arrow-messages-created",
1050 .enabled = arrowAndResourceLimitingMetrics,
1051 .metricId = static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_CREATED),
1052 .kind = Kind::UInt64,
1053 .scope = Scope::DPL,
1054 .minPublishInterval = 0,
1055 .maxRefreshLatency = 10000,
1056 .sendInitialValue = true},
1057 MetricSpec{.name = "arrow-bytes-expired",
1058 .enabled = arrowAndResourceLimitingMetrics,
1059 .metricId = static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED),
1060 .kind = Kind::UInt64,
1061 .scope = Scope::DPL,
1062 .minPublishInterval = 0,
1063 .maxRefreshLatency = 10000,
1064 .sendInitialValue = true},
1065 MetricSpec{.name = "shm-offer-bytes-consumed",
1066 .enabled = arrowAndResourceLimitingMetrics,
1067 .metricId = static_cast<short>(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED),
1068 .kind = Kind::UInt64,
1069 .scope = Scope::DPL,
1070 .minPublishInterval = 0,
1071 .maxRefreshLatency = 10000,
1072 .sendInitialValue = true},
1073 MetricSpec{.name = "resources-missing",
1074 .enabled = enableDebugMetrics,
1075 .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_MISSING),
1076 .kind = Kind::UInt64,
1077 .scope = Scope::DPL,
1078 .minPublishInterval = 1000,
1079 .maxRefreshLatency = 1000,
1080 .sendInitialValue = true},
1081 MetricSpec{.name = "resources-insufficient",
1082 .enabled = enableDebugMetrics,
1083 .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_INSUFFICIENT),
1084 .kind = Kind::UInt64,
1085 .scope = Scope::DPL,
1086 .minPublishInterval = 1000,
1087 .maxRefreshLatency = 1000,
1088 .sendInitialValue = true},
1089 MetricSpec{.name = "resources-satisfactory",
1090 .enabled = enableDebugMetrics,
1091 .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_SATISFACTORY),
1092 .kind = Kind::UInt64,
1093 .scope = Scope::DPL,
1094 .minPublishInterval = 1000,
1095 .maxRefreshLatency = 1000,
1096 .sendInitialValue = true},
1097 MetricSpec{.name = "resource-offer-expired",
1098 .enabled = arrowAndResourceLimitingMetrics,
1099 .metricId = static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED),
1100 .kind = Kind::UInt64,
1101 .scope = Scope::DPL,
1102 .minPublishInterval = 0,
1103 .maxRefreshLatency = 10000,
1104 .sendInitialValue = true}};
1105
1106 for (auto& metric : metrics) {
1107 if (metric.metricId == (int)ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE + (runningWorkflow.shmSegmentId % 512) && spec.name.compare("readout-proxy") != 0) {
1108 continue;
1109 }
1110 stats->registerMetric(metric);
1111 }
1112
1113 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStats>(), stats};
1114 },
1115 .configure = noConfiguration(),
1116 .postProcessing = [](ProcessingContext& context, void* service) {
1117 auto* stats = (DataProcessingStats*)service;
1118 stats->processCommandQueue(); },
1119 .preDangling = [](DanglingContext& context, void* service) {
1120 auto* stats = (DataProcessingStats*)service;
1121 sendRelayerMetrics(context.services(), *stats);
1122 flushMetrics(context.services(), *stats); },
1123 .postDangling = [](DanglingContext& context, void* service) {
1124 auto* stats = (DataProcessingStats*)service;
1125 sendRelayerMetrics(context.services(), *stats);
1126 flushMetrics(context.services(), *stats); },
1127 .preEOS = [](EndOfStreamContext& context, void* service) {
1128 auto* stats = (DataProcessingStats*)service;
1129 sendRelayerMetrics(context.services(), *stats);
1130 flushMetrics(context.services(), *stats); },
1131 .preLoop = [](ServiceRegistryRef ref, void* service) {
1132 auto* stats = (DataProcessingStats*)service;
1133 flushMetrics(ref, *stats); },
1134 .kind = ServiceKind::Serial};
1135}
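// [Editor's note] Sketch of how another service could hook into the same stats object; the
// metric name and id below are invented for illustration:
//
//   auto& stats = services.get<DataProcessingStats>();
//   stats.registerMetric(DataProcessingStats::MetricSpec{.name = "my_metric",
//                                                        .metricId = 123,
//                                                        .kind = DataProcessingStats::Kind::UInt64,
//                                                        .minPublishInterval = 5000});
//   stats.updateStats({123, DataProcessingStats::Op::Add, 1});
//
// flushMetrics() above then forwards the value to the monitoring backend from the main loop.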
1136
1137// This is similar to dataProcessingStats, but it is designed to synchronize
1138// history-less metrics which are e.g. used for the GUI.
1139ServiceSpec CommonServices::dataProcessingStates()
1140{
1141 return ServiceSpec{
1142 .name = "data-processing-states",
1143 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
1144 timespec now;
1145 clock_gettime(CLOCK_REALTIME, &now);
1146 uv_update_time(state.loop);
1147 uint64_t offset = now.tv_sec * 1000 - uv_now(state.loop);
1148 auto* states = new DataProcessingStates(DataProcessingStatsHelpers::defaultRealtimeBaseConfigurator(offset, state.loop),
1149 DataProcessingStatsHelpers::defaultCPUTimeConfigurator(state.loop));
1150 states->registerState({"dummy_state", (short)ProcessingStateId::DUMMY_STATE});
1151 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStates>(), states};
1152 },
1153 .configure = noConfiguration(),
1154 .postProcessing = [](ProcessingContext& context, void* service) {
1155 auto* states = (DataProcessingStates*)service;
1156 states->processCommandQueue(); },
1157 .preDangling = [](DanglingContext& context, void* service) {
1158 auto* states = (DataProcessingStates*)service;
1159 flushStates(context.services(), *states); },
1160 .postDangling = [](DanglingContext& context, void* service) {
1161 auto* states = (DataProcessingStates*)service;
1162 flushStates(context.services(), *states); },
1163 .preEOS = [](EndOfStreamContext& context, void* service) {
1164 auto* states = (DataProcessingStates*)service;
1165 flushStates(context.services(), *states); },
1166 .kind = ServiceKind::Global};
1167}
1168
1169struct GUIMetrics {
1170};
1171
1172ServiceSpec CommonServices::guiMetricsSpec()
1173{
1174 return ServiceSpec{
1175 .name = "gui-metrics",
1176 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
1177 auto* stats = new GUIMetrics();
1178 auto& monitoring = services.get<Monitoring>();
1179 auto& spec = services.get<DeviceSpec const>();
1180 monitoring.send({(int)spec.inputChannels.size(), fmt::format("oldest_possible_timeslice/h"), o2::monitoring::Verbosity::Debug});
1181 monitoring.send({(int)1, fmt::format("oldest_possible_timeslice/w"), o2::monitoring::Verbosity::Debug});
1182 monitoring.send({(int)spec.outputChannels.size(), fmt::format("oldest_possible_output/h"), o2::monitoring::Verbosity::Debug});
1183 monitoring.send({(int)1, fmt::format("oldest_possible_output/w"), o2::monitoring::Verbosity::Debug});
1184 return ServiceHandle{TypeIdHelpers::uniqueId<GUIMetrics>(), stats};
1185 },
1186 .configure = noConfiguration(),
1187 .postProcessing = [](ProcessingContext& context, void* service) {
1188 auto& relayer = context.services().get<DataRelayer>();
1189 auto& monitoring = context.services().get<Monitoring>();
1190 auto& spec = context.services().get<DeviceSpec const>();
1191 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
1192 for (size_t ci = 0; ci < spec.outputChannels.size(); ++ci) {
1193 monitoring.send({(uint64_t)oldestPossibleOutput.timeslice.value, fmt::format("oldest_possible_output/{}", ci), o2::monitoring::Verbosity::Debug});
1194 } },
1195 .domainInfoUpdated = [](ServiceRegistryRef registry, size_t timeslice, ChannelIndex channel) {
1196 auto& monitoring = registry.get<Monitoring>();
1197 monitoring.send({(uint64_t)timeslice, fmt::format("oldest_possible_timeslice/{}", channel.value), o2::monitoring::Verbosity::Debug}); },
1198 .active = false,
1199 .kind = ServiceKind::Serial};
1200}
1201
1202ServiceSpec CommonServices::objectCache()
1203{
1204 return ServiceSpec{
1205 .name = "object-cache",
1206 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1207 auto* cache = new ObjectCache();
1208 return ServiceHandle{TypeIdHelpers::uniqueId<ObjectCache>(), cache};
1209 },
1210 .configure = noConfiguration(),
1211 .kind = ServiceKind::Serial};
1212}
1213
1214ServiceSpec CommonServices::dataProcessorContextSpec()
1215{
1216 return ServiceSpec{
1217 .name = "data-processing-context",
1218 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1219 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessorContext>(), new DataProcessorContext()};
1220 },
1221 .configure = noConfiguration(),
1222 .exit = [](ServiceRegistryRef, void* service) { auto* context = (DataProcessorContext*)service; delete context; },
1223 .kind = ServiceKind::Serial};
1224}
1225
1226ServiceSpec CommonServices::deviceContextSpec()
1227{
1228 return ServiceSpec{
1229 .name = "device-context",
1230 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1231 return ServiceHandle{TypeIdHelpers::uniqueId<DeviceContext>(), new DeviceContext()};
1232 },
1233 .configure = noConfiguration(),
1234 .kind = ServiceKind::Serial};
1235}
1236
1237ServiceSpec CommonServices::dataAllocatorSpec()
1238{
1239 return ServiceSpec{
1240 .name = "data-allocator",
1241 .uniqueId = simpleServiceId<DataAllocator>(),
1242 .init = [](ServiceRegistryRef ref, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1243 return ServiceHandle{
1244 .hash = TypeIdHelpers::uniqueId<DataAllocator>(),
1245 .instance = new DataAllocator(ref),
1246 .kind = ServiceKind::Stream,
1247 .name = "data-allocator",
1248 };
1249 },
1250 .configure = noConfiguration(),
1251 .kind = ServiceKind::Stream};
1252}
1253
1255std::vector<ServiceSpec> CommonServices::defaultServices(std::string extraPlugins, int numThreads)
1256{
1257 std::vector<ServiceSpec> specs{
1261 asyncQueue(),
1268 controlSpec(),
1269 rootFileSpec(),
1270 parallelSpec(),
1271 callbacksSpec(),
1274 dataRelayer(),
1276 dataSender(),
1277 objectCache(),
1278 ccdbSupportSpec()};
1279
1281 specs.push_back(ArrowSupport::arrowBackendSpec());
1282 }
1285 specs.push_back(decongestionSpec());
1286
1287 std::string loadableServicesStr = extraPlugins;
1288 // Do not load InfoLogger by default if we are not at P2.
1290 if (loadableServicesStr.empty() == false) {
1291 loadableServicesStr += ",";
1292 }
1293 loadableServicesStr += "O2FrameworkDataTakingSupport:InfoLoggerContext,O2FrameworkDataTakingSupport:InfoLogger";
1294 }
1295 // Load plugins depending on the environment
1296 std::vector<LoadablePlugin> loadablePlugins = {};
1297 char* loadableServicesEnv = getenv("DPL_LOAD_SERVICES");
1298 // The string defining the services to load has the format:
1299 //
1300 // library1:name1,library2:name2,...
1301 if (loadableServicesEnv) {
1302 if (loadableServicesStr.empty() == false) {
1303 loadableServicesStr += ",";
1304 }
1305 loadableServicesStr += loadableServicesEnv;
1306 }
1307 loadablePlugins = PluginManager::parsePluginSpecString(loadableServicesStr.c_str());
1308 PluginManager::loadFromPlugin<ServiceSpec, ServicePlugin>(loadablePlugins, specs);
1309 // I should make it optional depending on whether the GUI is there or not...
1310 specs.push_back(CommonServices::guiMetricsSpec());
1311 if (numThreads) {
1312 specs.push_back(threadPool(numThreads));
1313 }
1314 return specs;
1315}
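// [Editor's note] Example of the plugin string parsed above. The pair below is the one this
// function already adds for data taking; any other pair follows the same <library>:<service>
// format and is appended through the DPL_LOAD_SERVICES environment variable:
//
//   DPL_LOAD_SERVICES="O2FrameworkDataTakingSupport:InfoLoggerContext,O2FrameworkDataTakingSupport:InfoLogger" o2-some-workflow
//
// (workflow name illustrative).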
1316
1317std::vector<ServiceSpec> CommonServices::arrowServices()
1318{
1319 return {
1320 ArrowSupport::arrowTableSlicingCacheDefSpec(),
1321 ArrowSupport::arrowTableSlicingCacheSpec(),
1322 };
1323}
1324
1325} // namespace o2::framework