53#include "../src/DataProcessingStatus.h"
60#include <Configuration/ConfigurationInterface.h>
61#include <Configuration/ConfigurationFactory.h>
62#include <Monitoring/MonitoringFactory.h>
65#include <fairmq/Device.h>
66#include <fairmq/shmem/Monitor.h>
67#include <fairmq/shmem/Common.h>
68#include <fairmq/ProgOptions.h>
74using o2::configuration::ConfigurationFactory;
75using o2::configuration::ConfigurationInterface;
76using o2::monitoring::Monitoring;
77using o2::monitoring::MonitoringFactory;
78using Metric = o2::monitoring::Metric;
79using Key = o2::monitoring::tags::Key;
80using Value = o2::monitoring::tags::Value;
90#define MONITORING_QUEUE_SIZE 100
96 void* service =
nullptr;
// Detect a websocket driver client backend by its URL scheme.
// Compare all 5 characters of "ws://": the previous length of 4 only checked
// "ws:/" and therefore did not actually verify the complete scheme prefix.
97 bool isWebsocket = strncmp(options.GetPropertyAsString(
"driver-client-backend").c_str(),
"ws://", 5) == 0;
98 bool isDefault = options.GetPropertyAsString(
"monitoring-backend") ==
"default";
99 bool useDPL = (isWebsocket && isDefault) || options.GetPropertyAsString(
"monitoring-backend") ==
"dpl://";
100 o2::monitoring::Monitoring* monitoring;
103 auto dplBackend = std::make_unique<DPLMonitoringBackend>(registry);
105 monitoring->addBackend(std::move(dplBackend));
107 auto backend = isDefault ?
"infologger://" : options.GetPropertyAsString(
"monitoring-backend");
108 monitoring = MonitoringFactory::Get(backend).release();
110 service = monitoring;
114 monitoring->addGlobalTag(
"dataprocessor_name", registry.
get<
DeviceSpec const>().
name);
115 monitoring->addGlobalTag(
"dpl_instance", options.GetPropertyAsString(
"shm-segment-id"));
116 return ServiceHandle{TypeIdHelpers::uniqueId<Monitoring>(), service};
120 auto* monitoring = (o2::monitoring::Monitoring*)service;
122 auto extRunNumber = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"runNumber",
"unspecified");
123 if (extRunNumber ==
"unspecified") {
127 monitoring->setRunNumber(std::stoul(extRunNumber));
131 auto* monitoring =
reinterpret_cast<Monitoring*
>(service);
132 monitoring->flushBuffer();
133 delete monitoring; },
141 .
name =
"async-queue",
142 .init = simpleServiceInit<AsyncQueue, AsyncQueue>(),
157 .
name =
"timing-info",
158 .uniqueId = simpleServiceId<TimingInfo>(),
159 .init = simpleServiceInit<TimingInfo, TimingInfo, ServiceKind::Stream>(),
167 .
name =
"stream-context",
168 .uniqueId = simpleServiceId<StreamContext>(),
169 .init = simpleServiceInit<StreamContext, StreamContext, ServiceKind::Stream>(),
181 std::fill(
stream->routeDPLCreated.begin(),
stream->routeDPLCreated.end(),
false);
182 std::fill(
stream->routeCreated.begin(),
stream->routeCreated.end(),
false); },
191 bool userDidCreate =
false;
193 for (
size_t ri = 0; ri <
routes.size(); ++ri) {
194 if (
stream->routeCreated[ri] ==
true &&
stream->routeDPLCreated[ri] ==
false) {
195 userDidCreate =
true;
199 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"userDidCreate == %d && didDispatch == %d",
201 messageContext.didDispatch());
202 if (userDidCreate ==
false && messageContext.didDispatch() ==
true) {
203 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"Data created out of band userDidCreate == %d && messageContext.didDispatch == %d",
205 messageContext.didDispatch());
208 if (userDidCreate ==
false && messageContext.didDispatch() ==
false) {
213 for (
size_t ri = 0; ri <
routes.size(); ++ri) {
215 auto &matcher = route.matcher;
216 if (
stream->routeDPLCreated[ri] ==
true) {
217 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"Data created by DPL. ri = %" PRIu64
", %{public}s",
221 if (
stream->routeCreated[ri] ==
true) {
223 }
if ((timeslice % route.maxTimeslices) != route.timeslice) {
224 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"Route ri = %" PRIu64
", skipped because of pipelining.",
228 if (matcher.lifetime == Lifetime::Timeframe) {
230 "Expected Lifetime::Timeframe data %{public}s was not created for timeslice %" PRIu64
" and might result in dropped timeframes",
232 LOGP(error,
"Expected Lifetime::Timeframe data {} was not created for timeslice {} and might result in dropped timeframes",
DataSpecUtils::describe(matcher), timeslice);
249 std::fill(
stream->routeCreated.begin(),
stream->routeCreated.end(),
false);
250 std::fill(
stream->routeDPLCreated.begin(),
stream->routeDPLCreated.end(),
false); },
257 .
name =
"datataking-contex",
258 .uniqueId = simpleServiceId<DataTakingContext>(),
259 .init = simpleServiceInit<DataTakingContext, DataTakingContext, ServiceKind::Stream>(),
263 for (
auto const&
ref : processingContext.
inputs()) {
265 const auto* dh = o2::header::get<o2::header::DataHeader*>(
ref.header);
269 context.runNumber = fmt::format(
"{}", dh->runNumber);
278 auto extRunNumber = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"runNumber",
"unspecified");
279 if (extRunNumber !=
"unspecified" || context.runNumber ==
"0") {
280 context.runNumber = extRunNumber;
282 auto extLHCPeriod = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"lhc_period",
"unspecified");
283 if (extLHCPeriod !=
"unspecified") {
284 context.lhcPeriod = extLHCPeriod;
286 static const char* months[12] = {
"JAN",
"FEB",
"MAR",
"APR",
"MAY",
"JUN",
"JUL",
"AUG",
"SEP",
"OCT",
"NOV",
"DEC"};
287 time_t now =
time(
nullptr);
288 auto ltm = gmtime(&now);
289 context.lhcPeriod = months[ltm->tm_mon];
290 LOG(info) <<
"LHCPeriod is not available, using current month " << context.lhcPeriod;
293 auto extRunType = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"run_type",
"unspecified");
294 if (extRunType !=
"unspecified") {
295 context.runType = extRunType;
297 auto extEnvId = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"environment_id",
"unspecified");
298 if (extEnvId !=
"unspecified") {
299 context.envId = extEnvId;
301 auto extDetectors = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"detectors",
"unspecified");
302 if (extDetectors !=
"unspecified") {
303 context.detectors = extDetectors;
305 auto forcedRaw = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"force_run_as_raw",
"false");
306 context.forcedRaw = forcedRaw ==
"true"; },
316 .
name =
"configuration",
318 auto backend = options.GetPropertyAsString(
"configuration");
319 if (backend ==
"command-line") {
322 return ServiceHandle{TypeIdHelpers::uniqueId<ConfigurationInterface>(),
323 ConfigurationFactory::getConfiguration(backend).release()};
327 if (dc.options.count(
"configuration") == 0) {
331 auto backend = dc.options[
"configuration"].as<std::string>();
333 ConfigurationFactory::getConfiguration(backend).release()}); },
340 .
name =
"driverClient",
342 auto backend = options.GetPropertyAsString(
"driver-client-backend");
343 if (backend ==
"stdout://") {
344 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
348 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
360 return ServiceHandle{TypeIdHelpers::uniqueId<ControlService>(),
370 .
name =
"localrootfile",
371 .init = simpleServiceInit<LocalRootFileService, LocalRootFileService>(),
382 return ServiceHandle{TypeIdHelpers::uniqueId<ParallelContext>(),
392 .
name =
"timesliceindex",
395 return ServiceHandle{TypeIdHelpers::uniqueId<TimesliceIndex>(),
406 .init = simpleServiceInit<CallbackService, CallbackService>(),
414 .
name =
"datarelayer",
432 .
name =
"datasender",
481 .
name =
"ccdb-support",
485 for (
auto&
output : spec.outputs) {
487 LOGP(
debug,
"Optional inputs support enabled");
499 LOGP(
debug,
"We are w/o outputs, do not automatically add DISTSUBTIMEFRAME to outgoing messages");
515 if (concrete.subSpec == 0) {
519 stfDist.
id = timingInfo.timeslice;
520 stfDist.firstOrbit = timingInfo.firstTForbit;
521 stfDist.runNumber = timingInfo.runNumber;
525 O2_SIGNPOST_EVENT_EMIT(stream_context, sid,
"finaliseOutputs",
"Route %" PRIu64
" (%{public}s) was created by DPL.", (uint64_t)oi,
527 streamContext.routeDPLCreated[oi] =
true;
546 if (decongestion.lastTimeslice >= oldestPossibleOutput.timeslice.value) {
547 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"oldest_possible_timeslice",
"Not sending already sent value: %" PRIu64
"> %" PRIu64,
548 decongestion.lastTimeslice, (uint64_t)oldestPossibleOutput.timeslice.value);
551 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"oldest_possible_timeslice",
"Running oldest possible timeslice %" PRIu64
" propagation.",
552 (uint64_t)oldestPossibleOutput.timeslice.value);
555 for (
int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
556 auto& info = proxy.getForwardChannelInfo(
ChannelIndex{fi});
559 if (info.channelType != ChannelAccountingType::DPL) {
560 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"oldest_possible_timeslice",
"Skipping channel %{public}s", info.name.c_str());
565 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64
", priority %d",
566 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
569 decongestion.lastTimeslice = oldestPossibleOutput.timeslice.value;
580 int64_t oldNextTimeslice = decongestion.nextTimeslice;
581 decongestion.nextTimeslice = std::max(decongestion.nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
582 if (oldNextTimeslice != decongestion.nextTimeslice) {
584 O2_SIGNPOST_EVENT_EMIT_WARN(async_queue, cid,
"oldest_possible_timeslice",
"Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
586 O2_SIGNPOST_EVENT_EMIT_CRITICAL(async_queue, cid,
"oldest_possible_timeslice",
"Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
588 timesliceIndex.rescan();
606 timesliceIndex.rescan();
607 timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
610 if (oldestPossibleOutput.timeslice.value <= decongestion.lastTimeslice) {
612 "consumeWhenPastOldestPossibleTimeframe: not forwarding already sent value %" PRIu64,
613 (uint64_t)oldestPossibleOutput.timeslice.value);
617 "consumeWhenPastOldestPossibleTimeframe: forwarding oldest possible timeslice %" PRIu64,
618 (uint64_t)oldestPossibleOutput.timeslice.value);
621 for (
int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
622 auto& info = proxy.getForwardChannelInfo(
ChannelIndex{fi});
624 if (info.channelType != ChannelAccountingType::DPL) {
629 decongestion.lastTimeslice = oldestPossibleOutput.timeslice.value;
640 .
name =
"decongestion",
643 for (
auto& input : services.
get<
DeviceSpec const>().inputs) {
644 if (input.matcher.lifetime == Lifetime::Timeframe || input.matcher.lifetime == Lifetime::QA || input.matcher.lifetime == Lifetime::Sporadic || input.matcher.lifetime == Lifetime::Optional) {
645 LOGP(detail,
"Found a real data input, we cannot update the oldest possible timeslice when sending messages");
646 decongestion->isFirstInTopology =
false;
652 decongestion->suppressDomainInfo =
true;
657 decongestion->oldestPossibleTimesliceTask =
AsyncQueueHelpers::create(queue, {.name =
"oldest-possible-timeslice", .score = 100});
666 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"postForwardingCallbacks",
"We are the first one in the topology, we need to update the oldest possible timeslice");
669 timesliceIndex.updateOldestPossibleOutput(decongestion->nextEnumerationTimesliceRewinded);
672 if (decongestion->nextEnumerationTimesliceRewinded && decongestion->nextEnumerationTimeslice < oldestPossibleOutput.timeslice.value) {
673 LOGP(detail,
"Not sending oldestPossible if nextEnumerationTimeslice was rewinded");
677 if (decongestion->lastTimeslice && oldestPossibleOutput.timeslice.value == decongestion->lastTimeslice) {
679 "Not sending already sent value for oldest possible timeslice: %" PRIu64,
680 (uint64_t)oldestPossibleOutput.timeslice.value);
683 if (oldestPossibleOutput.timeslice.value < decongestion->lastTimeslice) {
684 LOGP(error,
"We are trying to send an oldest possible timeslice {} that is older than the last one we already sent {}",
685 oldestPossibleOutput.timeslice.value, decongestion->lastTimeslice);
689 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Broadcasting oldest posssible output %" PRIu64
" due to %{public}s (%" PRIu64
")",
690 (uint64_t)oldestPossibleOutput.timeslice.value,
691 oldestPossibleOutput.slot.index == -1 ?
"channel" :
"slot",
692 (uint64_t)(oldestPossibleOutput.slot.index == -1 ? oldestPossibleOutput.channel.value : oldestPossibleOutput.slot.index));
693 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Ordered active %d", decongestion->orderedCompletionPolicyActive);
694 if (decongestion->orderedCompletionPolicyActive) {
695 auto oldNextTimeslice = decongestion->nextTimeslice;
696 decongestion->nextTimeslice = std::max(decongestion->nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
697 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Next timeslice %" PRIi64, decongestion->nextTimeslice);
698 if (oldNextTimeslice != decongestion->nextTimeslice) {
701 O2_SIGNPOST_EVENT_EMIT_WARN(data_processor_context, cid,
"oldest_possible_timeslice",
"Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
703 O2_SIGNPOST_EVENT_EMIT_CRITICAL(data_processor_context, cid,
"oldest_possible_timeslice",
"Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
710 for (
int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
711 auto& info = proxy.getForwardChannelInfo(
ChannelIndex{fi});
715 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Skipping channel %{public}s", info.name.c_str());
720 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64
", priority %d",
721 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
724 decongestion->lastTimeslice = oldestPossibleOutput.timeslice.value; },
728 decongestion->nextEnumerationTimeslice = 0;
729 decongestion->nextEnumerationTimesliceRewinded =
false;
730 decongestion->lastTimeslice = 0;
731 decongestion->nextTimeslice = 0;
732 decongestion->oldestPossibleTimesliceTask = {0};
734 for (
auto &channel :
state.inputChannelInfos) {
735 channel.oldestForChannel = {0};
742 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Received oldest possible timeframe %" PRIu64
" from channel %d",
743 (uint64_t)oldestPossibleTimeslice, channel.value);
744 relayer.setOldestPossibleInput({oldestPossibleTimeslice}, channel);
745 timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
746 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
754 if (decongestion.consumeWhenPastOldestPossibleTimeframeActive) {
758 .id = decongestion.oldestPossibleTimesliceTask,
761 .user<
DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
764 if (oldestPossibleOutput.timeslice.value == decongestion.lastTimeslice) {
765 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Synchronous: Not sending already sent value: %" PRIu64, (uint64_t)oldestPossibleOutput.timeslice.value);
768 if (oldestPossibleOutput.timeslice.value < decongestion.lastTimeslice) {
769 LOGP(error,
"We are trying to send an oldest possible timeslice {} that is older than the last one we sent {}",
770 oldestPossibleOutput.timeslice.value, decongestion.lastTimeslice);
777 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Queueing oldest possible timeslice %" PRIu64
" propagation for execution.",
778 (uint64_t)oldestPossibleOutput.timeslice.value);
781 .id = decongestion.oldestPossibleTimesliceTask,
785 if (decongestion.orderedCompletionPolicyActive) {
789 .user<
DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
801 .
name =
"threadpool",
806 return ServiceHandle{TypeIdHelpers::uniqueId<ThreadPool>(), pool};
808 .configure = [](
InitContext&,
void* service) ->
void* {
809 auto* t =
reinterpret_cast<ThreadPool*
>(service);
817 setenv(
"UV_THREADPOOL_SIZE", numWorkersS.c_str(), 0);
830 using namespace fair::mq::shmem;
831 auto& spec = registry.get<
DeviceSpec const>();
834 if (stats.hasAvailSHMMetric) {
836 long freeMemory = -1;
838 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(ShmId{makeShmIdStr(device->fConfig->GetProperty<uint64_t>(
"shmid"))}, runningWorkflow.shmSegmentId);
841 if (freeMemory == -1) {
843 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(SessionId{device->fConfig->GetProperty<std::string>(
"session")}, runningWorkflow.shmSegmentId);
850 auto device = registry.get<RawDeviceService>().device();
855 for (
auto& channel : device->GetChannels()) {
856 totalBytesIn += channel.second[0].GetBytesRx();
857 totalBytesOut += channel.second[0].GetBytesTx();
867auto flushStates(ServiceRegistryRef registry, DataProcessingStates&
states) ->
void
869 if (!registry.get<DriverConfig
const>().driverHasGUI) {
872 states.flushChangedStates([&
states, registry](std::string
const& spec,
int64_t timestamp, std::string_view
value)
mutable ->
void {
873 auto& client = registry.get<ControlService>();
874 client.push(spec,
value, timestamp);
881auto flushMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) ->
void
887 if (registry.isMainThread() ==
false) {
888 LOGP(fatal,
"Flushing metrics should only happen on the main thread.");
890 auto& monitoring = registry.get<
Monitoring>();
891 auto& relayer = registry.get<DataRelayer>();
894 stats.flushChangedMetrics([&monitoring, sid](DataProcessingStats::MetricSpec
const& spec,
int64_t timestamp,
int64_t value)
mutable ->
void {
896 auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(std::chrono::milliseconds(timestamp));
897 auto metric = o2::monitoring::Metric{spec.name, Metric::DefaultVerbosity, tp};
900 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid,
"flushChangedMetrics",
"Value for %{public}s is negative, setting to 0",
904 metric.addValue((uint64_t)
value,
"value");
907 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid,
"flushChangedMetrics",
"Value for %{public}s is too large, setting to INT_MAX",
912 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid,
"flushChangedMetrics",
"Value for %{public}s is too small, setting to INT_MIN",
916 metric.addValue((
int)
value,
"value");
919 metric.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL);
921 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid,
"flushChangedMetrics",
"Flushing metric %{public}s", spec.name.c_str());
922 monitoring.send(std::move(metric));
924 relayer.sendContextState();
925 monitoring.flushBuffer();
926 O2_SIGNPOST_END(monitoring_service, sid,
"flush",
"done flushing metrics");
933 .
name =
"data-processing-stats",
936 clock_gettime(CLOCK_REALTIME, &now);
937 uv_update_time(
state.loop);
938 uint64_t
offset = now.tv_sec * 1000 - uv_now(
state.loop);
940 .
minOnlinePublishInterval = std::stoi(options.GetProperty<std::string>(
"dpl-stats-min-online-publishing-interval").c_str()) * 1000};
947 int quickUpdateInterval = 5000;
948 uint64_t quickRefreshInterval = 7000;
949 uint64_t onlineRefreshLatency = 60000;
955 bool enableDebugMetrics =
false;
957 bool enableDebugMetrics =
true;
959 bool arrowAndResourceLimitingMetrics =
false;
961 arrowAndResourceLimitingMetrics =
true;
964 int64_t consumedTimeframesPublishInterval = 0;
966 consumedTimeframesPublishInterval = 5000;
971 bool enableCPUUsageFraction =
true;
973 if (std::find_if(spec.labels.begin(), spec.labels.end(), isProxy) != spec.labels.end()) {
975 O2_SIGNPOST_EVENT_EMIT(policies, mid,
"metrics",
"Disabling cpu_usage_fraction metric for proxy %{public}s", spec.name.c_str());
976 enableCPUUsageFraction =
false;
979 std::vector<DataProcessingStats::MetricSpec>
metrics = {
980 MetricSpec{.name =
"errors",
982 .
kind = Kind::UInt64,
983 .scope = Scope::Online,
984 .minPublishInterval = quickUpdateInterval,
985 .maxRefreshLatency = quickRefreshInterval},
986 MetricSpec{.name =
"exceptions",
988 .
kind = Kind::UInt64,
989 .scope = Scope::Online,
990 .minPublishInterval = quickUpdateInterval},
991 MetricSpec{.name =
"inputs/relayed/pending",
993 .
kind = Kind::UInt64,
994 .minPublishInterval = quickUpdateInterval},
995 MetricSpec{.name =
"inputs/relayed/incomplete",
997 .
kind = Kind::UInt64,
998 .minPublishInterval = quickUpdateInterval},
999 MetricSpec{.name =
"inputs/relayed/total",
1001 .
kind = Kind::UInt64,
1002 .minPublishInterval = quickUpdateInterval},
1003 MetricSpec{.name =
"elapsed_time_ms",
1005 .
kind = Kind::UInt64,
1006 .minPublishInterval = quickUpdateInterval},
1007 MetricSpec{.name =
"total_wall_time_ms",
1009 .
kind = Kind::UInt64,
1010 .minPublishInterval = quickUpdateInterval},
1011 MetricSpec{.name =
"last_processed_input_size_byte",
1013 .
kind = Kind::UInt64,
1014 .minPublishInterval = quickUpdateInterval},
1015 MetricSpec{.name =
"total_processed_input_size_byte",
1017 .
kind = Kind::UInt64,
1018 .scope = Scope::Online,
1019 .minPublishInterval = quickUpdateInterval},
1020 MetricSpec{.name =
"total_sigusr1",
1022 .
kind = Kind::UInt64,
1023 .minPublishInterval = quickUpdateInterval},
1024 MetricSpec{.name =
"consumed-timeframes",
1026 .
kind = Kind::UInt64,
1027 .minPublishInterval = consumedTimeframesPublishInterval,
1028 .maxRefreshLatency = quickRefreshInterval,
1029 .sendInitialValue =
true},
1030 MetricSpec{.name =
"min_input_latency_ms",
1032 .
kind = Kind::UInt64,
1033 .scope = Scope::Online,
1034 .minPublishInterval = quickUpdateInterval},
1035 MetricSpec{.name =
"max_input_latency_ms",
1037 .
kind = Kind::UInt64,
1038 .minPublishInterval = quickUpdateInterval},
1039 MetricSpec{.name =
"total_rate_in_mb_s",
1042 .scope = Scope::Online,
1043 .minPublishInterval = quickUpdateInterval,
1044 .maxRefreshLatency = onlineRefreshLatency,
1045 .sendInitialValue =
true},
1046 MetricSpec{.name =
"total_rate_out_mb_s",
1049 .scope = Scope::Online,
1050 .minPublishInterval = quickUpdateInterval,
1051 .maxRefreshLatency = onlineRefreshLatency,
1052 .sendInitialValue =
true},
1053 MetricSpec{.name =
"processing_rate_hz",
1056 .scope = Scope::Online,
1057 .minPublishInterval = quickUpdateInterval,
1058 .maxRefreshLatency = onlineRefreshLatency,
1059 .sendInitialValue =
true},
1060 MetricSpec{.name =
"cpu_usage_fraction",
1061 .enabled = enableCPUUsageFraction,
1064 .scope = Scope::Online,
1065 .minPublishInterval = quickUpdateInterval,
1066 .maxRefreshLatency = onlineRefreshLatency,
1067 .sendInitialValue =
true},
1068 MetricSpec{.name =
"performed_computations",
1070 .
kind = Kind::UInt64,
1071 .scope = Scope::Online,
1072 .minPublishInterval = quickUpdateInterval,
1073 .maxRefreshLatency = onlineRefreshLatency,
1074 .sendInitialValue =
true},
1075 MetricSpec{.name =
"total_bytes_in",
1077 .
kind = Kind::UInt64,
1078 .scope = Scope::Online,
1079 .minPublishInterval = quickUpdateInterval,
1080 .maxRefreshLatency = onlineRefreshLatency,
1081 .sendInitialValue =
true},
1082 MetricSpec{.name =
"total_bytes_out",
1084 .
kind = Kind::UInt64,
1085 .scope = Scope::Online,
1086 .minPublishInterval = quickUpdateInterval,
1087 .maxRefreshLatency = onlineRefreshLatency,
1088 .sendInitialValue =
true},
1089 MetricSpec{.name = fmt::format(
"available_managed_shm_{}", runningWorkflow.shmSegmentId),
1091 .kind = Kind::UInt64,
1092 .scope = Scope::Online,
1093 .minPublishInterval = 500,
1094 .maxRefreshLatency = onlineRefreshLatency,
1095 .sendInitialValue =
true},
1100 MetricSpec{.name =
"arrow-bytes-destroyed",
1101 .enabled = arrowAndResourceLimitingMetrics,
1103 .
kind = Kind::UInt64,
1104 .scope = Scope::DPL,
1105 .minPublishInterval = 0,
1106 .maxRefreshLatency = 10000,
1107 .sendInitialValue =
true},
1108 MetricSpec{.name =
"arrow-messages-destroyed",
1109 .enabled = arrowAndResourceLimitingMetrics,
1111 .
kind = Kind::UInt64,
1112 .scope = Scope::DPL,
1113 .minPublishInterval = 0,
1114 .maxRefreshLatency = 10000,
1115 .sendInitialValue =
true},
1116 MetricSpec{.name =
"arrow-bytes-created",
1117 .enabled = arrowAndResourceLimitingMetrics,
1119 .
kind = Kind::UInt64,
1120 .scope = Scope::DPL,
1121 .minPublishInterval = 0,
1122 .maxRefreshLatency = 10000,
1123 .sendInitialValue =
true},
1124 MetricSpec{.name =
"arrow-messages-created",
1125 .enabled = arrowAndResourceLimitingMetrics,
1127 .
kind = Kind::UInt64,
1128 .scope = Scope::DPL,
1129 .minPublishInterval = 0,
1130 .maxRefreshLatency = 10000,
1131 .sendInitialValue =
true},
1132 MetricSpec{.name =
"arrow-bytes-expired",
1133 .enabled = arrowAndResourceLimitingMetrics,
1135 .
kind = Kind::UInt64,
1136 .scope = Scope::DPL,
1137 .minPublishInterval = 0,
1138 .maxRefreshLatency = 10000,
1139 .sendInitialValue =
true},
1140 MetricSpec{.name =
"shm-offer-bytes-consumed",
1141 .enabled = arrowAndResourceLimitingMetrics,
1143 .
kind = Kind::UInt64,
1144 .scope = Scope::DPL,
1145 .minPublishInterval = 0,
1146 .maxRefreshLatency = 10000,
1147 .sendInitialValue =
true},
1148 MetricSpec{.name =
"timeslice-offer-number-consumed",
1149 .enabled = arrowAndResourceLimitingMetrics,
1151 .
kind = Kind::UInt64,
1152 .scope = Scope::DPL,
1153 .minPublishInterval = 0,
1154 .maxRefreshLatency = 10000,
1155 .sendInitialValue =
true},
1156 MetricSpec{.name =
"timeslices-expired",
1157 .enabled = arrowAndResourceLimitingMetrics,
1159 .
kind = Kind::UInt64,
1160 .scope = Scope::DPL,
1161 .minPublishInterval = 0,
1162 .maxRefreshLatency = 10000,
1163 .sendInitialValue =
true},
1164 MetricSpec{.name =
"timeslices-started",
1165 .enabled = arrowAndResourceLimitingMetrics,
1167 .
kind = Kind::UInt64,
1168 .scope = Scope::DPL,
1169 .minPublishInterval = 0,
1170 .maxRefreshLatency = 10000,
1171 .sendInitialValue =
true},
1172 MetricSpec{.name =
"timeslices-done",
1173 .enabled = arrowAndResourceLimitingMetrics,
1175 .
kind = Kind::UInt64,
1176 .scope = Scope::DPL,
1177 .minPublishInterval = 0,
1178 .maxRefreshLatency = 10000,
1179 .sendInitialValue =
true},
1180 MetricSpec{.name =
"resources-missing",
1181 .enabled = enableDebugMetrics,
1183 .
kind = Kind::UInt64,
1184 .scope = Scope::DPL,
1185 .minPublishInterval = 1000,
1186 .maxRefreshLatency = 1000,
1187 .sendInitialValue =
true},
1188 MetricSpec{.name =
"resources-insufficient",
1189 .enabled = enableDebugMetrics,
1191 .
kind = Kind::UInt64,
1192 .scope = Scope::DPL,
1193 .minPublishInterval = 1000,
1194 .maxRefreshLatency = 1000,
1195 .sendInitialValue =
true},
1196 MetricSpec{.name =
"resources-satisfactory",
1197 .enabled = enableDebugMetrics,
1199 .
kind = Kind::UInt64,
1200 .scope = Scope::DPL,
1201 .minPublishInterval = 1000,
1202 .maxRefreshLatency = 1000,
1203 .sendInitialValue =
true},
1204 MetricSpec{.name =
"resource-offer-expired",
1205 .enabled = arrowAndResourceLimitingMetrics,
1207 .
kind = Kind::UInt64,
1208 .scope = Scope::DPL,
1209 .minPublishInterval = 0,
1210 .maxRefreshLatency = 10000,
1211 .sendInitialValue =
true},
1212 MetricSpec{.name =
"ccdb-cache-hit",
1215 .
kind = Kind::UInt64,
1216 .scope = Scope::DPL,
1217 .minPublishInterval = 1000,
1218 .maxRefreshLatency = 10000,
1219 .sendInitialValue =
true},
1220 MetricSpec{.name =
"ccdb-cache-miss",
1223 .
kind = Kind::UInt64,
1224 .scope = Scope::DPL,
1225 .minPublishInterval = 1000,
1226 .maxRefreshLatency = 10000,
1227 .sendInitialValue =
true},
1228 MetricSpec{.name =
"ccdb-cache-failure",
1231 .
kind = Kind::UInt64,
1232 .scope = Scope::DPL,
1233 .minPublishInterval = 1000,
1234 .maxRefreshLatency = 10000,
1235 .sendInitialValue =
true},
1236 MetricSpec{.name =
"ccdb-cache-fetched-bytes",
1239 .
kind = Kind::UInt64,
1240 .scope = Scope::DPL,
1241 .minPublishInterval = 1000,
1242 .maxRefreshLatency = 10000,
1243 .sendInitialValue =
true},
1244 MetricSpec{.name =
"ccdb-cache-requested-bytes",
1247 .
kind = Kind::UInt64,
1248 .scope = Scope::DPL,
1249 .minPublishInterval = 1000,
1250 .maxRefreshLatency = 10000,
1251 .sendInitialValue =
true}};
1253 for (
auto& metric :
metrics) {
1255 if (spec.name.compare(
"readout-proxy") == 0) {
1256 stats->hasAvailSHMMetric =
true;
1261 stats->registerMetric(metric);
1264 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStats>(), stats};
1272 sendRelayerMetrics(context.
services(), *stats);
1273 flushMetrics(context.
services(), *stats); },
1276 sendRelayerMetrics(context.
services(), *stats);
1277 flushMetrics(context.
services(), *stats); },
1280 sendRelayerMetrics(context.
services(), *stats);
1281 flushMetrics(context.
services(), *stats); },
1284 flushMetrics(
ref, *stats); },
1293 .
name =
"data-processing-states",
1296 clock_gettime(CLOCK_REALTIME, &now);
1297 uv_update_time(
state.loop);
1298 uint64_t
offset = now.tv_sec * 1000 - uv_now(
state.loop);
1307 states->processCommandQueue(); },
1326 .
name =
"gui-metrics",
1331 monitoring.send({(
int)spec.inputChannels.size(), fmt::format(
"oldest_possible_timeslice/h"), o2::monitoring::Verbosity::Debug});
1332 monitoring.send({(
int)1, fmt::format(
"oldest_possible_timeslice/w"), o2::monitoring::Verbosity::Debug});
1333 monitoring.send({(
int)spec.outputChannels.size(), fmt::format(
"oldest_possible_output/h"), o2::monitoring::Verbosity::Debug});
1334 monitoring.send({(
int)1, fmt::format(
"oldest_possible_output/w"), o2::monitoring::Verbosity::Debug});
1335 return ServiceHandle{TypeIdHelpers::uniqueId<GUIMetrics>(), stats};
1342 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
// Send the current oldest possible output timeslice once per output channel,
// using the channel index as the metric name suffix.
// The index must be explicitly initialized to 0: it was previously declared
// without an initializer, so reading it in the loop condition was undefined behavior.
1343 for (
size_t ci = 0; ci < spec.outputChannels.size(); ++ci) {
1344 monitoring.send({(uint64_t)oldestPossibleOutput.timeslice.value, fmt::format(
"oldest_possible_output/{}", ci), o2::monitoring::Verbosity::Debug});
1348 monitoring.send({(uint64_t)timeslice, fmt::format(
"oldest_possible_timeslice/{}", channel.value), o2::monitoring::Verbosity::Debug}); },
1356 .
name =
"object-cache",
1359 return ServiceHandle{TypeIdHelpers::uniqueId<ObjectCache>(), cache};
1368 .
name =
"data-processing-context",
1380 .
name =
"data-allocator",
1381 .uniqueId = simpleServiceId<DataAllocator>(),
1384 .
hash = TypeIdHelpers::uniqueId<DataAllocator>(),
1387 .name =
"data-allocator",
1397 std::vector<ServiceSpec> specs{
1427 std::string loadableServicesStr = extraPlugins;
1430 if (loadableServicesStr.empty() ==
false) {
1431 loadableServicesStr +=
",";
1433 loadableServicesStr +=
"O2FrameworkDataTakingSupport:InfoLoggerContext,O2FrameworkDataTakingSupport:InfoLogger";
1436 std::vector<LoadablePlugin> loadablePlugins = {};
1437 char* loadableServicesEnv = getenv(
"DPL_LOAD_SERVICES");
1441 if (loadableServicesEnv) {
1442 if (loadableServicesStr.empty() ==
false) {
1443 loadableServicesStr +=
",";
1445 loadableServicesStr += loadableServicesEnv;
1448 PluginManager::loadFromPlugin<ServiceSpec, ServicePlugin>(loadablePlugins, specs);
std::vector< std::string > labels
std::vector< OutputRoute > routes
o2::monitoring::tags::Key Key
o2::monitoring::Metric Metric
o2::monitoring::tags::Value Value
#define MONITORING_QUEUE_SIZE
#define O2_BUILTIN_LIKELY(x)
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT_CRITICAL(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
o2::monitoring::Monitoring Monitoring
decltype(auto) make(const Output &spec, Args... args)
int countDeviceOutputs(bool excludeDPLOrigin=false)
Allow injecting policies on send.
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
A text based way of communicating with the driver.
OldestOutputInfo getOldestPossibleOutput() const
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
GLsizei const GLfloat * value
GLuint GLsizei const GLchar * label
Defining ITS Vertex explicitly as messageable.
@ DPL
The channel is a normal input channel.
const DataProcessorLabel suppressDomainInfoLabel
std::pair< std::string, unsigned short > parse_websocket_url(char const *url)
auto decongestionCallback
@ NoTransition
No pending transitions.
@ CCDB_CACHE_FETCHED_BYTES
@ AVAILABLE_MANAGED_SHM_BASE
@ SHM_OFFER_BYTES_CONSUMED
@ TIMESLICE_NUMBER_STARTED
@ CCDB_CACHE_REQUESTED_BYTES
@ TIMESLICE_NUMBER_EXPIRED
@ DROPPED_INCOMING_MESSAGES
@ ARROW_MESSAGES_DESTROYED
@ TIMESLICE_OFFER_NUMBER_CONSUMED
auto decongestionCallbackPastOldest
auto decongestionCallbackOrdered
std::string to_string(gsl::span< T, Size > span)
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static AsyncTaskId create(AsyncQueue &queue, AsyncTaskSpec spec)
static void post(AsyncQueue &queue, AsyncTask const &task)
static void reset(AsyncQueue &queue)
Reset the queue to its initial state.
An actual task to be executed.
static ServiceSpec fairMQDeviceProxy()
static ServiceSpec fairMQBackendSpec()
static ServiceSpec stringBackendSpec()
static ServiceSpec dataRelayer()
static ServiceSpec callbacksSpec()
static ServiceSpec monitoringSpec()
static ServiceSpec dataSender()
static ServiceSpec timesliceIndex()
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static ServiceSpec timingInfoSpec()
static ServiceConfigureCallback noConfiguration()
static ServiceSpec asyncQueue()
static ServiceSpec decongestionSpec()
static ServiceSpec dataProcessorContextSpec()
static ServiceSpec dataProcessingStats()
static std::vector< ServiceSpec > arrowServices()
static ServiceSpec rootFileSpec()
static ServiceSpec objectCache()
static ServiceSpec controlSpec()
static ServiceSpec configurationSpec()
static ServiceSpec ccdbSupportSpec()
static ServiceSpec datatakingContextSpec()
static ServiceSpec guiMetricsSpec()
static ServiceSpec dataProcessingStates()
static ServiceSpec tracingSpec()
static ServiceSpec dataAllocatorSpec()
static ServiceSpec driverClientSpec()
static ServiceSpec streamContextSpec()
static ServiceSpec threadPool(int numWorkers)
static ServiceSpec parallelSpec()
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
int64_t minOnlinePublishInterval
Helper struct to hold statistics about the data processing happening.
@ SetIfPositive
Set the value to the specified value.
@ InstantaneousRate
Update the rate of the metric given the cumulative value since last time it got published.
@ Add
Update the rate of the metric given the amount since the last time.
A label that can be associated to a DataProcessorSpec.
static std::string describe(InputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
DeploymentMode deploymentMode
Where we think this is running.
TimesliceIndex::OldestOutputInfo oldestPossibleOutput
static DeploymentMode deploymentMode()
static unsigned int pipelineLength(unsigned int minLength)
get max number of timeslices in the queue
static bool onlineDeploymentMode()
Returns true if running online.
std::string name
The name of the associated DataProcessorSpec.
size_t inputTimesliceId
The time pipelining id of this particular device.
Running state information of a given device.
header::DataOrigin origin
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
Information about the running workflow.
ServiceKind kind
Kind of service.
unsigned int hash
Unique hash associated to the type of service.
std::string name
Name of the service.
ServicePostDispatching postDispatching
ServiceKind kind
Kind of service being specified.
static std::function< int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator(uv_loop_t *loop)
static std::function< void(int64_t &base, int64_t &offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t *loop)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"