Project
Loading...
Searching...
No Matches
CommonServices.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
25#include "Framework/Signpost.h"
32#include "InputRouteHelpers.h"
36#include "Framework/Tracing.h"
46#include "Framework/Signpost.h"
49
50#include "TextDriverClient.h"
51#include "WSDriverClient.h"
52#include "HTTPParser.h"
53#include "../src/DataProcessingStatus.h"
54#include "DecongestionService.h"
55#include "ArrowSupport.h"
57#include "Headers/STFHeader.h"
58#include "Headers/DataHeader.h"
59
60#include <Configuration/ConfigurationInterface.h>
61#include <Configuration/ConfigurationFactory.h>
62#include <Monitoring/MonitoringFactory.h>
63#include "Framework/Signpost.h"
64
65#include <fairmq/Device.h>
66#include <fairmq/shmem/Monitor.h>
67#include <fairmq/shmem/Common.h>
68#include <fairmq/ProgOptions.h>
69#include <uv.h>
70
71#include <cstdlib>
72#include <cstring>
73
74using o2::configuration::ConfigurationFactory;
75using o2::configuration::ConfigurationInterface;
76using o2::monitoring::Monitoring;
77using o2::monitoring::MonitoringFactory;
78using Metric = o2::monitoring::Metric;
79using Key = o2::monitoring::tags::Key;
80using Value = o2::monitoring::tags::Value;
81
// Signpost log categories used in this file: data_processor_context by the
// decongestion service, stream_context by the stream-context and
// ccdb-support services.
82O2_DECLARE_DYNAMIC_LOG(data_processor_context);
83O2_DECLARE_DYNAMIC_LOG(stream_context);
86
87namespace o2::framework
88{
89
// Size handed to Monitoring::enableBuffering() by the monitoring service
// (presumably the number of metrics queued before a flush — confirm).
90#define MONITORING_QUEUE_SIZE 100
92{
93 return ServiceSpec{
94 .name = "monitoring",
95 .init = [](ServiceRegistryRef registry, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
96 void* service = nullptr;
97 bool isWebsocket = strncmp(options.GetPropertyAsString("driver-client-backend").c_str(), "ws://", 4) == 0;
98 bool isDefault = options.GetPropertyAsString("monitoring-backend") == "default";
99 bool useDPL = (isWebsocket && isDefault) || options.GetPropertyAsString("monitoring-backend") == "dpl://";
100 o2::monitoring::Monitoring* monitoring;
101 if (useDPL) {
102 monitoring = new Monitoring();
103 auto dplBackend = std::make_unique<DPLMonitoringBackend>(registry);
104 (dynamic_cast<o2::monitoring::Backend*>(dplBackend.get()))->setVerbosity(o2::monitoring::Verbosity::Debug);
105 monitoring->addBackend(std::move(dplBackend));
106 } else {
107 auto backend = isDefault ? "infologger://" : options.GetPropertyAsString("monitoring-backend");
108 monitoring = MonitoringFactory::Get(backend).release();
109 }
110 service = monitoring;
111 monitoring->enableBuffering(MONITORING_QUEUE_SIZE);
112 assert(registry.get<DeviceSpec const>().name.empty() == false);
113 monitoring->addGlobalTag("pipeline_id", std::to_string(registry.get<DeviceSpec const>().inputTimesliceId));
114 monitoring->addGlobalTag("dataprocessor_name", registry.get<DeviceSpec const>().name);
115 monitoring->addGlobalTag("dpl_instance", options.GetPropertyAsString("shm-segment-id"));
116 return ServiceHandle{TypeIdHelpers::uniqueId<Monitoring>(), service};
117 },
118 .configure = noConfiguration(),
119 .start = [](ServiceRegistryRef services, void* service) {
120 auto* monitoring = (o2::monitoring::Monitoring*)service;
121
122 auto extRunNumber = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("runNumber", "unspecified");
123 if (extRunNumber == "unspecified") {
124 return;
125 }
126 try {
127 monitoring->setRunNumber(std::stoul(extRunNumber));
128 } catch (...) {
129 } },
130 .exit = [](ServiceRegistryRef registry, void* service) {
131 auto* monitoring = reinterpret_cast<Monitoring*>(service);
132 monitoring->flushBuffer();
133 delete monitoring; },
134 .kind = ServiceKind::Serial};
135}
136
137// An asynchronous service that executes queued actions at the end of the data processing.
139{
140 return ServiceSpec{
141 .name = "async-queue",
142 .init = simpleServiceInit<AsyncQueue, AsyncQueue>(),
143 .configure = noConfiguration(),
// On stop, the service looks up its AsyncQueue from the registry.
// NOTE(review): the statement that uses `queue` (upstream line 146) is not
// present in this listing, so as shown `queue` is unused — confirm against
// the upstream source.
144 .stop = [](ServiceRegistryRef services, void* service) {
145 auto& queue = services.get<AsyncQueue>();
147 },
148 .kind = ServiceKind::Serial};
149}
150
151// Make it a service so that it can be used easily from the analysis
152// FIXME: Moreover, it makes sense that this will be duplicated on a per thread
153// basis when we get to it.
155{
156 return ServiceSpec{
157 .name = "timing-info",
158 .uniqueId = simpleServiceId<TimingInfo>(),
159 .init = simpleServiceInit<TimingInfo, TimingInfo, ServiceKind::Stream>(),
160 .configure = noConfiguration(),
161 .kind = ServiceKind::Stream};
162}
163
165{
166 return ServiceSpec{
167 .name = "stream-context",
168 .uniqueId = simpleServiceId<StreamContext>(),
169 .init = simpleServiceInit<StreamContext, StreamContext, ServiceKind::Stream>(),
170 .configure = noConfiguration(),
171 .preProcessing = [](ProcessingContext& context, void* service) {
172 auto* stream = (StreamContext*)service;
173 auto& routes = context.services().get<DeviceSpec const>().outputs;
174 // Notice I need to do this here, because different invocation for
175 // the same stream might be referring to different data processors.
176 // We should probably have a context which is per stream of a specific
177 // data processor.
178 stream->routeDPLCreated.resize(routes.size());
179 stream->routeCreated.resize(routes.size());
180 // Reset the routeDPLCreated at every processing step
181 std::fill(stream->routeDPLCreated.begin(), stream->routeDPLCreated.end(), false);
182 std::fill(stream->routeCreated.begin(), stream->routeCreated.end(), false); },
183 .postProcessing = [](ProcessingContext& processingContext, void* service) {
184 auto* stream = (StreamContext*)service;
185 auto& routes = processingContext.services().get<DeviceSpec const>().outputs;
186 auto& timeslice = processingContext.services().get<TimingInfo>().timeslice;
187 auto& messageContext = processingContext.services().get<MessageContext>();
188 // Check if we never created any data for this timeslice
189 // if we did not, but we still have didDispatched set to true
190 // it means it was created out of band.
191 bool userDidCreate = false;
192 O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, service);
193 for (size_t ri = 0; ri < routes.size(); ++ri) {
194 if (stream->routeCreated[ri] == true && stream->routeDPLCreated[ri] == false) {
195 userDidCreate = true;
196 break;
197 }
198 }
199 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "userDidCreate == %d && didDispatch == %d",
200 userDidCreate,
201 messageContext.didDispatch());
202 if (userDidCreate == false && messageContext.didDispatch() == true) {
203 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Data created out of band userDidCreate == %d && messageContext.didDispatch == %d",
204 userDidCreate,
205 messageContext.didDispatch());
206 return;
207 }
208 if (userDidCreate == false && messageContext.didDispatch() == false) {
209 O2_SIGNPOST_ID_FROM_POINTER(cid, stream_context, service);
210 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "No data created.");
211 return;
212 }
213 for (size_t ri = 0; ri < routes.size(); ++ri) {
214 auto &route = routes[ri];
215 auto &matcher = route.matcher;
216 if (stream->routeDPLCreated[ri] == true) {
217 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Data created by DPL. ri = %" PRIu64 ", %{public}s",
218 (uint64_t)ri, DataSpecUtils::describe(matcher).c_str());
219 continue;
220 }
221 if (stream->routeCreated[ri] == true) {
222 continue;
223 } if ((timeslice % route.maxTimeslices) != route.timeslice) {
224 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks", "Route ri = %" PRIu64 ", skipped because of pipelining.",
225 (uint64_t)ri);
226 continue;
227 }
228 if (matcher.lifetime == Lifetime::Timeframe) {
229 O2_SIGNPOST_EVENT_EMIT(stream_context, cid, "postProcessingCallbacks",
230 "Expected Lifetime::Timeframe data %{public}s was not created for timeslice %" PRIu64 " and might result in dropped timeframes",
231 DataSpecUtils::describe(matcher).c_str(), (uint64_t)timeslice);
232 LOGP(error, "Expected Lifetime::Timeframe data {} was not created for timeslice {} and might result in dropped timeframes", DataSpecUtils::describe(matcher), timeslice);
233 }
234 } },
235 .preEOS = [](EndOfStreamContext& context, void* service) {
236 // We need to reset the routeDPLCreated / routeCreated because the end of stream
237 // uses a different context which does not know about the routes.
238 // FIXME: This should be fixed in a different way, but for now it will
239 // allow TPC IDC to work.
240 auto* stream = (StreamContext*)service;
241 auto& routes = context.services().get<DeviceSpec const>().outputs;
242 // Notice I need to do this here, because different invocation for
243 // the same stream might be referring to different data processors.
244 // We should probably have a context which is per stream of a specific
245 // data processor.
246 stream->routeDPLCreated.resize(routes.size());
247 stream->routeCreated.resize(routes.size());
248 // Reset the routeCreated / routeDPLCreated at every processing step
249 std::fill(stream->routeCreated.begin(), stream->routeCreated.end(), false);
250 std::fill(stream->routeDPLCreated.begin(), stream->routeDPLCreated.end(), false); },
251 .kind = ServiceKind::Stream};
252}
253
255{
256 return ServiceSpec{
// NOTE(review): the service name reads "datataking-contex" (missing 't');
// it is a runtime identifier and is left unchanged here.
257 .name = "datataking-contex",
258 .uniqueId = simpleServiceId<DataTakingContext>(),
259 .init = simpleServiceInit<DataTakingContext, DataTakingContext, ServiceKind::Stream>(),
260 .configure = noConfiguration(),
// Before each processing step, take the run number from the DataHeader of
// the first input that carries both a DataProcessingHeader and a DataHeader.
261 .preProcessing = [](ProcessingContext& processingContext, void* service) {
262 auto& context = processingContext.services().get<DataTakingContext>();
263 for (auto const& ref : processingContext.inputs()) {
// dph is only used to require that the input also has a DataProcessingHeader.
264 const o2::framework::DataProcessingHeader *dph = o2::header::get<DataProcessingHeader*>(ref.header);
265 const auto* dh = o2::header::get<o2::header::DataHeader*>(ref.header);
266 if (!dph || !dh) {
267 continue;
268 }
269 context.runNumber = fmt::format("{}", dh->runNumber);
270 break;
271 } },
272 // Notice this will be executed only once, because the service is declared upfront.
273 .start = [](ServiceRegistryRef services, void* service) {
274 auto& context = services.get<DataTakingContext>();
275
// NOTE(review): upstream line 276 is not present in this listing.
277
// An external "runNumber" property overrides the current value. When the
// property is absent ("unspecified") but the current run number is "0",
// the value is still overwritten, with the literal "unspecified".
// NOTE(review): confirm that storing "unspecified" in that case is intended.
278 auto extRunNumber = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("runNumber", "unspecified");
279 if (extRunNumber != "unspecified" || context.runNumber == "0") {
280 context.runNumber = extRunNumber;
281 }
282 auto extLHCPeriod = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("lhc_period", "unspecified");
283 if (extLHCPeriod != "unspecified") {
284 context.lhcPeriod = extLHCPeriod;
285 } else {
// Fallback: three-letter name of the current month, computed in UTC.
// NOTE(review): gmtime() returns a pointer to shared static storage and is
// not thread-safe; gmtime_r would avoid that.
286 static const char* months[12] = {"JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"};
287 time_t now = time(nullptr);
288 auto ltm = gmtime(&now);
289 context.lhcPeriod = months[ltm->tm_mon];
290 LOG(info) << "LHCPeriod is not available, using current month " << context.lhcPeriod;
291 }
292
// The remaining properties override the context only when provided;
// forcedRaw is true only for the exact string "true".
293 auto extRunType = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("run_type", "unspecified");
294 if (extRunType != "unspecified") {
295 context.runType = extRunType;
296 }
297 auto extEnvId = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("environment_id", "unspecified");
298 if (extEnvId != "unspecified") {
299 context.envId = extEnvId;
300 }
301 auto extDetectors = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("detectors", "unspecified");
302 if (extDetectors != "unspecified") {
303 context.detectors = extDetectors;
304 }
305 auto forcedRaw = services.get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("force_run_as_raw", "false");
306 context.forcedRaw = forcedRaw == "true"; },
307 .kind = ServiceKind::Stream};
308}
309
311};
312
314{
315 return ServiceSpec{
316 .name = "configuration",
317 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
318 auto backend = options.GetPropertyAsString("configuration");
319 if (backend == "command-line") {
320 return ServiceHandle{0, nullptr};
321 }
322 return ServiceHandle{TypeIdHelpers::uniqueId<ConfigurationInterface>(),
323 ConfigurationFactory::getConfiguration(backend).release()};
324 },
325 .configure = noConfiguration(),
326 .driverStartup = [](ServiceRegistryRef registry, DeviceConfig const& dc) {
327 if (dc.options.count("configuration") == 0) {
328 registry.registerService(ServiceHandle{0, nullptr});
329 return;
330 }
331 auto backend = dc.options["configuration"].as<std::string>();
332 registry.registerService(ServiceHandle{TypeIdHelpers::uniqueId<ConfigurationInterface>(),
333 ConfigurationFactory::getConfiguration(backend).release()}); },
334 .kind = ServiceKind::Global};
335}
336
338{
339 return ServiceSpec{
340 .name = "driverClient",
341 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
342 auto backend = options.GetPropertyAsString("driver-client-backend");
343 if (backend == "stdout://") {
344 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
345 new TextDriverClient(services, state)};
346 }
347 auto [ip, port] = o2::framework::parse_websocket_url(backend.c_str());
348 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
349 new WSDriverClient(services, ip.c_str(), port)};
350 },
351 .configure = noConfiguration(),
352 .kind = ServiceKind::Global};
353}
354
356{
357 return ServiceSpec{
358 .name = "control",
359 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
360 return ServiceHandle{TypeIdHelpers::uniqueId<ControlService>(),
361 new ControlService(services, state)};
362 },
363 .configure = noConfiguration(),
364 .kind = ServiceKind::Serial};
365}
366
368{
369 return ServiceSpec{
370 .name = "localrootfile",
371 .init = simpleServiceInit<LocalRootFileService, LocalRootFileService>(),
372 .configure = noConfiguration(),
373 .kind = ServiceKind::Serial};
374}
375
377{
378 return ServiceSpec{
379 .name = "parallel",
380 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
381 auto& spec = services.get<DeviceSpec const>();
382 return ServiceHandle{TypeIdHelpers::uniqueId<ParallelContext>(),
383 new ParallelContext(spec.rank, spec.nSlots)};
384 },
385 .configure = noConfiguration(),
386 .kind = ServiceKind::Serial};
387}
388
390{
391 return ServiceSpec{
392 .name = "timesliceindex",
393 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
394 auto& spec = services.get<DeviceSpec const>();
395 return ServiceHandle{TypeIdHelpers::uniqueId<TimesliceIndex>(),
396 new TimesliceIndex(InputRouteHelpers::maxLanes(spec.inputs), state.inputChannelInfos)};
397 },
398 .configure = noConfiguration(),
399 .kind = ServiceKind::Serial};
400}
401
403{
404 return ServiceSpec{
405 .name = "callbacks",
406 .init = simpleServiceInit<CallbackService, CallbackService>(),
407 .configure = noConfiguration(),
408 .kind = ServiceKind::Serial};
409}
410
412{
413 return ServiceSpec{
414 .name = "datarelayer",
415 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
416 auto& spec = services.get<DeviceSpec const>();
417 int pipelineLength = DefaultsHelpers::pipelineLength(options);
418 return ServiceHandle{TypeIdHelpers::uniqueId<DataRelayer>(),
419 new DataRelayer(spec.completionPolicy,
420 spec.inputs,
421 services.get<TimesliceIndex>(),
422 services,
423 pipelineLength)};
424 },
425 .configure = noConfiguration(),
426 .kind = ServiceKind::Serial};
427}
428
430{
431 return ServiceSpec{
432 .name = "datasender",
433 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
434 return ServiceHandle{TypeIdHelpers::uniqueId<DataSender>(),
435 new DataSender(services)};
436 },
437 .configure = noConfiguration(),
// Call DataSender::reset() before each processing step.
438 .preProcessing = [](ProcessingContext&, void* service) {
439 auto& dataSender = *reinterpret_cast<DataSender*>(service);
440 dataSender.reset(); },
// After dispatching, call verifyMissingSporadic() (presumably a check for
// expected sporadic outputs that were not sent — confirm), unless a quit
// was requested.
441 .postDispatching = [](ProcessingContext& ctx, void* service) {
442 auto& dataSender = *reinterpret_cast<DataSender*>(service);
443 // If the quit was requested, the post dispatching can still happen
444 // but with an empty set of data.
445 if (ctx.services().get<DeviceState>().quitRequested == false) {
446 dataSender.verifyMissingSporadic();
447 } },
// NOTE(review): the line closing this initializer (upstream line 448,
// presumably `.kind = ...};`) is not present in this listing.
449}
450
454
456{
457 return ServiceSpec{
458 .name = "tracing",
459 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
460 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<TracingInfrastructure>(),
461 .instance = new TracingInfrastructure(),
462 .kind = ServiceKind::Serial};
463 },
464 .configure = noConfiguration(),
465 .preProcessing = [](ProcessingContext&, void* service) {
466 auto* t = reinterpret_cast<TracingInfrastructure*>(service);
467 t->processingCount += 1; },
468 .postProcessing = [](ProcessingContext&, void* service) {
469 auto* t = reinterpret_cast<TracingInfrastructure*>(service);
470 t->processingCount += 1; },
471 .kind = ServiceKind::Serial};
472}
473
475};
476
477// CCDB Support service
479{
480 return ServiceSpec{
481 .name = "ccdb-support",
482 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
483 // iterate on all the outputs matchers
484 auto& spec = services.get<DeviceSpec const>();
485 for (auto& output : spec.outputs) {
486 if (DataSpecUtils::match(output.matcher, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"})) {
487 LOGP(debug, "Optional inputs support enabled");
488 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<CCDBSupport>(), .instance = new CCDBSupport, .kind = ServiceKind::Serial};
489 }
490 }
491 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<CCDBSupport>(), .instance = nullptr, .kind = ServiceKind::Serial};
492 },
493 .configure = noConfiguration(),
494 .finaliseOutputs = [](ProcessingContext& pc, void* service) {
495 if (!service) {
496 return;
497 }
498 if (pc.outputs().countDeviceOutputs(true) == 0) {
499 LOGP(debug, "We are w/o outputs, do not automatically add DISTSUBTIMEFRAME to outgoing messages");
500 return;
501 }
502 auto& timingInfo = pc.services().get<TimingInfo>();
503
504 // For any output that is a FLP/DISTSUBTIMEFRAME with subspec != 0,
505 // we create a new message.
506 InputSpec matcher{"matcher", ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}};
507 auto& streamContext = pc.services().get<StreamContext>();
508 for (size_t oi = 0; oi < pc.services().get<DeviceSpec const>().outputs.size(); ++oi) {
509 OutputRoute const& output = pc.services().get<DeviceSpec const>().outputs[oi];
510 if ((output.timeslice % output.maxTimeslices) != 0) {
511 continue;
512 }
513 if (DataSpecUtils::match(output.matcher, ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"})) {
514 auto concrete = DataSpecUtils::asConcreteDataMatcher(output.matcher);
515 if (concrete.subSpec == 0) {
516 continue;
517 }
518 auto& stfDist = pc.outputs().make<o2::header::STFHeader>(Output{concrete.origin, concrete.description, concrete.subSpec});
519 stfDist.id = timingInfo.timeslice;
520 stfDist.firstOrbit = timingInfo.firstTForbit;
521 stfDist.runNumber = timingInfo.runNumber;
522 // We mark it as not created, because we do should not account for it when
523 // checking if we created all the data for a timeslice.
524 O2_SIGNPOST_ID_FROM_POINTER(sid, stream_context, &streamContext);
525 O2_SIGNPOST_EVENT_EMIT(stream_context, sid, "finaliseOutputs", "Route %" PRIu64 " (%{public}s) was created by DPL.", (uint64_t)oi,
526 DataSpecUtils::describe(output.matcher).c_str());
527 streamContext.routeDPLCreated[oi] = true;
528 }
529 } },
530 .kind = ServiceKind::Global};
531}
536
537auto decongestionCallback = [](AsyncTask& task, size_t id) -> void {
538 auto& oldestPossibleOutput = task.user<DecongestionContext>().oldestPossibleOutput;
539 auto& ref = task.user<DecongestionContext>().ref;
540
541 auto& decongestion = ref.get<DecongestionService>();
542 auto& proxy = ref.get<FairMQDeviceProxy>();
543
544 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
545 cid.value = id;
546 if (decongestion.lastTimeslice >= oldestPossibleOutput.timeslice.value) {
547 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", "Not sending already sent value: %" PRIu64 "> %" PRIu64,
548 decongestion.lastTimeslice, (uint64_t)oldestPossibleOutput.timeslice.value);
549 return;
550 }
551 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", "Running oldest possible timeslice %" PRIu64 " propagation.",
552 (uint64_t)oldestPossibleOutput.timeslice.value);
553 DataProcessingHelpers::broadcastOldestPossibleTimeslice(ref, oldestPossibleOutput.timeslice.value);
554
555 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
556 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
557 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
558 // TODO: this we could cache in the proxy at the bind moment.
559 if (info.channelType != ChannelAccountingType::DPL) {
560 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", "Skipping channel %{public}s", info.name.c_str());
561 continue;
562 }
563 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestPossibleOutput.timeslice.value)) {
564 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice",
565 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64 ", priority %d",
566 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
567 }
568 }
569 decongestion.lastTimeslice = oldestPossibleOutput.timeslice.value;
570};
571
572auto decongestionCallbackOrdered = [](AsyncTask& task, size_t id) -> void {
573 auto& oldestPossibleOutput = task.user<DecongestionContext>().oldestPossibleOutput;
574 auto& ref = task.user<DecongestionContext>().ref;
575
576 auto& decongestion = ref.get<DecongestionService>();
577 auto& state = ref.get<DeviceState>();
578 auto& timesliceIndex = ref.get<TimesliceIndex>();
579 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
580 int64_t oldNextTimeslice = decongestion.nextTimeslice;
581 decongestion.nextTimeslice = std::max(decongestion.nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
582 if (oldNextTimeslice != decongestion.nextTimeslice) {
583 if (state.transitionHandling != TransitionHandlingState::NoTransition && DefaultsHelpers::onlineDeploymentMode()) {
584 O2_SIGNPOST_EVENT_EMIT_WARN(async_queue, cid, "oldest_possible_timeslice", "Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
585 } else {
586 O2_SIGNPOST_EVENT_EMIT_CRITICAL(async_queue, cid, "oldest_possible_timeslice", "Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
587 }
588 timesliceIndex.rescan();
589 }
590};
591
592// Callback for consumeWhenPastOldestPossibleTimeframe.
593// Runs in the async queue at the beginning of the next iteration,
594// after Retry slots unblocked by an oldestPossibleInput change have
595// been consumed and freed. Rescans all slots and forwards the
596// (now up-to-date) oldestPossibleOutput downstream.
597auto decongestionCallbackPastOldest = [](AsyncTask& task, size_t id) -> void {
598 auto& ref = task.user<DecongestionContext>().ref;
599
600 auto& decongestion = ref.get<DecongestionService>();
601 auto& timesliceIndex = ref.get<TimesliceIndex>();
602 auto& relayer = ref.get<DataRelayer>();
603 auto& proxy = ref.get<FairMQDeviceProxy>();
604 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
605
606 timesliceIndex.rescan();
607 timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
608 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
609
610 if (oldestPossibleOutput.timeslice.value <= decongestion.lastTimeslice) {
611 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice",
612 "consumeWhenPastOldestPossibleTimeframe: not forwarding already sent value %" PRIu64,
613 (uint64_t)oldestPossibleOutput.timeslice.value);
614 return;
615 }
616 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice",
617 "consumeWhenPastOldestPossibleTimeframe: forwarding oldest possible timeslice %" PRIu64,
618 (uint64_t)oldestPossibleOutput.timeslice.value);
619 DataProcessingHelpers::broadcastOldestPossibleTimeslice(ref, oldestPossibleOutput.timeslice.value);
620
621 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
622 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
623 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
624 if (info.channelType != ChannelAccountingType::DPL) {
625 continue;
626 }
627 DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestPossibleOutput.timeslice.value);
628 }
629 decongestion.lastTimeslice = oldestPossibleOutput.timeslice.value;
630};
631
632// Decongestion service
633// If we do not have any Timeframe input, it means we must be creating timeslices
634// in order and that we should propagate the oldest possible timeslice at the end
635// of each processing step.
638{
639 return ServiceSpec{
640 .name = "decongestion",
641 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
642 auto* decongestion = new DecongestionService();
643 for (auto& input : services.get<DeviceSpec const>().inputs) {
644 if (input.matcher.lifetime == Lifetime::Timeframe || input.matcher.lifetime == Lifetime::QA || input.matcher.lifetime == Lifetime::Sporadic || input.matcher.lifetime == Lifetime::Optional) {
645 LOGP(detail, "Found a real data input, we cannot update the oldest possible timeslice when sending messages");
646 decongestion->isFirstInTopology = false;
647 break;
648 }
649 }
650 for (const auto& label : services.get<DeviceSpec const>().labels) {
652 decongestion->suppressDomainInfo = true;
653 break;
654 }
655 }
656 auto& queue = services.get<AsyncQueue>();
657 decongestion->oldestPossibleTimesliceTask = AsyncQueueHelpers::create(queue, {.name = "oldest-possible-timeslice", .score = 100});
658 return ServiceHandle{TypeIdHelpers::uniqueId<DecongestionService>(), decongestion, ServiceKind::Serial};
659 },
660 .postForwarding = [](ProcessingContext& ctx, void* service) {
661 auto* decongestion = reinterpret_cast<DecongestionService*>(service);
662 if (O2_BUILTIN_LIKELY(decongestion->isFirstInTopology == false)) {
663 return;
664 }
665 O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, service);
666 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "postForwardingCallbacks", "We are the first one in the topology, we need to update the oldest possible timeslice");
667 auto& timesliceIndex = ctx.services().get<TimesliceIndex>();
668 auto& relayer = ctx.services().get<DataRelayer>();
669 timesliceIndex.updateOldestPossibleOutput(decongestion->nextEnumerationTimesliceRewinded);
670 auto& proxy = ctx.services().get<FairMQDeviceProxy>();
671 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
672 if (decongestion->nextEnumerationTimesliceRewinded && decongestion->nextEnumerationTimeslice < oldestPossibleOutput.timeslice.value) {
673 LOGP(detail, "Not sending oldestPossible if nextEnumerationTimeslice was rewinded");
674 return;
675 }
676
677 if (decongestion->lastTimeslice && oldestPossibleOutput.timeslice.value == decongestion->lastTimeslice) {
678 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice",
679 "Not sending already sent value for oldest possible timeslice: %" PRIu64,
680 (uint64_t)oldestPossibleOutput.timeslice.value);
681 return;
682 }
683 if (oldestPossibleOutput.timeslice.value < decongestion->lastTimeslice) {
684 LOGP(error, "We are trying to send an oldest possible timeslice {} that is older than the last one we already sent {}",
685 oldestPossibleOutput.timeslice.value, decongestion->lastTimeslice);
686 return;
687 }
688
689 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Broadcasting oldest posssible output %" PRIu64 " due to %{public}s (%" PRIu64 ")",
690 (uint64_t)oldestPossibleOutput.timeslice.value,
691 oldestPossibleOutput.slot.index == -1 ? "channel" : "slot",
692 (uint64_t)(oldestPossibleOutput.slot.index == -1 ? oldestPossibleOutput.channel.value : oldestPossibleOutput.slot.index));
693 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Ordered active %d", decongestion->orderedCompletionPolicyActive);
694 if (decongestion->orderedCompletionPolicyActive) {
695 auto oldNextTimeslice = decongestion->nextTimeslice;
696 decongestion->nextTimeslice = std::max(decongestion->nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
697 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Next timeslice %" PRIi64, decongestion->nextTimeslice);
698 if (oldNextTimeslice != decongestion->nextTimeslice) {
699 auto& state = ctx.services().get<DeviceState>();
701 O2_SIGNPOST_EVENT_EMIT_WARN(data_processor_context, cid, "oldest_possible_timeslice", "Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
702 } else {
703 O2_SIGNPOST_EVENT_EMIT_CRITICAL(data_processor_context, cid, "oldest_possible_timeslice", "Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
704 }
705 timesliceIndex.rescan();
706 }
707 }
708 DataProcessingHelpers::broadcastOldestPossibleTimeslice(ctx.services(), oldestPossibleOutput.timeslice.value);
709
710 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
711 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
712 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
713 // TODO: this we could cache in the proxy at the bind moment.
714 if (info.channelType != ChannelAccountingType::DPL) {
715 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Skipping channel %{public}s", info.name.c_str());
716 continue;
717 }
718 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ctx.services(), info, state, oldestPossibleOutput.timeslice.value)) {
719 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice",
720 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64 ", priority %d",
721 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
722 }
723 }
724 decongestion->lastTimeslice = oldestPossibleOutput.timeslice.value; },
725 .stop = [](ServiceRegistryRef services, void* service) {
726 auto* decongestion = (DecongestionService*)service;
727 services.get<TimesliceIndex>().reset();
728 decongestion->nextEnumerationTimeslice = 0;
729 decongestion->nextEnumerationTimesliceRewinded = false;
730 decongestion->lastTimeslice = 0;
731 decongestion->nextTimeslice = 0;
732 decongestion->oldestPossibleTimesliceTask = {0};
733 auto &state = services.get<DeviceState>();
734 for (auto &channel : state.inputChannelInfos) {
735 channel.oldestForChannel = {0};
736 } },
737 .domainInfoUpdated = [](ServiceRegistryRef services, size_t oldestPossibleTimeslice, ChannelIndex channel) {
738 auto& decongestion = services.get<DecongestionService>();
739 auto& relayer = services.get<DataRelayer>();
740 auto& timesliceIndex = services.get<TimesliceIndex>();
741 O2_SIGNPOST_ID_FROM_POINTER(cid, data_processor_context, &decongestion);
742 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Received oldest possible timeframe %" PRIu64 " from channel %d",
743 (uint64_t)oldestPossibleTimeslice, channel.value);
744 relayer.setOldestPossibleInput({oldestPossibleTimeslice}, channel);
745 timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
746 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
747
748 // When consumeWhenPastOldestPossibleTimeframe is active, we always
749 // schedule the callback even when oldestPossibleOutput has not changed
750 // yet. Retry slots held by this policy will be consumed after this
751 // domainInfoUpdated call (once getReadyToProcess re-checks them), and
752 // the callback — running in the next iteration — will recompute
753 // oldestPossibleOutput and forward the updated value downstream.
754 if (decongestion.consumeWhenPastOldestPossibleTimeframeActive) {
755 auto& queue = services.get<AsyncQueue>();
757 queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleTimeslice},
758 .id = decongestion.oldestPossibleTimesliceTask,
759 .debounce = -1,
761 .user<DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
762 }
763
764 if (oldestPossibleOutput.timeslice.value == decongestion.lastTimeslice) {
765 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Synchronous: Not sending already sent value: %" PRIu64, (uint64_t)oldestPossibleOutput.timeslice.value);
766 return;
767 }
768 if (oldestPossibleOutput.timeslice.value < decongestion.lastTimeslice) {
769 LOGP(error, "We are trying to send an oldest possible timeslice {} that is older than the last one we sent {}",
770 oldestPossibleOutput.timeslice.value, decongestion.lastTimeslice);
771 return;
772 }
773 auto& queue = services.get<AsyncQueue>();
774 const auto& state = services.get<DeviceState>();
777 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Queueing oldest possible timeslice %" PRIu64 " propagation for execution.",
778 (uint64_t)oldestPossibleOutput.timeslice.value);
780 queue, AsyncTask{ .timeslice = TimesliceId{oldestPossibleTimeslice},
781 .id = decongestion.oldestPossibleTimesliceTask,
782 .debounce = -1, .callback = decongestionCallback}
783 .user<DecongestionContext>(DecongestionContext{.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
784
785 if (decongestion.orderedCompletionPolicyActive) {
787 queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleOutput.timeslice.value},.id = decongestion.oldestPossibleTimesliceTask, .debounce = -1,
788 .callback = decongestionCallbackOrdered}
789 .user<DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
790 } },
791 .kind = ServiceKind::Serial};
792}
793
794// FIXME: allow configuring the default number of threads per device
795// This should probably be done by overriding the preFork
796// callback and using the boost program options there to
797// get the default number of threads.
799{
800 return ServiceSpec{
801 .name = "threadpool",
802 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
803 auto* pool = new ThreadPool();
804 // FIXME: this will require some extra argument for the configuration context of a service
805 pool->poolSize = 1;
806 return ServiceHandle{TypeIdHelpers::uniqueId<ThreadPool>(), pool};
807 },
808 .configure = [](InitContext&, void* service) -> void* {
809 auto* t = reinterpret_cast<ThreadPool*>(service);
810 // FIXME: this will require some extra argument for the configuration context of a service
811 t->poolSize = 1;
812 return service;
813 },
814 .postForkParent = [](ServiceRegistryRef services) -> void {
815 // FIXME: this will require some extra argument for the configuration context of a service
816 auto numWorkersS = std::to_string(1);
817 setenv("UV_THREADPOOL_SIZE", numWorkersS.c_str(), 0);
818 },
819 .kind = ServiceKind::Serial};
820}
821
822namespace
823{
824auto sendRelayerMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) -> void
825{
826 // Update the timer to make sure we have the correct time when sending out the stats.
827 uv_update_time(registry.get<DeviceState>().loop);
828 // Derive the amount of shared memory used
829 auto& runningWorkflow = registry.get<RunningWorkflowInfo const>();
830 using namespace fair::mq::shmem;
831 auto& spec = registry.get<DeviceSpec const>();
832
833 // FIXME: Ugly, but we do it only every 5 seconds...
834 if (stats.hasAvailSHMMetric) {
835 auto device = registry.get<RawDeviceService>().device();
836 long freeMemory = -1;
837 try {
838 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(ShmId{makeShmIdStr(device->fConfig->GetProperty<uint64_t>("shmid"))}, runningWorkflow.shmSegmentId);
839 } catch (...) {
840 }
841 if (freeMemory == -1) {
842 try {
843 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(SessionId{device->fConfig->GetProperty<std::string>("session")}, runningWorkflow.shmSegmentId);
844 } catch (...) {
845 }
846 }
847 stats.updateStats({static_cast<unsigned short>(static_cast<int>(ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE) + (runningWorkflow.shmSegmentId % 512)), DataProcessingStats::Op::SetIfPositive, freeMemory});
848 }
849
850 auto device = registry.get<RawDeviceService>().device();
851
852 int64_t totalBytesIn = 0;
853 int64_t totalBytesOut = 0;
854
855 for (auto& channel : device->GetChannels()) {
856 totalBytesIn += channel.second[0].GetBytesRx();
857 totalBytesOut += channel.second[0].GetBytesTx();
858 }
859
860 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_BYTES_IN), DataProcessingStats::Op::Set, totalBytesIn / 1000000});
861 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_BYTES_OUT), DataProcessingStats::Op::Set, totalBytesOut / 1000000});
862
863 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_RATE_IN_MB_S), DataProcessingStats::Op::InstantaneousRate, totalBytesIn / 1000000});
864 stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_RATE_OUT_MB_S), DataProcessingStats::Op::InstantaneousRate, totalBytesOut / 1000000});
865};
866
867auto flushStates(ServiceRegistryRef registry, DataProcessingStates& states) -> void
868{
869 if (!registry.get<DriverConfig const>().driverHasGUI) {
870 return;
871 }
872 states.flushChangedStates([&states, registry](std::string const& spec, int64_t timestamp, std::string_view value) mutable -> void {
873 auto& client = registry.get<ControlService>();
874 client.push(spec, value, timestamp);
875 });
876}
877
878O2_DECLARE_DYNAMIC_LOG(monitoring_service);
879
881auto flushMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) -> void
882{
883 // Flushing metrics should only happen on main thread to avoid
884 // having to have a mutex for the communication with the driver.
885 O2_SIGNPOST_ID_GENERATE(sid, monitoring_service);
886 O2_SIGNPOST_START(monitoring_service, sid, "flush", "flushing metrics");
887 if (registry.isMainThread() == false) {
888 LOGP(fatal, "Flushing metrics should only happen on the main thread.");
889 }
890 auto& monitoring = registry.get<Monitoring>();
891 auto& relayer = registry.get<DataRelayer>();
892
893 // Send all the relevant metrics for the relayer to update the GUI
894 stats.flushChangedMetrics([&monitoring, sid](DataProcessingStats::MetricSpec const& spec, int64_t timestamp, int64_t value) mutable -> void {
895 // convert timestamp to a time_point
896 auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(std::chrono::milliseconds(timestamp));
897 auto metric = o2::monitoring::Metric{spec.name, Metric::DefaultVerbosity, tp};
898 if (spec.kind == DataProcessingStats::Kind::UInt64) {
899 if (value < 0) {
900 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid, "flushChangedMetrics", "Value for %{public}s is negative, setting to 0",
901 spec.name.c_str());
902 value = 0;
903 }
904 metric.addValue((uint64_t)value, "value");
905 } else {
906 if (value > (int64_t)std::numeric_limits<int>::max()) {
907 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid, "flushChangedMetrics", "Value for %{public}s is too large, setting to INT_MAX",
908 spec.name.c_str());
909 value = (int64_t)std::numeric_limits<int>::max();
910 }
911 if (value < (int64_t)std::numeric_limits<int>::min()) {
912 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid, "flushChangedMetrics", "Value for %{public}s is too small, setting to INT_MIN",
913 spec.name.c_str());
914 value = (int64_t)std::numeric_limits<int>::min();
915 }
916 metric.addValue((int)value, "value");
917 }
918 if (spec.scope == DataProcessingStats::Scope::DPL) {
919 metric.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL);
920 }
921 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid, "flushChangedMetrics", "Flushing metric %{public}s", spec.name.c_str());
922 monitoring.send(std::move(metric));
923 });
924 relayer.sendContextState();
925 monitoring.flushBuffer();
926 O2_SIGNPOST_END(monitoring_service, sid, "flush", "done flushing metrics");
927};
928} // namespace
929
931{
  // Builds the "data-processing-stats" service. On init it registers every
  // metric listed below on the DataProcessingStats instance (`stats`) it
  // returns. The dangling / EOS hooks refresh the relayer metrics and flush
  // them; preLoop only flushes.
932 return ServiceSpec{
933 .name = "data-processing-stats",
934 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
935 timespec now;
936 clock_gettime(CLOCK_REALTIME, &now);
937 uv_update_time(state.loop);
  // Offset between the realtime clock (in ms) and the libuv loop clock.
  // NOTE(review): presumably used to map loop timestamps to wall-clock time — confirm.
938 uint64_t offset = now.tv_sec * 1000 - uv_now(state.loop);
  // NOTE(review): std::stoi throws if this option is not an integer.
940 .minOnlinePublishInterval = std::stoi(options.GetProperty<std::string>("dpl-stats-min-online-publishing-interval").c_str()) * 1000};
943 config);
944 auto& runningWorkflow = services.get<RunningWorkflowInfo const>();
945
946 // It makes no sense to update the stats more often than every 5s
947 int quickUpdateInterval = 5000;
948 uint64_t quickRefreshInterval = 7000;
949 uint64_t onlineRefreshLatency = 60000; // For metrics which are reported online, we flush them every 60s regardless of their state.
950 using MetricSpec = DataProcessingStats::MetricSpec;
951 using Kind = DataProcessingStats::Kind;
952 using Scope = DataProcessingStats::Scope;
953
954#ifdef NDEBUG
955 bool enableDebugMetrics = false;
956#else
957 bool enableDebugMetrics = true;
958#endif
959 bool arrowAndResourceLimitingMetrics = false;
961 arrowAndResourceLimitingMetrics = true;
962 }
963
964 int64_t consumedTimeframesPublishInterval = 0;
966 consumedTimeframesPublishInterval = 5000;
967 }
968 // Input proxies should not report cpu_usage_fraction,
969 // because of the rate limiting which biases the measurement.
970 auto& spec = services.get<DeviceSpec const>();
971 bool enableCPUUsageFraction = true;
972 auto isProxy = [](DataProcessorLabel const& label) -> bool { return label == DataProcessorLabel{"input-proxy"}; };
973 if (std::find_if(spec.labels.begin(), spec.labels.end(), isProxy) != spec.labels.end()) {
974 O2_SIGNPOST_ID_GENERATE(mid, policies);
975 O2_SIGNPOST_EVENT_EMIT(policies, mid, "metrics", "Disabling cpu_usage_fraction metric for proxy %{public}s", spec.name.c_str());
976 enableCPUUsageFraction = false;
977 }
978
  // Full list of candidate metrics; entries with .enabled = false are
  // controlled by the flags computed above.
979 std::vector<DataProcessingStats::MetricSpec> metrics = {
980 MetricSpec{.name = "errors",
982 .kind = Kind::UInt64,
983 .scope = Scope::Online,
984 .minPublishInterval = quickUpdateInterval,
985 .maxRefreshLatency = quickRefreshInterval},
986 MetricSpec{.name = "exceptions",
988 .kind = Kind::UInt64,
989 .scope = Scope::Online,
990 .minPublishInterval = quickUpdateInterval},
991 MetricSpec{.name = "inputs/relayed/pending",
993 .kind = Kind::UInt64,
994 .minPublishInterval = quickUpdateInterval},
995 MetricSpec{.name = "inputs/relayed/incomplete",
997 .kind = Kind::UInt64,
998 .minPublishInterval = quickUpdateInterval},
999 MetricSpec{.name = "inputs/relayed/total",
1001 .kind = Kind::UInt64,
1002 .minPublishInterval = quickUpdateInterval},
1003 MetricSpec{.name = "elapsed_time_ms",
1005 .kind = Kind::UInt64,
1006 .minPublishInterval = quickUpdateInterval},
1007 MetricSpec{.name = "total_wall_time_ms",
1009 .kind = Kind::UInt64,
1010 .minPublishInterval = quickUpdateInterval},
1011 MetricSpec{.name = "last_processed_input_size_byte",
1013 .kind = Kind::UInt64,
1014 .minPublishInterval = quickUpdateInterval},
1015 MetricSpec{.name = "total_processed_input_size_byte",
1017 .kind = Kind::UInt64,
1018 .scope = Scope::Online,
1019 .minPublishInterval = quickUpdateInterval},
1020 MetricSpec{.name = "total_sigusr1",
1022 .kind = Kind::UInt64,
1023 .minPublishInterval = quickUpdateInterval},
1024 MetricSpec{.name = "consumed-timeframes",
1026 .kind = Kind::UInt64,
1027 .minPublishInterval = consumedTimeframesPublishInterval,
1028 .maxRefreshLatency = quickRefreshInterval,
1029 .sendInitialValue = true},
1030 MetricSpec{.name = "min_input_latency_ms",
1032 .kind = Kind::UInt64,
1033 .scope = Scope::Online,
1034 .minPublishInterval = quickUpdateInterval},
1035 MetricSpec{.name = "max_input_latency_ms",
1037 .kind = Kind::UInt64,
1038 .minPublishInterval = quickUpdateInterval},
1039 MetricSpec{.name = "total_rate_in_mb_s",
1041 .kind = Kind::Rate,
1042 .scope = Scope::Online,
1043 .minPublishInterval = quickUpdateInterval,
1044 .maxRefreshLatency = onlineRefreshLatency,
1045 .sendInitialValue = true},
1046 MetricSpec{.name = "total_rate_out_mb_s",
1048 .kind = Kind::Rate,
1049 .scope = Scope::Online,
1050 .minPublishInterval = quickUpdateInterval,
1051 .maxRefreshLatency = onlineRefreshLatency,
1052 .sendInitialValue = true},
1053 MetricSpec{.name = "processing_rate_hz",
1055 .kind = Kind::Rate,
1056 .scope = Scope::Online,
1057 .minPublishInterval = quickUpdateInterval,
1058 .maxRefreshLatency = onlineRefreshLatency,
1059 .sendInitialValue = true},
1060 MetricSpec{.name = "cpu_usage_fraction",
1061 .enabled = enableCPUUsageFraction,
1063 .kind = Kind::Rate,
1064 .scope = Scope::Online,
1065 .minPublishInterval = quickUpdateInterval,
1066 .maxRefreshLatency = onlineRefreshLatency,
1067 .sendInitialValue = true},
1068 MetricSpec{.name = "performed_computations",
1070 .kind = Kind::UInt64,
1071 .scope = Scope::Online,
1072 .minPublishInterval = quickUpdateInterval,
1073 .maxRefreshLatency = onlineRefreshLatency,
1074 .sendInitialValue = true},
1075 MetricSpec{.name = "total_bytes_in",
1077 .kind = Kind::UInt64,
1078 .scope = Scope::Online,
1079 .minPublishInterval = quickUpdateInterval,
1080 .maxRefreshLatency = onlineRefreshLatency,
1081 .sendInitialValue = true},
1082 MetricSpec{.name = "total_bytes_out",
1084 .kind = Kind::UInt64,
1085 .scope = Scope::Online,
1086 .minPublishInterval = quickUpdateInterval,
1087 .maxRefreshLatency = onlineRefreshLatency,
1088 .sendInitialValue = true},
  // One metric id per shared memory segment (segment id modulo 512).
1089 MetricSpec{.name = fmt::format("available_managed_shm_{}", runningWorkflow.shmSegmentId),
1090 .metricId = (int)ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE + (runningWorkflow.shmSegmentId % 512),
1091 .kind = Kind::UInt64,
1092 .scope = Scope::Online,
1093 .minPublishInterval = 500,
1094 .maxRefreshLatency = onlineRefreshLatency,
1095 .sendInitialValue = true},
1096 MetricSpec{.name = "malformed_inputs", .metricId = static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1097 MetricSpec{.name = "dropped_computations", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1098 MetricSpec{.name = "dropped_incoming_messages", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1099 MetricSpec{.name = "relayed_messages", .metricId = static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
  // Arrow / resource-offer bookkeeping, only enabled when arrowAndResourceLimitingMetrics is set.
1100 MetricSpec{.name = "arrow-bytes-destroyed",
1101 .enabled = arrowAndResourceLimitingMetrics,
1102 .metricId = static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED),
1103 .kind = Kind::UInt64,
1104 .scope = Scope::DPL,
1105 .minPublishInterval = 0,
1106 .maxRefreshLatency = 10000,
1107 .sendInitialValue = true},
1108 MetricSpec{.name = "arrow-messages-destroyed",
1109 .enabled = arrowAndResourceLimitingMetrics,
1110 .metricId = static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED),
1111 .kind = Kind::UInt64,
1112 .scope = Scope::DPL,
1113 .minPublishInterval = 0,
1114 .maxRefreshLatency = 10000,
1115 .sendInitialValue = true},
1116 MetricSpec{.name = "arrow-bytes-created",
1117 .enabled = arrowAndResourceLimitingMetrics,
1118 .metricId = static_cast<short>(ProcessingStatsId::ARROW_BYTES_CREATED),
1119 .kind = Kind::UInt64,
1120 .scope = Scope::DPL,
1121 .minPublishInterval = 0,
1122 .maxRefreshLatency = 10000,
1123 .sendInitialValue = true},
1124 MetricSpec{.name = "arrow-messages-created",
1125 .enabled = arrowAndResourceLimitingMetrics,
1126 .metricId = static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_CREATED),
1127 .kind = Kind::UInt64,
1128 .scope = Scope::DPL,
1129 .minPublishInterval = 0,
1130 .maxRefreshLatency = 10000,
1131 .sendInitialValue = true},
1132 MetricSpec{.name = "arrow-bytes-expired",
1133 .enabled = arrowAndResourceLimitingMetrics,
1134 .metricId = static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED),
1135 .kind = Kind::UInt64,
1136 .scope = Scope::DPL,
1137 .minPublishInterval = 0,
1138 .maxRefreshLatency = 10000,
1139 .sendInitialValue = true},
1140 MetricSpec{.name = "shm-offer-bytes-consumed",
1141 .enabled = arrowAndResourceLimitingMetrics,
1142 .metricId = static_cast<short>(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED),
1143 .kind = Kind::UInt64,
1144 .scope = Scope::DPL,
1145 .minPublishInterval = 0,
1146 .maxRefreshLatency = 10000,
1147 .sendInitialValue = true},
1148 MetricSpec{.name = "timeslice-offer-number-consumed",
1149 .enabled = arrowAndResourceLimitingMetrics,
1150 .metricId = static_cast<short>(ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED),
1151 .kind = Kind::UInt64,
1152 .scope = Scope::DPL,
1153 .minPublishInterval = 0,
1154 .maxRefreshLatency = 10000,
1155 .sendInitialValue = true},
1156 MetricSpec{.name = "timeslices-expired",
1157 .enabled = arrowAndResourceLimitingMetrics,
1158 .metricId = static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED),
1159 .kind = Kind::UInt64,
1160 .scope = Scope::DPL,
1161 .minPublishInterval = 0,
1162 .maxRefreshLatency = 10000,
1163 .sendInitialValue = true},
1164 MetricSpec{.name = "timeslices-started",
1165 .enabled = arrowAndResourceLimitingMetrics,
1166 .metricId = static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_STARTED),
1167 .kind = Kind::UInt64,
1168 .scope = Scope::DPL,
1169 .minPublishInterval = 0,
1170 .maxRefreshLatency = 10000,
1171 .sendInitialValue = true},
1172 MetricSpec{.name = "timeslices-done",
1173 .enabled = arrowAndResourceLimitingMetrics,
1174 .metricId = static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_DONE),
1175 .kind = Kind::UInt64,
1176 .scope = Scope::DPL,
1177 .minPublishInterval = 0,
1178 .maxRefreshLatency = 10000,
1179 .sendInitialValue = true},
  // Debug-only metrics: enabled unless NDEBUG is defined (see enableDebugMetrics).
1180 MetricSpec{.name = "resources-missing",
1181 .enabled = enableDebugMetrics,
1182 .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_MISSING),
1183 .kind = Kind::UInt64,
1184 .scope = Scope::DPL,
1185 .minPublishInterval = 1000,
1186 .maxRefreshLatency = 1000,
1187 .sendInitialValue = true},
1188 MetricSpec{.name = "resources-insufficient",
1189 .enabled = enableDebugMetrics,
1190 .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_INSUFFICIENT),
1191 .kind = Kind::UInt64,
1192 .scope = Scope::DPL,
1193 .minPublishInterval = 1000,
1194 .maxRefreshLatency = 1000,
1195 .sendInitialValue = true},
1196 MetricSpec{.name = "resources-satisfactory",
1197 .enabled = enableDebugMetrics,
1198 .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_SATISFACTORY),
1199 .kind = Kind::UInt64,
1200 .scope = Scope::DPL,
1201 .minPublishInterval = 1000,
1202 .maxRefreshLatency = 1000,
1203 .sendInitialValue = true},
1204 MetricSpec{.name = "resource-offer-expired",
1205 .enabled = arrowAndResourceLimitingMetrics,
1206 .metricId = static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED),
1207 .kind = Kind::UInt64,
1208 .scope = Scope::DPL,
1209 .minPublishInterval = 0,
1210 .maxRefreshLatency = 10000,
1211 .sendInitialValue = true},
  // CCDB cache statistics are always enabled.
1212 MetricSpec{.name = "ccdb-cache-hit",
1213 .enabled = true,
1214 .metricId = static_cast<short>(ProcessingStatsId::CCDB_CACHE_HIT),
1215 .kind = Kind::UInt64,
1216 .scope = Scope::DPL,
1217 .minPublishInterval = 1000,
1218 .maxRefreshLatency = 10000,
1219 .sendInitialValue = true},
1220 MetricSpec{.name = "ccdb-cache-miss",
1221 .enabled = true,
1222 .metricId = static_cast<short>(ProcessingStatsId::CCDB_CACHE_MISS),
1223 .kind = Kind::UInt64,
1224 .scope = Scope::DPL,
1225 .minPublishInterval = 1000,
1226 .maxRefreshLatency = 10000,
1227 .sendInitialValue = true},
1228 MetricSpec{.name = "ccdb-cache-failure",
1229 .enabled = true,
1230 .metricId = static_cast<short>(ProcessingStatsId::CCDB_CACHE_FAILURE),
1231 .kind = Kind::UInt64,
1232 .scope = Scope::DPL,
1233 .minPublishInterval = 1000,
1234 .maxRefreshLatency = 10000,
1235 .sendInitialValue = true},
1236 MetricSpec{.name = "ccdb-cache-fetched-bytes",
1237 .enabled = true,
1238 .metricId = static_cast<short>(ProcessingStatsId::CCDB_CACHE_FETCHED_BYTES),
1239 .kind = Kind::UInt64,
1240 .scope = Scope::DPL,
1241 .minPublishInterval = 1000,
1242 .maxRefreshLatency = 10000,
1243 .sendInitialValue = true},
1244 MetricSpec{.name = "ccdb-cache-requested-bytes",
1245 .enabled = true,
1246 .metricId = static_cast<short>(ProcessingStatsId::CCDB_CACHE_REQUESTED_BYTES),
1247 .kind = Kind::UInt64,
1248 .scope = Scope::DPL,
1249 .minPublishInterval = 1000,
1250 .maxRefreshLatency = 10000,
1251 .sendInitialValue = true}};
1252
  // Register all metrics. The available_managed_shm_<segment> metric is only
  // kept for the "readout-proxy" device. For that device, hasAvailSHMMetric is
  // set as well, which turns on the shared-memory sampling in sendRelayerMetrics.
1253 for (auto& metric : metrics) {
1254 if (metric.metricId == (int)ProcessingStatsId::AVAILABLE_MANAGED_SHM_BASE + (runningWorkflow.shmSegmentId % 512)) {
1255 if (spec.name.compare("readout-proxy") == 0) {
1256 stats->hasAvailSHMMetric = true;
1257 } else {
1258 continue;
1259 }
1260 }
1261 stats->registerMetric(metric);
1262 }
1263
1264 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStats>(), stats};
1265 },
1266 .configure = noConfiguration(),
1267 .postProcessing = [](ProcessingContext& context, void* service) {
1268 auto* stats = (DataProcessingStats*)service;
  // preDangling / postDangling / preEOS: refresh the relayer metrics, then flush the changed ones.
1270 .preDangling = [](DanglingContext& context, void* service) {
1271 auto* stats = (DataProcessingStats*)service;
1272 sendRelayerMetrics(context.services(), *stats);
1273 flushMetrics(context.services(), *stats); },
1274 .postDangling = [](DanglingContext& context, void* service) {
1275 auto* stats = (DataProcessingStats*)service;
1276 sendRelayerMetrics(context.services(), *stats);
1277 flushMetrics(context.services(), *stats); },
1278 .preEOS = [](EndOfStreamContext& context, void* service) {
1279 auto* stats = (DataProcessingStats*)service;
1280 sendRelayerMetrics(context.services(), *stats);
1281 flushMetrics(context.services(), *stats); },
  // preLoop only flushes; it does not refresh the relayer metrics first.
1282 .preLoop = [](ServiceRegistryRef ref, void* service) {
1283 auto* stats = (DataProcessingStats*)service;
1284 flushMetrics(ref, *stats); },
1285 .kind = ServiceKind::Serial};
1286}
1287
1288// This is similar to dataProcessingStats, but it is designed to synchronize
1289// history-less metrics, e.g. the ones used by the GUI.
1291{
  // Unlike dataProcessingStats, this service is registered as ServiceKind::Global.
1292 return ServiceSpec{
1293 .name = "data-processing-states",
1294 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
1295 timespec now;
1296 clock_gettime(CLOCK_REALTIME, &now);
1297 uv_update_time(state.loop);
  // Offset between the realtime clock (in ms) and the libuv loop clock.
  // NOTE(review): presumably consumed by the DataProcessingStates construction — confirm.
1298 uint64_t offset = now.tv_sec * 1000 - uv_now(state.loop);
1301 states->registerState({"dummy_state", (short)ProcessingStateId::DUMMY_STATE});
1302 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStates>(), states};
1303 },
1304 .configure = noConfiguration(),
  // After processing, drain the queued state-update commands.
1305 .postProcessing = [](ProcessingContext& context, void* service) {
1306 auto* states = (DataProcessingStates*)service;
1307 states->processCommandQueue(); },
  // Dangling / EOS hooks push the changed states through the ControlService
  // (flushStates does nothing when the driver has no GUI).
1308 .preDangling = [](DanglingContext& context, void* service) {
1309 auto* states = (DataProcessingStates*)service;
1310 flushStates(context.services(), *states); },
1311 .postDangling = [](DanglingContext& context, void* service) {
1312 auto* states = (DataProcessingStates*)service;
1313 flushStates(context.services(), *states); },
1314 .preEOS = [](EndOfStreamContext& context, void* service) {
1315 auto* states = (DataProcessingStates*)service;
1316 flushStates(context.services(), *states); },
1317 .kind = ServiceKind::Global};
1318}
1319
1321};
1322
1324{
1325 return ServiceSpec{
1326 .name = "gui-metrics",
1327 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
1328 auto* stats = new GUIMetrics();
1329 auto& monitoring = services.get<Monitoring>();
1330 auto& spec = services.get<DeviceSpec const>();
1331 monitoring.send({(int)spec.inputChannels.size(), fmt::format("oldest_possible_timeslice/h"), o2::monitoring::Verbosity::Debug});
1332 monitoring.send({(int)1, fmt::format("oldest_possible_timeslice/w"), o2::monitoring::Verbosity::Debug});
1333 monitoring.send({(int)spec.outputChannels.size(), fmt::format("oldest_possible_output/h"), o2::monitoring::Verbosity::Debug});
1334 monitoring.send({(int)1, fmt::format("oldest_possible_output/w"), o2::monitoring::Verbosity::Debug});
1335 return ServiceHandle{TypeIdHelpers::uniqueId<GUIMetrics>(), stats};
1336 },
1337 .configure = noConfiguration(),
1338 .postProcessing = [](ProcessingContext& context, void* service) {
1339 auto& relayer = context.services().get<DataRelayer>();
1340 auto& monitoring = context.services().get<Monitoring>();
1341 auto& spec = context.services().get<DeviceSpec const>();
1342 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
1343 for (size_t ci; ci < spec.outputChannels.size(); ++ci) {
1344 monitoring.send({(uint64_t)oldestPossibleOutput.timeslice.value, fmt::format("oldest_possible_output/{}", ci), o2::monitoring::Verbosity::Debug});
1345 } },
1346 .domainInfoUpdated = [](ServiceRegistryRef registry, size_t timeslice, ChannelIndex channel) {
1347 auto& monitoring = registry.get<Monitoring>();
1348 monitoring.send({(uint64_t)timeslice, fmt::format("oldest_possible_timeslice/{}", channel.value), o2::monitoring::Verbosity::Debug}); },
1349 .active = false,
1350 .kind = ServiceKind::Serial};
1351}
1352
1354{
  // Registers an "object-cache" service whose instance is an ObjectCache
  // allocated with new on init.
1355 return ServiceSpec{
1356 .name = "object-cache",
1357 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1358 auto* cache = new ObjectCache();
1359 return ServiceHandle{TypeIdHelpers::uniqueId<ObjectCache>(), cache};
1360 },
1361 .configure = noConfiguration(),
1363}
1364
1366{
1367 return ServiceSpec{
1368 .name = "data-processing-context",
1369 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1370 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessorContext>(), new DataProcessorContext()};
1371 },
1372 .configure = noConfiguration(),
1373 .exit = [](ServiceRegistryRef, void* service) { auto* context = (DataProcessorContext*)service; delete context; },
1374 .kind = ServiceKind::Serial};
1375}
1376
1378{
1379 return ServiceSpec{
1380 .name = "data-allocator",
1381 .uniqueId = simpleServiceId<DataAllocator>(),
1382 .init = [](ServiceRegistryRef ref, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
1383 return ServiceHandle{
1384 .hash = TypeIdHelpers::uniqueId<DataAllocator>(),
1385 .instance = new DataAllocator(ref),
1386 .kind = ServiceKind::Stream,
1387 .name = "data-allocator",
1388 };
1389 },
1390 .configure = noConfiguration(),
1391 .kind = ServiceKind::Stream};
1392}
1393
// Assemble the default list of services for a device. The list contains:
// - the built-in specs;
// - the arrow backend and decongestion specs;
// - any plugin services named in `extraPlugins` and in the DPL_LOAD_SERVICES
//   environment variable (both as "library:name,library:name,...");
// - the GUI metrics spec;
// - the thread pool spec, when numThreads is non-zero.
1395std::vector<ServiceSpec> CommonServices::defaultServices(std::string extraPlugins, int numThreads)
1396{
1397 std::vector<ServiceSpec> specs{
1401 asyncQueue(),
1408 controlSpec(),
1409 rootFileSpec(),
1410 parallelSpec(),
1411 callbacksSpec(),
1414 dataRelayer(),
1416 dataSender(),
1417 objectCache(),
1418 ccdbSupportSpec()};
1419
1421 specs.push_back(ArrowSupport::arrowBackendSpec());
1422 }
1425 specs.push_back(decongestionSpec());
1426
  // Comma-separated plugin specs: extraPlugins first, then the optional
  // InfoLogger plugins, then the DPL_LOAD_SERVICES entries.
1427 std::string loadableServicesStr = extraPlugins;
1428 // Do not load InfoLogger by default if we are not at P2.
1430 if (loadableServicesStr.empty() == false) {
1431 loadableServicesStr += ",";
1432 }
1433 loadableServicesStr += "O2FrameworkDataTakingSupport:InfoLoggerContext,O2FrameworkDataTakingSupport:InfoLogger";
1434 }
1435 // Load plugins depending on the environment
1436 std::vector<LoadablePlugin> loadablePlugins = {};
1437 char* loadableServicesEnv = getenv("DPL_LOAD_SERVICES");
1438 // String to define the services to load is:
1439 //
1440 // library1:name1,library2:name2,...
1441 if (loadableServicesEnv) {
1442 if (loadableServicesStr.empty() == false) {
1443 loadableServicesStr += ",";
1444 }
1445 loadableServicesStr += loadableServicesEnv;
1446 }
1447 loadablePlugins = PluginManager::parsePluginSpecString(loadableServicesStr.c_str());
1448 PluginManager::loadFromPlugin<ServiceSpec, ServicePlugin>(loadablePlugins, specs);
1449 // I should make it optional depending on whether the GUI is there or not...
1450 specs.push_back(CommonServices::guiMetricsSpec());
  // NOTE(review): threadPool() currently ignores its argument and pins the
  // pool size to 1 (see the FIXMEs in threadPool).
1451 if (numThreads) {
1452 specs.push_back(threadPool(numThreads));
1453 }
1454 return specs;
1455}
1456
// Extra ServiceSpecs used by Arrow-based workflows.
// NOTE(review): contents assumed from the name (presumably the ArrowSupport
// table-slicing cache specs) — verify against the returned list.
1457std::vector<ServiceSpec> CommonServices::arrowServices()
1458{
1459 return {
1462 };
1463}
1464
1465} // namespace o2::framework
std::vector< std::string > labels
benchmark::State & state
std::vector< OutputRoute > routes
o2::monitoring::tags::Key Key
o2::monitoring::Metric Metric
o2::monitoring::tags::Value Value
#define MONITORING_QUEUE_SIZE
#define O2_BUILTIN_LIKELY(x)
std::ostringstream debug
int16_t time
Definition RawEventData.h:4
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:490
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:506
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:609
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:507
#define O2_SIGNPOST_EVENT_EMIT_CRITICAL(log, id, name, format,...)
Definition Signpost.h:574
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
Definition Signpost.h:564
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:523
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:603
o2::monitoring::Monitoring Monitoring
decltype(auto) make(const Output &spec, Args... args)
int countDeviceOutputs(bool excludeDPLOrigin=false)
Allow injecting policies on send.
Definition DataSender.h:34
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
A text based way of communicating with the driver.
OldestOutputInfo getOldestPossibleOutput() const
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
Definition glcorearb.h:5500
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLintptr offset
Definition glcorearb.h:660
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLuint GLuint stream
Definition glcorearb.h:1806
GLint ref
Definition glcorearb.h:291
GLuint id
Definition glcorearb.h:650
GLuint * states
Definition glcorearb.h:4932
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
@ DPL
The channel is a normal input channel.
const DataProcessorLabel suppressDomainInfoLabel
std::pair< std::string, unsigned short > parse_websocket_url(char const *url)
@ NoTransition
No pending transitions.
auto decongestionCallbackPastOldest
auto decongestionCallbackOrdered
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static AsyncTaskId create(AsyncQueue &queue, AsyncTaskSpec spec)
static void post(AsyncQueue &queue, AsyncTask const &task)
static void reset(AsyncQueue &queue)
Reset the queue to its initial state.
An actuatual task to be executed.
Definition AsyncQueue.h:32
static ServiceSpec dataRelayer()
static ServiceSpec callbacksSpec()
static ServiceSpec monitoringSpec()
static ServiceSpec dataSender()
static ServiceSpec timesliceIndex()
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static ServiceSpec timingInfoSpec()
static ServiceConfigureCallback noConfiguration()
static ServiceSpec asyncQueue()
static ServiceSpec decongestionSpec()
static ServiceSpec dataProcessorContextSpec()
static ServiceSpec dataProcessingStats()
static std::vector< ServiceSpec > arrowServices()
static ServiceSpec rootFileSpec()
static ServiceSpec controlSpec()
static ServiceSpec configurationSpec()
static ServiceSpec ccdbSupportSpec()
static ServiceSpec datatakingContextSpec()
static ServiceSpec guiMetricsSpec()
static ServiceSpec dataProcessingStates()
static ServiceSpec tracingSpec()
static ServiceSpec dataAllocatorSpec()
static ServiceSpec driverClientSpec()
static ServiceSpec streamContextSpec()
static ServiceSpec threadPool(int numWorkers)
static ServiceSpec parallelSpec()
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
Helper struct to hold statistics about the data processing happening.
@ SetIfPositive
Set the value to the specified value.
@ InstantaneousRate
Update the rate of the metric given the cumulative value since last time it got published.
@ Add
Update the rate of the metric given the amount since the last time.
A label that can be associated to a DataProcessorSpec.
static std::string describe(InputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
DeploymentMode deploymentMode
Where we thing this is running.
TimesliceIndex::OldestOutputInfo oldestPossibleOutput
static DeploymentMode deploymentMode()
static unsigned int pipelineLength(unsigned int minLength)
get max number of timeslices in the queue
static bool onlineDeploymentMode()
@true if running online
std::string name
The name of the associated DataProcessorSpec.
Definition DeviceSpec.h:50
size_t inputTimesliceId
The time pipelining id of this particular device.
Definition DeviceSpec.h:68
Running state information of a given device.
Definition DeviceState.h:34
static size_t maxLanes(std::vector< InputRoute > const &routes)
header::DataOrigin origin
Definition Output.h:28
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
Information about the running workflow.
ServiceKind kind
Kind of service.
unsigned int hash
Unique hash associated to the type of service.
std::string name
Name of the service.
ServicePostDispatching postDispatching
ServiceKind kind
Kind of service being specified.
static std::function< int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator(uv_loop_t *loop)
static std::function< void(int64_t &base, int64_t &offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t *loop)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"