51#include "../src/DataProcessingStatus.h"
58#include <Configuration/ConfigurationInterface.h>
59#include <Configuration/ConfigurationFactory.h>
60#include <Monitoring/MonitoringFactory.h>
63#include <fairmq/Device.h>
64#include <fairmq/shmem/Monitor.h>
65#include <fairmq/shmem/Common.h>
66#include <fairmq/ProgOptions.h>
72using o2::configuration::ConfigurationFactory;
73using o2::configuration::ConfigurationInterface;
74using o2::monitoring::Monitoring;
75using o2::monitoring::MonitoringFactory;
76using Metric = o2::monitoring::Metric;
77using Key = o2::monitoring::tags::Key;
78using Value = o2::monitoring::tags::Value;
88#define MONITORING_QUEUE_SIZE 100
94 void* service =
nullptr;
// Detect a websocket driver backend by its URL scheme. The comparison covers
// all 5 characters of "ws://"; a count of 4 only checked "ws:/" and therefore
// also accepted malformed values such as "ws:/host".
95 bool isWebsocket = strncmp(options.GetPropertyAsString("driver-client-backend").c_str(), "ws://", 5) == 0;
96 bool isDefault = options.GetPropertyAsString(
"monitoring-backend") ==
"default";
97 bool useDPL = (isWebsocket && isDefault) || options.GetPropertyAsString(
"monitoring-backend") ==
"dpl://";
98 o2::monitoring::Monitoring* monitoring;
101 auto dplBackend = std::make_unique<DPLMonitoringBackend>(registry);
103 monitoring->addBackend(std::move(dplBackend));
105 auto backend = isDefault ?
"infologger://" : options.GetPropertyAsString(
"monitoring-backend");
106 monitoring = MonitoringFactory::Get(backend).release();
108 service = monitoring;
112 monitoring->addGlobalTag(
"dataprocessor_name", registry.
get<
DeviceSpec const>().
name);
113 monitoring->addGlobalTag(
"dpl_instance", options.GetPropertyAsString(
"shm-segment-id"));
114 return ServiceHandle{TypeIdHelpers::uniqueId<Monitoring>(), service};
118 auto* monitoring = (o2::monitoring::Monitoring*)service;
120 auto extRunNumber = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"runNumber",
"unspecified");
121 if (extRunNumber ==
"unspecified") {
125 monitoring->setRunNumber(std::stoul(extRunNumber));
129 auto* monitoring =
reinterpret_cast<Monitoring*
>(service);
130 monitoring->flushBuffer();
131 delete monitoring; },
139 .
name =
"async-queue",
140 .init = simpleServiceInit<AsyncQueue, AsyncQueue>(),
155 .
name =
"timing-info",
156 .uniqueId = simpleServiceId<TimingInfo>(),
157 .init = simpleServiceInit<TimingInfo, TimingInfo, ServiceKind::Stream>(),
165 .
name =
"stream-context",
166 .uniqueId = simpleServiceId<StreamContext>(),
167 .init = simpleServiceInit<StreamContext, StreamContext, ServiceKind::Stream>(),
176 stream->routeDPLCreated.resize(routes.size());
177 stream->routeCreated.resize(routes.size());
179 std::fill(
stream->routeDPLCreated.begin(),
stream->routeDPLCreated.end(),
false);
180 std::fill(
stream->routeCreated.begin(),
stream->routeCreated.end(),
false); },
189 bool userDidCreate =
false;
191 for (
size_t ri = 0; ri < routes.size(); ++ri) {
192 if (
stream->routeCreated[ri] ==
true &&
stream->routeDPLCreated[ri] ==
false) {
193 userDidCreate =
true;
197 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"userDidCreate == %d && didDispatch == %d",
199 messageContext.didDispatch());
200 if (userDidCreate ==
false && messageContext.didDispatch() ==
true) {
201 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"Data created out of band userDidCreate == %d && messageContext.didDispatch == %d",
203 messageContext.didDispatch());
206 if (userDidCreate ==
false && messageContext.didDispatch() ==
false) {
211 for (
size_t ri = 0; ri < routes.size(); ++ri) {
212 auto &route = routes[ri];
213 auto &matcher = route.matcher;
214 if (
stream->routeDPLCreated[ri] ==
true) {
215 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"Data created by DPL. ri = %" PRIu64
", %{public}s",
219 if (
stream->routeCreated[ri] ==
true) {
221 }
if ((timeslice % route.maxTimeslices) != route.timeslice) {
222 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"Route ri = %" PRIu64
", skipped because of pipelining.",
226 if (matcher.lifetime == Lifetime::Timeframe) {
228 "Expected Lifetime::Timeframe data %{public}s was not created for timeslice %" PRIu64
" and might result in dropped timeframes",
230 LOGP(error,
"Expected Lifetime::Timeframe data {} was not created for timeslice {} and might result in dropped timeframes",
DataSpecUtils::describe(matcher), timeslice);
244 stream->routeDPLCreated.resize(routes.size());
245 stream->routeCreated.resize(routes.size());
247 std::fill(
stream->routeCreated.begin(),
stream->routeCreated.end(),
false);
248 std::fill(
stream->routeDPLCreated.begin(),
stream->routeDPLCreated.end(),
false); },
255 .
name =
"datataking-contex",
256 .uniqueId = simpleServiceId<DataTakingContext>(),
257 .init = simpleServiceInit<DataTakingContext, DataTakingContext, ServiceKind::Stream>(),
261 for (
auto const&
ref : processingContext.
inputs()) {
263 const auto* dh = o2::header::get<o2::header::DataHeader*>(
ref.header);
267 context.runNumber = fmt::format(
"{}", dh->runNumber);
276 auto extRunNumber = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"runNumber",
"unspecified");
277 if (extRunNumber !=
"unspecified" || context.runNumber ==
"0") {
278 context.runNumber = extRunNumber;
280 auto extLHCPeriod = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"lhc_period",
"unspecified");
281 if (extLHCPeriod !=
"unspecified") {
282 context.lhcPeriod = extLHCPeriod;
284 static const char* months[12] = {
"JAN",
"FEB",
"MAR",
"APR",
"MAY",
"JUN",
"JUL",
"AUG",
"SEP",
"OCT",
"NOV",
"DEC"};
285 time_t now =
time(
nullptr);
286 auto ltm = gmtime(&now);
287 context.lhcPeriod = months[ltm->tm_mon];
288 LOG(info) <<
"LHCPeriod is not available, using current month " << context.lhcPeriod;
291 auto extRunType = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"run_type",
"unspecified");
292 if (extRunType !=
"unspecified") {
293 context.runType = extRunType;
295 auto extEnvId = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"environment_id",
"unspecified");
296 if (extEnvId !=
"unspecified") {
297 context.envId = extEnvId;
299 auto extDetectors = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"detectors",
"unspecified");
300 if (extDetectors !=
"unspecified") {
301 context.detectors = extDetectors;
303 auto forcedRaw = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"force_run_as_raw",
"false");
304 context.forcedRaw = forcedRaw ==
"true"; },
314 .
name =
"configuration",
316 auto backend = options.GetPropertyAsString(
"configuration");
317 if (backend ==
"command-line") {
320 return ServiceHandle{TypeIdHelpers::uniqueId<ConfigurationInterface>(),
321 ConfigurationFactory::getConfiguration(backend).release()};
325 if (dc.options.count(
"configuration") == 0) {
329 auto backend = dc.options[
"configuration"].as<std::string>();
331 ConfigurationFactory::getConfiguration(backend).release()}); },
338 .
name =
"driverClient",
340 auto backend = options.GetPropertyAsString(
"driver-client-backend");
341 if (backend ==
"stdout://") {
342 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
346 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
358 return ServiceHandle{TypeIdHelpers::uniqueId<ControlService>(),
368 .
name =
"localrootfile",
369 .init = simpleServiceInit<LocalRootFileService, LocalRootFileService>(),
380 return ServiceHandle{TypeIdHelpers::uniqueId<ParallelContext>(),
390 .
name =
"timesliceindex",
393 return ServiceHandle{TypeIdHelpers::uniqueId<TimesliceIndex>(),
404 .init = simpleServiceInit<CallbackService, CallbackService>(),
412 .
name =
"datarelayer",
428 .
name =
"datasender",
477 .
name =
"ccdb-support",
481 for (
auto&
output : spec.outputs) {
483 LOGP(
debug,
"Optional inputs support enabled");
495 LOGP(
debug,
"We are w/o outputs, do not automatically add DISTSUBTIMEFRAME to outgoing messages");
511 if (concrete.subSpec == 0) {
515 stfDist.
id = timingInfo.timeslice;
516 stfDist.firstOrbit = timingInfo.firstTForbit;
517 stfDist.runNumber = timingInfo.runNumber;
521 O2_SIGNPOST_EVENT_EMIT(stream_context, sid,
"finaliseOutputs",
"Route %" PRIu64
" (%{public}s) was created by DPL.", (uint64_t)oi,
523 streamContext.routeDPLCreated[oi] =
true;
542 if (decongestion.lastTimeslice >= oldestPossibleOutput.timeslice.value) {
543 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"oldest_possible_timeslice",
"Not sending already sent value: %" PRIu64
"> %" PRIu64,
544 decongestion.lastTimeslice, (uint64_t)oldestPossibleOutput.timeslice.value);
547 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"oldest_possible_timeslice",
"Running oldest possible timeslice %" PRIu64
" propagation.",
548 (uint64_t)oldestPossibleOutput.timeslice.value);
551 for (
int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
552 auto& info = proxy.getForwardChannelInfo(
ChannelIndex{fi});
555 if (info.channelType != ChannelAccountingType::DPL) {
556 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"oldest_possible_timeslice",
"Skipping channel %{public}s", info.name.c_str());
561 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64
", priority %d",
562 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
565 decongestion.lastTimeslice = oldestPossibleOutput.timeslice.value;
576 int64_t oldNextTimeslice = decongestion.nextTimeslice;
577 decongestion.nextTimeslice = std::max(decongestion.nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
578 if (oldNextTimeslice != decongestion.nextTimeslice) {
580 O2_SIGNPOST_EVENT_EMIT_WARN(async_queue, cid,
"oldest_possible_timeslice",
"Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
582 O2_SIGNPOST_EVENT_EMIT_ERROR(async_queue, cid,
"oldest_possible_timeslice",
"Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
584 timesliceIndex.rescan();
596 .
name =
"decongestion",
599 for (
auto& input : services.
get<
DeviceSpec const>().inputs) {
600 if (input.matcher.lifetime == Lifetime::Timeframe || input.matcher.lifetime == Lifetime::QA || input.matcher.lifetime == Lifetime::Sporadic || input.matcher.lifetime == Lifetime::Optional) {
601 LOGP(detail,
"Found a real data input, we cannot update the oldest possible timeslice when sending messages");
602 decongestion->isFirstInTopology =
false;
607 decongestion->oldestPossibleTimesliceTask =
AsyncQueueHelpers::create(queue, {.name =
"oldest-possible-timeslice", .score = 100});
616 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"postForwardingCallbacks",
"We are the first one in the topology, we need to update the oldest possible timeslice");
619 timesliceIndex.updateOldestPossibleOutput(decongestion->nextEnumerationTimesliceRewinded);
622 if (decongestion->nextEnumerationTimesliceRewinded && decongestion->nextEnumerationTimeslice < oldestPossibleOutput.timeslice.value) {
623 LOGP(detail,
"Not sending oldestPossible if nextEnumerationTimeslice was rewinded");
627 if (decongestion->lastTimeslice && oldestPossibleOutput.timeslice.value == decongestion->lastTimeslice) {
629 "Not sending already sent value for oldest possible timeslice: %" PRIu64,
630 (uint64_t)oldestPossibleOutput.timeslice.value);
633 if (oldestPossibleOutput.timeslice.value < decongestion->lastTimeslice) {
634 LOGP(error,
"We are trying to send an oldest possible timeslice {} that is older than the last one we already sent {}",
635 oldestPossibleOutput.timeslice.value, decongestion->lastTimeslice);
639 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Broadcasting oldest posssible output %" PRIu64
" due to %{public}s (%" PRIu64
")",
640 (uint64_t)oldestPossibleOutput.timeslice.value,
641 oldestPossibleOutput.slot.index == -1 ?
"channel" :
"slot",
642 (uint64_t)(oldestPossibleOutput.slot.index == -1 ? oldestPossibleOutput.channel.value : oldestPossibleOutput.slot.index));
643 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Ordered active %d", decongestion->orderedCompletionPolicyActive);
644 if (decongestion->orderedCompletionPolicyActive) {
645 auto oldNextTimeslice = decongestion->nextTimeslice;
646 decongestion->nextTimeslice = std::max(decongestion->nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
647 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Next timeslice %" PRIi64, decongestion->nextTimeslice);
648 if (oldNextTimeslice != decongestion->nextTimeslice) {
651 O2_SIGNPOST_EVENT_EMIT_WARN(data_processor_context, cid,
"oldest_possible_timeslice",
"Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
653 O2_SIGNPOST_EVENT_EMIT_ERROR(data_processor_context, cid,
"oldest_possible_timeslice",
"Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
660 for (
int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
661 auto& info = proxy.getForwardChannelInfo(
ChannelIndex{fi});
665 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Skipping channel %{public}s", info.name.c_str());
670 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64
", priority %d",
671 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
674 decongestion->lastTimeslice = oldestPossibleOutput.timeslice.value; },
678 decongestion->nextEnumerationTimeslice = 0;
679 decongestion->nextEnumerationTimesliceRewinded =
false;
680 decongestion->lastTimeslice = 0;
681 decongestion->nextTimeslice = 0;
682 decongestion->oldestPossibleTimesliceTask = {0};
684 for (
auto &channel :
state.inputChannelInfos) {
685 channel.oldestForChannel = {0};
692 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Received oldest possible timeframe %" PRIu64
" from channel %d",
693 (uint64_t)oldestPossibleTimeslice, channel.value);
694 relayer.setOldestPossibleInput({oldestPossibleTimeslice}, channel);
695 timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
696 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
698 if (oldestPossibleOutput.timeslice.value == decongestion.lastTimeslice) {
699 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Synchronous: Not sending already sent value: %" PRIu64, (uint64_t)oldestPossibleOutput.timeslice.value);
702 if (oldestPossibleOutput.timeslice.value < decongestion.lastTimeslice) {
703 LOGP(error,
"We are trying to send an oldest possible timeslice {} that is older than the last one we sent {}",
704 oldestPossibleOutput.timeslice.value, decongestion.lastTimeslice);
711 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Queueing oldest possible timeslice %" PRIu64
" propagation for execution.",
712 (uint64_t)oldestPossibleOutput.timeslice.value);
715 .id = decongestion.oldestPossibleTimesliceTask,
719 if (decongestion.orderedCompletionPolicyActive) {
723 .user<
DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
735 .
name =
"threadpool",
740 return ServiceHandle{TypeIdHelpers::uniqueId<ThreadPool>(), pool};
742 .configure = [](
InitContext&,
void* service) ->
void* {
743 auto* t =
reinterpret_cast<ThreadPool*
>(service);
751 setenv(
"UV_THREADPOOL_SIZE", numWorkersS.c_str(), 0);
764 using namespace fair::mq::shmem;
765 auto& spec = registry.get<
DeviceSpec const>();
771 if (std::find_if(stats.metricSpecs.begin(), stats.metricSpecs.end(), hasMetric) != stats.metricSpecs.end()) {
773 long freeMemory = -1;
775 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(ShmId{makeShmIdStr(device->fConfig->GetProperty<uint64_t>(
"shmid"))}, runningWorkflow.shmSegmentId);
778 if (freeMemory == -1) {
780 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(SessionId{device->fConfig->GetProperty<std::string>(
"session")}, runningWorkflow.shmSegmentId);
787 auto device = registry.get<RawDeviceService>().device();
789 int64_t totalBytesIn = 0;
790 int64_t totalBytesOut = 0;
792 for (
auto& channel : device->GetChannels()) {
793 totalBytesIn += channel.second[0].GetBytesRx();
794 totalBytesOut += channel.second[0].GetBytesTx();
804auto flushStates(ServiceRegistryRef registry, DataProcessingStates&
states) ->
void
806 states.flushChangedStates([&
states, registry](std::string
const& spec, int64_t timestamp, std::string_view
value)
mutable ->
void {
807 auto& client = registry.get<ControlService>();
808 client.push(spec,
value, timestamp);
815auto flushMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) ->
void
821 if (registry.isMainThread() ==
false) {
822 LOGP(fatal,
"Flushing metrics should only happen on the main thread.");
824 auto& monitoring = registry.get<
Monitoring>();
825 auto& relayer = registry.get<DataRelayer>();
828 stats.flushChangedMetrics([&monitoring](DataProcessingStats::MetricSpec
const& spec, int64_t timestamp, int64_t
value)
mutable ->
void {
830 auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(std::chrono::milliseconds(timestamp));
831 auto metric = o2::monitoring::Metric{spec.name, Metric::DefaultVerbosity, tp};
834 LOG(
debug) <<
"Value for " << spec.name <<
" is negative, setting to 0";
837 metric.addValue((uint64_t)
value,
"value");
839 if (
value > (int64_t)std::numeric_limits<int>::max()) {
840 LOG(warning) <<
"Value for " << spec.name <<
" is too large, setting to INT_MAX";
841 value = (int64_t)std::numeric_limits<int>::max();
843 if (
value < (int64_t)std::numeric_limits<int>::min()) {
844 value = (int64_t)std::numeric_limits<int>::min();
845 LOG(warning) <<
"Value for " << spec.name <<
" is too small, setting to INT_MIN";
847 metric.addValue((
int)
value,
"value");
850 metric.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL);
852 monitoring.send(std::move(metric));
854 relayer.sendContextState();
855 monitoring.flushBuffer();
856 O2_SIGNPOST_END(monitoring_service, sid,
"flush",
"done flushing metrics");
863 .
name =
"data-processing-stats",
866 clock_gettime(CLOCK_REALTIME, &now);
867 uv_update_time(
state.loop);
868 uint64_t
offset = now.tv_sec * 1000 - uv_now(
state.loop);
870 .
minOnlinePublishInterval = std::stoi(options.GetProperty<std::string>(
"dpl-stats-min-online-publishing-interval").c_str()) * 1000};
877 int quickUpdateInterval = 5000;
878 uint64_t quickRefreshInterval = 7000;
879 uint64_t onlineRefreshLatency = 60000;
885 bool enableDebugMetrics =
false;
887 bool enableDebugMetrics =
true;
889 bool arrowAndResourceLimitingMetrics =
false;
891 arrowAndResourceLimitingMetrics =
true;
896 bool enableCPUUsageFraction =
true;
898 if (std::find_if(spec.labels.begin(), spec.labels.end(), isProxy) != spec.labels.end()) {
900 O2_SIGNPOST_EVENT_EMIT(policies, mid,
"metrics",
"Disabling cpu_usage_fraction metric for proxy %{public}s", spec.name.c_str());
901 enableCPUUsageFraction =
false;
904 std::vector<DataProcessingStats::MetricSpec>
metrics = {
905 MetricSpec{.name =
"errors",
907 .kind = Kind::UInt64,
908 .scope = Scope::Online,
909 .minPublishInterval = quickUpdateInterval,
910 .maxRefreshLatency = quickRefreshInterval},
911 MetricSpec{.name =
"exceptions",
913 .kind = Kind::UInt64,
914 .scope = Scope::Online,
915 .minPublishInterval = quickUpdateInterval},
916 MetricSpec{.name =
"inputs/relayed/pending",
918 .kind = Kind::UInt64,
919 .minPublishInterval = quickUpdateInterval},
920 MetricSpec{.name =
"inputs/relayed/incomplete",
922 .kind = Kind::UInt64,
923 .minPublishInterval = quickUpdateInterval},
924 MetricSpec{.name =
"inputs/relayed/total",
926 .kind = Kind::UInt64,
927 .minPublishInterval = quickUpdateInterval},
928 MetricSpec{.name =
"elapsed_time_ms",
930 .kind = Kind::UInt64,
931 .minPublishInterval = quickUpdateInterval},
932 MetricSpec{.name =
"total_wall_time_ms",
934 .kind = Kind::UInt64,
935 .minPublishInterval = quickUpdateInterval},
936 MetricSpec{.name =
"last_processed_input_size_byte",
938 .kind = Kind::UInt64,
939 .minPublishInterval = quickUpdateInterval},
940 MetricSpec{.name =
"total_processed_input_size_byte",
942 .kind = Kind::UInt64,
943 .scope = Scope::Online,
944 .minPublishInterval = quickUpdateInterval},
945 MetricSpec{.name =
"total_sigusr1",
947 .kind = Kind::UInt64,
948 .minPublishInterval = quickUpdateInterval},
949 MetricSpec{.name =
"consumed-timeframes",
951 .kind = Kind::UInt64,
952 .minPublishInterval = 0,
953 .maxRefreshLatency = quickRefreshInterval,
954 .sendInitialValue =
true},
955 MetricSpec{.name =
"min_input_latency_ms",
957 .kind = Kind::UInt64,
958 .scope = Scope::Online,
959 .minPublishInterval = quickUpdateInterval},
960 MetricSpec{.name =
"max_input_latency_ms",
962 .kind = Kind::UInt64,
963 .minPublishInterval = quickUpdateInterval},
964 MetricSpec{.name =
"total_rate_in_mb_s",
967 .scope = Scope::Online,
968 .minPublishInterval = quickUpdateInterval,
969 .maxRefreshLatency = onlineRefreshLatency,
970 .sendInitialValue =
true},
971 MetricSpec{.name =
"total_rate_out_mb_s",
974 .scope = Scope::Online,
975 .minPublishInterval = quickUpdateInterval,
976 .maxRefreshLatency = onlineRefreshLatency,
977 .sendInitialValue =
true},
978 MetricSpec{.name =
"processing_rate_hz",
981 .scope = Scope::Online,
982 .minPublishInterval = quickUpdateInterval,
983 .maxRefreshLatency = onlineRefreshLatency,
984 .sendInitialValue =
true},
985 MetricSpec{.name =
"cpu_usage_fraction",
986 .enabled = enableCPUUsageFraction,
989 .scope = Scope::Online,
990 .minPublishInterval = quickUpdateInterval,
991 .maxRefreshLatency = onlineRefreshLatency,
992 .sendInitialValue =
true},
993 MetricSpec{.name =
"performed_computations",
995 .kind = Kind::UInt64,
996 .scope = Scope::Online,
997 .minPublishInterval = quickUpdateInterval,
998 .maxRefreshLatency = onlineRefreshLatency,
999 .sendInitialValue =
true},
1000 MetricSpec{.name =
"total_bytes_in",
1002 .kind = Kind::UInt64,
1003 .scope = Scope::Online,
1004 .minPublishInterval = quickUpdateInterval,
1005 .maxRefreshLatency = onlineRefreshLatency,
1006 .sendInitialValue =
true},
1007 MetricSpec{.name =
"total_bytes_out",
1009 .kind = Kind::UInt64,
1010 .scope = Scope::Online,
1011 .minPublishInterval = quickUpdateInterval,
1012 .maxRefreshLatency = onlineRefreshLatency,
1013 .sendInitialValue =
true},
1014 MetricSpec{.name = fmt::format(
"available_managed_shm_{}", runningWorkflow.shmSegmentId),
1016 .kind = Kind::UInt64,
1017 .scope = Scope::Online,
1018 .minPublishInterval = 500,
1019 .maxRefreshLatency = onlineRefreshLatency,
1020 .sendInitialValue =
true},
1021 MetricSpec{.name =
"malformed_inputs", .metricId =
static_cast<short>(
ProcessingStatsId::MALFORMED_INPUTS), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1024 MetricSpec{.name =
"relayed_messages", .metricId =
static_cast<short>(
ProcessingStatsId::RELAYED_MESSAGES), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1025 MetricSpec{.name =
"arrow-bytes-destroyed",
1026 .enabled = arrowAndResourceLimitingMetrics,
1028 .kind = Kind::UInt64,
1029 .scope = Scope::DPL,
1030 .minPublishInterval = 0,
1031 .maxRefreshLatency = 10000,
1032 .sendInitialValue =
true},
1033 MetricSpec{.name =
"arrow-messages-destroyed",
1034 .enabled = arrowAndResourceLimitingMetrics,
1036 .kind = Kind::UInt64,
1037 .scope = Scope::DPL,
1038 .minPublishInterval = 0,
1039 .maxRefreshLatency = 10000,
1040 .sendInitialValue =
true},
1041 MetricSpec{.name =
"arrow-bytes-created",
1042 .enabled = arrowAndResourceLimitingMetrics,
1044 .kind = Kind::UInt64,
1045 .scope = Scope::DPL,
1046 .minPublishInterval = 0,
1047 .maxRefreshLatency = 10000,
1048 .sendInitialValue =
true},
1049 MetricSpec{.name =
"arrow-messages-created",
1050 .enabled = arrowAndResourceLimitingMetrics,
1052 .kind = Kind::UInt64,
1053 .scope = Scope::DPL,
1054 .minPublishInterval = 0,
1055 .maxRefreshLatency = 10000,
1056 .sendInitialValue =
true},
1057 MetricSpec{.name =
"arrow-bytes-expired",
1058 .enabled = arrowAndResourceLimitingMetrics,
1060 .kind = Kind::UInt64,
1061 .scope = Scope::DPL,
1062 .minPublishInterval = 0,
1063 .maxRefreshLatency = 10000,
1064 .sendInitialValue =
true},
1065 MetricSpec{.name =
"shm-offer-bytes-consumed",
1066 .enabled = arrowAndResourceLimitingMetrics,
1068 .kind = Kind::UInt64,
1069 .scope = Scope::DPL,
1070 .minPublishInterval = 0,
1071 .maxRefreshLatency = 10000,
1072 .sendInitialValue =
true},
1073 MetricSpec{.name =
"resources-missing",
1074 .enabled = enableDebugMetrics,
1076 .kind = Kind::UInt64,
1077 .scope = Scope::DPL,
1078 .minPublishInterval = 1000,
1079 .maxRefreshLatency = 1000,
1080 .sendInitialValue =
true},
1081 MetricSpec{.name =
"resources-insufficient",
1082 .enabled = enableDebugMetrics,
1084 .kind = Kind::UInt64,
1085 .scope = Scope::DPL,
1086 .minPublishInterval = 1000,
1087 .maxRefreshLatency = 1000,
1088 .sendInitialValue =
true},
1089 MetricSpec{.name =
"resources-satisfactory",
1090 .enabled = enableDebugMetrics,
1092 .kind = Kind::UInt64,
1093 .scope = Scope::DPL,
1094 .minPublishInterval = 1000,
1095 .maxRefreshLatency = 1000,
1096 .sendInitialValue =
true},
1097 MetricSpec{.name =
"resource-offer-expired",
1098 .enabled = arrowAndResourceLimitingMetrics,
1100 .kind = Kind::UInt64,
1101 .scope = Scope::DPL,
1102 .minPublishInterval = 0,
1103 .maxRefreshLatency = 10000,
1104 .sendInitialValue =
true}};
1106 for (
auto& metric :
metrics) {
1110 stats->registerMetric(metric);
1113 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStats>(), stats};
1121 sendRelayerMetrics(context.
services(), *stats);
1122 flushMetrics(context.
services(), *stats); },
1125 sendRelayerMetrics(context.
services(), *stats);
1126 flushMetrics(context.
services(), *stats); },
1129 sendRelayerMetrics(context.
services(), *stats);
1130 flushMetrics(context.
services(), *stats); },
1133 flushMetrics(
ref, *stats); },
1142 .
name =
"data-processing-states",
1145 clock_gettime(CLOCK_REALTIME, &now);
1146 uv_update_time(
state.loop);
1147 uint64_t
offset = now.tv_sec * 1000 - uv_now(
state.loop);
1156 states->processCommandQueue(); },
1175 .
name =
"gui-metrics",
1180 monitoring.send({(
int)spec.inputChannels.size(), fmt::format(
"oldest_possible_timeslice/h"), o2::monitoring::Verbosity::Debug});
1181 monitoring.send({(
int)1, fmt::format(
"oldest_possible_timeslice/w"), o2::monitoring::Verbosity::Debug});
1182 monitoring.send({(
int)spec.outputChannels.size(), fmt::format(
"oldest_possible_output/h"), o2::monitoring::Verbosity::Debug});
1183 monitoring.send({(
int)1, fmt::format(
"oldest_possible_output/w"), o2::monitoring::Verbosity::Debug});
1184 return ServiceHandle{TypeIdHelpers::uniqueId<GUIMetrics>(), stats};
// Publish the relayer's current oldest possible output once per output
// channel, under the metric name "oldest_possible_output/<channel index>".
1191 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
// The index must start at 0: it was previously declared without an
// initializer, so the loop condition read an indeterminate value (undefined
// behaviour) and channels could be skipped or the loop not run at all.
1192 for (size_t ci = 0; ci < spec.outputChannels.size(); ++ci) {
1193 monitoring.send({(uint64_t)oldestPossibleOutput.timeslice.value, fmt::format("oldest_possible_output/{}", ci), o2::monitoring::Verbosity::Debug});
1197 monitoring.send({(uint64_t)timeslice, fmt::format(
"oldest_possible_timeslice/{}", channel.value), o2::monitoring::Verbosity::Debug}); },
1205 .
name =
"object-cache",
1208 return ServiceHandle{TypeIdHelpers::uniqueId<ObjectCache>(), cache};
1217 .
name =
"data-processing-context",
1229 .
name =
"device-context",
1240 .
name =
"data-allocator",
1241 .uniqueId = simpleServiceId<DataAllocator>(),
1244 .
hash = TypeIdHelpers::uniqueId<DataAllocator>(),
1247 .name =
"data-allocator",
1257 std::vector<ServiceSpec> specs{
1287 std::string loadableServicesStr = extraPlugins;
1290 if (loadableServicesStr.empty() ==
false) {
1291 loadableServicesStr +=
",";
1293 loadableServicesStr +=
"O2FrameworkDataTakingSupport:InfoLoggerContext,O2FrameworkDataTakingSupport:InfoLogger";
1296 std::vector<LoadablePlugin> loadablePlugins = {};
1297 char* loadableServicesEnv = getenv(
"DPL_LOAD_SERVICES");
1301 if (loadableServicesEnv) {
1302 if (loadableServicesStr.empty() ==
false) {
1303 loadableServicesStr +=
",";
1305 loadableServicesStr += loadableServicesEnv;
1308 PluginManager::loadFromPlugin<ServiceSpec, ServicePlugin>(loadablePlugins, specs);
o2::monitoring::tags::Key Key
o2::monitoring::Metric Metric
o2::monitoring::tags::Value Value
#define MONITORING_QUEUE_SIZE
#define O2_BUILTIN_LIKELY(x)
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
o2::monitoring::Monitoring Monitoring
decltype(auto) make(const Output &spec, Args... args)
int countDeviceOutputs(bool excludeDPLOrigin=false)
Allow injecting policies on send.
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
A text based way of communicating with the driver.
OldestOutputInfo getOldestPossibleOutput() const
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
GLsizei const GLfloat * value
GLuint GLsizei const GLchar * label
Defining PrimaryVertex explicitly as messageable.
@ DPL
The channel is a normal input channel.
std::pair< std::string, unsigned short > parse_websocket_url(char const *url)
auto decongestionCallback
@ NoTransition
No pending transitions.
@ AVAILABLE_MANAGED_SHM_BASE
@ SHM_OFFER_BYTES_CONSUMED
@ DROPPED_INCOMING_MESSAGES
@ ARROW_MESSAGES_DESTROYED
auto decongestionCallbackOrdered
std::string to_string(gsl::span< T, Size > span)
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static AsyncTaskId create(AsyncQueue &queue, AsyncTaskSpec spec)
static void post(AsyncQueue &queue, AsyncTask const &task)
static void reset(AsyncQueue &queue)
Reset the queue to its initial state.
An actual task to be executed.
static ServiceSpec fairMQDeviceProxy()
static ServiceSpec fairMQBackendSpec()
static ServiceSpec stringBackendSpec()
static ServiceSpec dataRelayer()
static ServiceSpec callbacksSpec()
static ServiceSpec monitoringSpec()
static ServiceSpec dataSender()
static ServiceSpec timesliceIndex()
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static ServiceSpec timingInfoSpec()
static ServiceConfigureCallback noConfiguration()
static ServiceSpec asyncQueue()
static ServiceSpec decongestionSpec()
static ServiceSpec dataProcessorContextSpec()
static ServiceSpec dataProcessingStats()
static std::vector< ServiceSpec > arrowServices()
static ServiceSpec rootFileSpec()
static ServiceSpec objectCache()
static ServiceSpec controlSpec()
static ServiceSpec deviceContextSpec()
static ServiceSpec configurationSpec()
static ServiceSpec ccdbSupportSpec()
static ServiceSpec datatakingContextSpec()
static ServiceSpec guiMetricsSpec()
static ServiceSpec dataProcessingStates()
static ServiceSpec tracingSpec()
static ServiceSpec dataAllocatorSpec()
static ServiceSpec driverClientSpec()
static ServiceSpec streamContextSpec()
static ServiceSpec threadPool(int numWorkers)
static ServiceSpec parallelSpec()
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
int64_t minOnlinePublishInterval
Helper struct to hold statistics about the data processing happening.
@ SetIfPositive
Set the value to the specified value.
@ InstantaneousRate
Update the rate of the metric given the cumulative value since last time it got published.
@ Add
Update the rate of the metric given the amount since the last time.
A label that can be associated to a DataProcessorSpec.
static std::string describe(InputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
DeploymentMode deploymentMode
Where we think this is running.
TimesliceIndex::OldestOutputInfo oldestPossibleOutput
static DeploymentMode deploymentMode()
static bool onlineDeploymentMode()
@true if running online
std::string name
The name of the associated DataProcessorSpec.
size_t inputTimesliceId
The time pipelining id of this particular device.
Running state information of a given device.
header::DataOrigin origin
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
Information about the running workflow.
ServiceKind kind
Kind of service.
unsigned int hash
Unique hash associated to the type of service.
std::string name
Name of the service.
ServicePostDispatching postDispatching
ServiceKind kind
Kind of service being specified.
static std::function< int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator(uv_loop_t *loop)
static std::function< void(int64_t &base, int64_t &offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t *loop)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"