53#include "../src/DataProcessingStatus.h"
60#include <Configuration/ConfigurationInterface.h>
61#include <Configuration/ConfigurationFactory.h>
62#include <Monitoring/MonitoringFactory.h>
65#include <fairmq/Device.h>
66#include <fairmq/shmem/Monitor.h>
67#include <fairmq/shmem/Common.h>
68#include <fairmq/ProgOptions.h>
74using o2::configuration::ConfigurationFactory;
75using o2::configuration::ConfigurationInterface;
76using o2::monitoring::Monitoring;
77using o2::monitoring::MonitoringFactory;
78using Metric = o2::monitoring::Metric;
79using Key = o2::monitoring::tags::Key;
80using Value = o2::monitoring::tags::Value;
90#define MONITORING_QUEUE_SIZE 100
96 void* service =
nullptr;
// True when the "driver-client-backend" option starts with the complete
// "ws://" scheme prefix. All five characters of the prefix are compared;
// comparing only four would also accept malformed values such as "ws:/x".
97 bool isWebsocket = strncmp(options.GetPropertyAsString("driver-client-backend").c_str(), "ws://", 5) == 0;
98 bool isDefault = options.GetPropertyAsString(
"monitoring-backend") ==
"default";
99 bool useDPL = (isWebsocket && isDefault) || options.GetPropertyAsString(
"monitoring-backend") ==
"dpl://";
100 o2::monitoring::Monitoring* monitoring;
103 auto dplBackend = std::make_unique<DPLMonitoringBackend>(registry);
105 monitoring->addBackend(std::move(dplBackend));
107 auto backend = isDefault ?
"infologger://" : options.GetPropertyAsString(
"monitoring-backend");
108 monitoring = MonitoringFactory::Get(backend).release();
110 service = monitoring;
114 monitoring->addGlobalTag(
"dataprocessor_name", registry.
get<
DeviceSpec const>().
name);
115 monitoring->addGlobalTag(
"dpl_instance", options.GetPropertyAsString(
"shm-segment-id"));
116 return ServiceHandle{TypeIdHelpers::uniqueId<Monitoring>(), service};
120 auto* monitoring = (o2::monitoring::Monitoring*)service;
122 auto extRunNumber = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"runNumber",
"unspecified");
123 if (extRunNumber ==
"unspecified") {
127 monitoring->setRunNumber(std::stoul(extRunNumber));
131 auto* monitoring =
reinterpret_cast<Monitoring*
>(service);
132 monitoring->flushBuffer();
133 delete monitoring; },
141 .
name =
"async-queue",
142 .init = simpleServiceInit<AsyncQueue, AsyncQueue>(),
157 .
name =
"timing-info",
158 .uniqueId = simpleServiceId<TimingInfo>(),
159 .init = simpleServiceInit<TimingInfo, TimingInfo, ServiceKind::Stream>(),
167 .
name =
"stream-context",
168 .uniqueId = simpleServiceId<StreamContext>(),
169 .init = simpleServiceInit<StreamContext, StreamContext, ServiceKind::Stream>(),
181 std::fill(
stream->routeDPLCreated.begin(),
stream->routeDPLCreated.end(),
false);
182 std::fill(
stream->routeCreated.begin(),
stream->routeCreated.end(),
false); },
// userDidCreate starts false and is set to true when some route index has
// routeCreated == true while routeDPLCreated == false for the same index.
191 bool userDidCreate =
false;
193 for (
size_t ri = 0; ri <
routes.size(); ++ri) {
194 if (
stream->routeCreated[ri] ==
true &&
stream->routeDPLCreated[ri] ==
false) {
195 userDidCreate =
true;
199 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"userDidCreate == %d && didDispatch == %d",
201 messageContext.didDispatch());
202 if (userDidCreate ==
false && messageContext.didDispatch() ==
true) {
203 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"Data created out of band userDidCreate == %d && messageContext.didDispatch == %d",
205 messageContext.didDispatch());
208 if (userDidCreate ==
false && messageContext.didDispatch() ==
false) {
213 for (
size_t ri = 0; ri <
routes.size(); ++ri) {
215 auto &matcher = route.matcher;
216 if (
stream->routeDPLCreated[ri] ==
true) {
217 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"Data created by DPL. ri = %" PRIu64
", %{public}s",
221 if (
stream->routeCreated[ri] ==
true) {
223 }
if ((timeslice % route.maxTimeslices) != route.timeslice) {
224 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"Route ri = %" PRIu64
", skipped because of pipelining.",
228 if (matcher.lifetime == Lifetime::Timeframe) {
230 "Expected Lifetime::Timeframe data %{public}s was not created for timeslice %" PRIu64
" and might result in dropped timeframes",
232 LOGP(error,
"Expected Lifetime::Timeframe data {} was not created for timeslice {} and might result in dropped timeframes",
DataSpecUtils::describe(matcher), timeslice);
249 std::fill(
stream->routeCreated.begin(),
stream->routeCreated.end(),
false);
250 std::fill(
stream->routeDPLCreated.begin(),
stream->routeDPLCreated.end(),
false); },
257 .
name =
"datataking-contex",
258 .uniqueId = simpleServiceId<DataTakingContext>(),
259 .init = simpleServiceInit<DataTakingContext, DataTakingContext, ServiceKind::Stream>(),
263 for (
auto const&
ref : processingContext.
inputs()) {
265 const auto* dh = o2::header::get<o2::header::DataHeader*>(
ref.header);
269 context.runNumber = fmt::format(
"{}", dh->runNumber);
// Read the "runNumber" property from the device configuration, falling back
// to the sentinel string "unspecified" when it is not set.
278 auto extRunNumber = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"runNumber",
"unspecified");
// The configured value replaces context.runNumber when it was provided, or
// when the current context.runNumber is "0".
// NOTE(review): if the property is absent and context.runNumber is "0", the
// literal "unspecified" is stored as the run number — confirm this is intended.
279 if (extRunNumber !=
"unspecified" || context.runNumber ==
"0") {
280 context.runNumber = extRunNumber;
282 auto extLHCPeriod = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"lhc_period",
"unspecified");
283 if (extLHCPeriod !=
"unspecified") {
284 context.lhcPeriod = extLHCPeriod;
// Three-letter upper-case month names, indexed by tm_mon (0 = January).
286 static const char* months[12] = {
"JAN",
"FEB",
"MAR",
"APR",
"MAY",
"JUN",
"JUL",
"AUG",
"SEP",
"OCT",
"NOV",
"DEC"};
// Take the current time and break it down in UTC (gmtime), then use the name
// of the current month as context.lhcPeriod.
287 time_t now =
time(
nullptr);
// NOTE(review): the pointer returned by gmtime is dereferenced below without a
// null check, and gmtime may return a pointer to shared static storage —
// confirm this code is only reached from a single thread.
288 auto ltm = gmtime(&now);
289 context.lhcPeriod = months[ltm->tm_mon];
// The message states that this path is used when no LHC period is available.
290 LOG(info) <<
"LHCPeriod is not available, using current month " << context.lhcPeriod;
293 auto extRunType = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"run_type",
"unspecified");
294 if (extRunType !=
"unspecified") {
295 context.runType = extRunType;
297 auto extEnvId = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"environment_id",
"unspecified");
298 if (extEnvId !=
"unspecified") {
299 context.envId = extEnvId;
301 auto extDetectors = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"detectors",
"unspecified");
302 if (extDetectors !=
"unspecified") {
303 context.detectors = extDetectors;
305 auto forcedRaw = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"force_run_as_raw",
"false");
306 context.forcedRaw = forcedRaw ==
"true"; },
316 .
name =
"configuration",
318 auto backend = options.GetPropertyAsString(
"configuration");
319 if (backend ==
"command-line") {
322 return ServiceHandle{TypeIdHelpers::uniqueId<ConfigurationInterface>(),
323 ConfigurationFactory::getConfiguration(backend).release()};
327 if (dc.options.count(
"configuration") == 0) {
331 auto backend = dc.options[
"configuration"].as<std::string>();
333 ConfigurationFactory::getConfiguration(backend).release()}); },
340 .
name =
"driverClient",
342 auto backend = options.GetPropertyAsString(
"driver-client-backend");
343 if (backend ==
"stdout://") {
344 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
348 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
360 return ServiceHandle{TypeIdHelpers::uniqueId<ControlService>(),
370 .
name =
"localrootfile",
371 .init = simpleServiceInit<LocalRootFileService, LocalRootFileService>(),
382 return ServiceHandle{TypeIdHelpers::uniqueId<ParallelContext>(),
392 .
name =
"timesliceindex",
395 return ServiceHandle{TypeIdHelpers::uniqueId<TimesliceIndex>(),
406 .init = simpleServiceInit<CallbackService, CallbackService>(),
414 .
name =
"datarelayer",
432 .
name =
"datasender",
481 .
name =
"ccdb-support",
485 for (
auto&
output : spec.outputs) {
487 LOGP(
debug,
"Optional inputs support enabled");
499 LOGP(
debug,
"We are w/o outputs, do not automatically add DISTSUBTIMEFRAME to outgoing messages");
515 if (concrete.subSpec == 0) {
519 stfDist.
id = timingInfo.timeslice;
520 stfDist.firstOrbit = timingInfo.firstTForbit;
521 stfDist.runNumber = timingInfo.runNumber;
525 O2_SIGNPOST_EVENT_EMIT(stream_context, sid,
"finaliseOutputs",
"Route %" PRIu64
" (%{public}s) was created by DPL.", (uint64_t)oi,
527 streamContext.routeDPLCreated[oi] =
true;
546 if (decongestion.lastTimeslice >= oldestPossibleOutput.timeslice.value) {
547 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"oldest_possible_timeslice",
"Not sending already sent value: %" PRIu64
"> %" PRIu64,
548 decongestion.lastTimeslice, (uint64_t)oldestPossibleOutput.timeslice.value);
551 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"oldest_possible_timeslice",
"Running oldest possible timeslice %" PRIu64
" propagation.",
552 (uint64_t)oldestPossibleOutput.timeslice.value);
555 for (
int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
556 auto& info = proxy.getForwardChannelInfo(
ChannelIndex{fi});
559 if (info.channelType != ChannelAccountingType::DPL) {
560 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"oldest_possible_timeslice",
"Skipping channel %{public}s", info.name.c_str());
565 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64
", priority %d",
566 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
569 decongestion.lastTimeslice = oldestPossibleOutput.timeslice.value;
580 int64_t oldNextTimeslice = decongestion.nextTimeslice;
581 decongestion.nextTimeslice = std::max(decongestion.nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
582 if (oldNextTimeslice != decongestion.nextTimeslice) {
584 O2_SIGNPOST_EVENT_EMIT_WARN(async_queue, cid,
"oldest_possible_timeslice",
"Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
586 O2_SIGNPOST_EVENT_EMIT_CRITICAL(async_queue, cid,
"oldest_possible_timeslice",
"Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
588 timesliceIndex.rescan();
600 .
name =
"decongestion",
603 for (
auto& input : services.
get<
DeviceSpec const>().inputs) {
604 if (input.matcher.lifetime == Lifetime::Timeframe || input.matcher.lifetime == Lifetime::QA || input.matcher.lifetime == Lifetime::Sporadic || input.matcher.lifetime == Lifetime::Optional) {
605 LOGP(detail,
"Found a real data input, we cannot update the oldest possible timeslice when sending messages");
606 decongestion->isFirstInTopology =
false;
612 decongestion->suppressDomainInfo =
true;
617 decongestion->oldestPossibleTimesliceTask =
AsyncQueueHelpers::create(queue, {.name =
"oldest-possible-timeslice", .score = 100});
626 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"postForwardingCallbacks",
"We are the first one in the topology, we need to update the oldest possible timeslice");
629 timesliceIndex.updateOldestPossibleOutput(decongestion->nextEnumerationTimesliceRewinded);
632 if (decongestion->nextEnumerationTimesliceRewinded && decongestion->nextEnumerationTimeslice < oldestPossibleOutput.timeslice.value) {
633 LOGP(detail,
"Not sending oldestPossible if nextEnumerationTimeslice was rewinded");
637 if (decongestion->lastTimeslice && oldestPossibleOutput.timeslice.value == decongestion->lastTimeslice) {
639 "Not sending already sent value for oldest possible timeslice: %" PRIu64,
640 (uint64_t)oldestPossibleOutput.timeslice.value);
643 if (oldestPossibleOutput.timeslice.value < decongestion->lastTimeslice) {
644 LOGP(error,
"We are trying to send an oldest possible timeslice {} that is older than the last one we already sent {}",
645 oldestPossibleOutput.timeslice.value, decongestion->lastTimeslice);
649 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Broadcasting oldest posssible output %" PRIu64
" due to %{public}s (%" PRIu64
")",
650 (uint64_t)oldestPossibleOutput.timeslice.value,
651 oldestPossibleOutput.slot.index == -1 ?
"channel" :
"slot",
652 (uint64_t)(oldestPossibleOutput.slot.index == -1 ? oldestPossibleOutput.channel.value : oldestPossibleOutput.slot.index));
653 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Ordered active %d", decongestion->orderedCompletionPolicyActive);
654 if (decongestion->orderedCompletionPolicyActive) {
655 auto oldNextTimeslice = decongestion->nextTimeslice;
656 decongestion->nextTimeslice = std::max(decongestion->nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
657 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Next timeslice %" PRIi64, decongestion->nextTimeslice);
658 if (oldNextTimeslice != decongestion->nextTimeslice) {
661 O2_SIGNPOST_EVENT_EMIT_WARN(data_processor_context, cid,
"oldest_possible_timeslice",
"Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
663 O2_SIGNPOST_EVENT_EMIT_CRITICAL(data_processor_context, cid,
"oldest_possible_timeslice",
"Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
670 for (
int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
671 auto& info = proxy.getForwardChannelInfo(
ChannelIndex{fi});
675 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Skipping channel %{public}s", info.name.c_str());
680 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64
", priority %d",
681 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
684 decongestion->lastTimeslice = oldestPossibleOutput.timeslice.value; },
688 decongestion->nextEnumerationTimeslice = 0;
689 decongestion->nextEnumerationTimesliceRewinded =
false;
690 decongestion->lastTimeslice = 0;
691 decongestion->nextTimeslice = 0;
692 decongestion->oldestPossibleTimesliceTask = {0};
694 for (
auto &channel :
state.inputChannelInfos) {
695 channel.oldestForChannel = {0};
702 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Received oldest possible timeframe %" PRIu64
" from channel %d",
703 (uint64_t)oldestPossibleTimeslice, channel.value);
704 relayer.setOldestPossibleInput({oldestPossibleTimeslice}, channel);
705 timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
706 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
708 if (oldestPossibleOutput.timeslice.value == decongestion.lastTimeslice) {
709 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Synchronous: Not sending already sent value: %" PRIu64, (uint64_t)oldestPossibleOutput.timeslice.value);
712 if (oldestPossibleOutput.timeslice.value < decongestion.lastTimeslice) {
713 LOGP(error,
"We are trying to send an oldest possible timeslice {} that is older than the last one we sent {}",
714 oldestPossibleOutput.timeslice.value, decongestion.lastTimeslice);
721 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Queueing oldest possible timeslice %" PRIu64
" propagation for execution.",
722 (uint64_t)oldestPossibleOutput.timeslice.value);
725 .id = decongestion.oldestPossibleTimesliceTask,
729 if (decongestion.orderedCompletionPolicyActive) {
733 .user<
DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
745 .
name =
"threadpool",
750 return ServiceHandle{TypeIdHelpers::uniqueId<ThreadPool>(), pool};
752 .configure = [](
InitContext&,
void* service) ->
void* {
753 auto* t =
reinterpret_cast<ThreadPool*
>(service);
// Export numWorkersS as UV_THREADPOOL_SIZE. The final argument (overwrite) is
// 0, so a value already present in the environment is left unchanged.
761 setenv(
"UV_THREADPOOL_SIZE", numWorkersS.c_str(), 0);
774 using namespace fair::mq::shmem;
775 auto& spec = registry.get<
DeviceSpec const>();
778 if (stats.hasAvailSHMMetric) {
780 long freeMemory = -1;
782 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(ShmId{makeShmIdStr(device->fConfig->GetProperty<uint64_t>(
"shmid"))}, runningWorkflow.shmSegmentId);
785 if (freeMemory == -1) {
787 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(SessionId{device->fConfig->GetProperty<std::string>(
"session")}, runningWorkflow.shmSegmentId);
794 auto device = registry.get<RawDeviceService>().device();
// Accumulate received and transmitted byte counts over every entry returned
// by device->GetChannels(). Only element [0] of each entry's value is read.
// NOTE(review): indexing [0] assumes each entry holds at least one element —
// confirm against how channels are configured.
799 for (
auto& channel : device->GetChannels()) {
800 totalBytesIn += channel.second[0].GetBytesRx();
801 totalBytesOut += channel.second[0].GetBytesTx();
811auto flushStates(ServiceRegistryRef registry, DataProcessingStates&
states) ->
void
813 if (!registry.get<DriverConfig
const>().driverHasGUI) {
816 states.flushChangedStates([&
states, registry](std::string
const& spec,
int64_t timestamp, std::string_view
value)
mutable ->
void {
817 auto& client = registry.get<ControlService>();
818 client.push(spec,
value, timestamp);
825auto flushMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) ->
void
831 if (registry.isMainThread() ==
false) {
832 LOGP(fatal,
"Flushing metrics should only happen on the main thread.");
834 auto& monitoring = registry.get<
Monitoring>();
835 auto& relayer = registry.get<DataRelayer>();
838 stats.flushChangedMetrics([&monitoring, sid](DataProcessingStats::MetricSpec
const& spec,
int64_t timestamp,
int64_t value)
mutable ->
void {
// Interpret `timestamp` as a count of milliseconds on the system_clock
// timeline and build a metric named after the spec, with the default
// verbosity, carrying that time point.
840 auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(std::chrono::milliseconds(timestamp));
841 auto metric = o2::monitoring::Metric{spec.name, Metric::DefaultVerbosity, tp};
844 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid,
"flushChangedMetrics",
"Value for %{public}s is negative, setting to 0",
848 metric.addValue((uint64_t)
value,
"value");
851 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid,
"flushChangedMetrics",
"Value for %{public}s is too large, setting to INT_MAX",
856 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid,
"flushChangedMetrics",
"Value for %{public}s is too small, setting to INT_MIN",
860 metric.addValue((
int)
value,
"value");
863 metric.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL);
865 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid,
"flushChangedMetrics",
"Flushing metric %{public}s", spec.name.c_str());
866 monitoring.send(std::move(metric));
868 relayer.sendContextState();
869 monitoring.flushBuffer();
870 O2_SIGNPOST_END(monitoring_service, sid,
"flush",
"done flushing metrics");
877 .
name =
"data-processing-stats",
// Sample the realtime clock and refresh the libuv loop's cached time, then
// compute the difference between the wall clock (whole seconds scaled by
// 1000) and the loop time returned by uv_now.
// Only now.tv_sec is used; the tv_nsec part of the sample is ignored.
// NOTE(review): presumably both terms are in milliseconds — confirm against
// the libuv documentation for uv_now.
880 clock_gettime(CLOCK_REALTIME, &now);
881 uv_update_time(
state.loop);
882 uint64_t
offset = now.tv_sec * 1000 - uv_now(
state.loop);
884 .
minOnlinePublishInterval = std::stoi(options.GetProperty<std::string>(
"dpl-stats-min-online-publishing-interval").c_str()) * 1000};
891 int quickUpdateInterval = 5000;
892 uint64_t quickRefreshInterval = 7000;
893 uint64_t onlineRefreshLatency = 60000;
899 bool enableDebugMetrics =
false;
901 bool enableDebugMetrics =
true;
903 bool arrowAndResourceLimitingMetrics =
false;
905 arrowAndResourceLimitingMetrics =
true;
908 int64_t consumedTimeframesPublishInterval = 0;
910 consumedTimeframesPublishInterval = 5000;
915 bool enableCPUUsageFraction =
true;
917 if (std::find_if(spec.labels.begin(), spec.labels.end(), isProxy) != spec.labels.end()) {
919 O2_SIGNPOST_EVENT_EMIT(policies, mid,
"metrics",
"Disabling cpu_usage_fraction metric for proxy %{public}s", spec.name.c_str());
920 enableCPUUsageFraction =
false;
923 std::vector<DataProcessingStats::MetricSpec>
metrics = {
924 MetricSpec{.name =
"errors",
926 .
kind = Kind::UInt64,
927 .scope = Scope::Online,
928 .minPublishInterval = quickUpdateInterval,
929 .maxRefreshLatency = quickRefreshInterval},
930 MetricSpec{.name =
"exceptions",
932 .
kind = Kind::UInt64,
933 .scope = Scope::Online,
934 .minPublishInterval = quickUpdateInterval},
935 MetricSpec{.name =
"inputs/relayed/pending",
937 .
kind = Kind::UInt64,
938 .minPublishInterval = quickUpdateInterval},
939 MetricSpec{.name =
"inputs/relayed/incomplete",
941 .
kind = Kind::UInt64,
942 .minPublishInterval = quickUpdateInterval},
943 MetricSpec{.name =
"inputs/relayed/total",
945 .
kind = Kind::UInt64,
946 .minPublishInterval = quickUpdateInterval},
947 MetricSpec{.name =
"elapsed_time_ms",
949 .
kind = Kind::UInt64,
950 .minPublishInterval = quickUpdateInterval},
951 MetricSpec{.name =
"total_wall_time_ms",
953 .
kind = Kind::UInt64,
954 .minPublishInterval = quickUpdateInterval},
955 MetricSpec{.name =
"last_processed_input_size_byte",
957 .
kind = Kind::UInt64,
958 .minPublishInterval = quickUpdateInterval},
959 MetricSpec{.name =
"total_processed_input_size_byte",
961 .
kind = Kind::UInt64,
962 .scope = Scope::Online,
963 .minPublishInterval = quickUpdateInterval},
964 MetricSpec{.name =
"total_sigusr1",
966 .
kind = Kind::UInt64,
967 .minPublishInterval = quickUpdateInterval},
968 MetricSpec{.name =
"consumed-timeframes",
970 .
kind = Kind::UInt64,
971 .minPublishInterval = consumedTimeframesPublishInterval,
972 .maxRefreshLatency = quickRefreshInterval,
973 .sendInitialValue =
true},
974 MetricSpec{.name =
"min_input_latency_ms",
976 .
kind = Kind::UInt64,
977 .scope = Scope::Online,
978 .minPublishInterval = quickUpdateInterval},
979 MetricSpec{.name =
"max_input_latency_ms",
981 .
kind = Kind::UInt64,
982 .minPublishInterval = quickUpdateInterval},
983 MetricSpec{.name =
"total_rate_in_mb_s",
986 .scope = Scope::Online,
987 .minPublishInterval = quickUpdateInterval,
988 .maxRefreshLatency = onlineRefreshLatency,
989 .sendInitialValue =
true},
990 MetricSpec{.name =
"total_rate_out_mb_s",
993 .scope = Scope::Online,
994 .minPublishInterval = quickUpdateInterval,
995 .maxRefreshLatency = onlineRefreshLatency,
996 .sendInitialValue =
true},
997 MetricSpec{.name =
"processing_rate_hz",
1000 .scope = Scope::Online,
1001 .minPublishInterval = quickUpdateInterval,
1002 .maxRefreshLatency = onlineRefreshLatency,
1003 .sendInitialValue =
true},
1004 MetricSpec{.name =
"cpu_usage_fraction",
1005 .enabled = enableCPUUsageFraction,
1008 .scope = Scope::Online,
1009 .minPublishInterval = quickUpdateInterval,
1010 .maxRefreshLatency = onlineRefreshLatency,
1011 .sendInitialValue =
true},
1012 MetricSpec{.name =
"performed_computations",
1014 .
kind = Kind::UInt64,
1015 .scope = Scope::Online,
1016 .minPublishInterval = quickUpdateInterval,
1017 .maxRefreshLatency = onlineRefreshLatency,
1018 .sendInitialValue =
true},
1019 MetricSpec{.name =
"total_bytes_in",
1021 .
kind = Kind::UInt64,
1022 .scope = Scope::Online,
1023 .minPublishInterval = quickUpdateInterval,
1024 .maxRefreshLatency = onlineRefreshLatency,
1025 .sendInitialValue =
true},
1026 MetricSpec{.name =
"total_bytes_out",
1028 .
kind = Kind::UInt64,
1029 .scope = Scope::Online,
1030 .minPublishInterval = quickUpdateInterval,
1031 .maxRefreshLatency = onlineRefreshLatency,
1032 .sendInitialValue =
true},
1033 MetricSpec{.name = fmt::format(
"available_managed_shm_{}", runningWorkflow.shmSegmentId),
1035 .kind = Kind::UInt64,
1036 .scope = Scope::Online,
1037 .minPublishInterval = 500,
1038 .maxRefreshLatency = onlineRefreshLatency,
1039 .sendInitialValue =
true},
1044 MetricSpec{.name =
"arrow-bytes-destroyed",
1045 .enabled = arrowAndResourceLimitingMetrics,
1047 .
kind = Kind::UInt64,
1048 .scope = Scope::DPL,
1049 .minPublishInterval = 0,
1050 .maxRefreshLatency = 10000,
1051 .sendInitialValue =
true},
1052 MetricSpec{.name =
"arrow-messages-destroyed",
1053 .enabled = arrowAndResourceLimitingMetrics,
1055 .
kind = Kind::UInt64,
1056 .scope = Scope::DPL,
1057 .minPublishInterval = 0,
1058 .maxRefreshLatency = 10000,
1059 .sendInitialValue =
true},
1060 MetricSpec{.name =
"arrow-bytes-created",
1061 .enabled = arrowAndResourceLimitingMetrics,
1063 .
kind = Kind::UInt64,
1064 .scope = Scope::DPL,
1065 .minPublishInterval = 0,
1066 .maxRefreshLatency = 10000,
1067 .sendInitialValue =
true},
1068 MetricSpec{.name =
"arrow-messages-created",
1069 .enabled = arrowAndResourceLimitingMetrics,
1071 .
kind = Kind::UInt64,
1072 .scope = Scope::DPL,
1073 .minPublishInterval = 0,
1074 .maxRefreshLatency = 10000,
1075 .sendInitialValue =
true},
1076 MetricSpec{.name =
"arrow-bytes-expired",
1077 .enabled = arrowAndResourceLimitingMetrics,
1079 .
kind = Kind::UInt64,
1080 .scope = Scope::DPL,
1081 .minPublishInterval = 0,
1082 .maxRefreshLatency = 10000,
1083 .sendInitialValue =
true},
1084 MetricSpec{.name =
"shm-offer-bytes-consumed",
1085 .enabled = arrowAndResourceLimitingMetrics,
1087 .
kind = Kind::UInt64,
1088 .scope = Scope::DPL,
1089 .minPublishInterval = 0,
1090 .maxRefreshLatency = 10000,
1091 .sendInitialValue =
true},
1092 MetricSpec{.name =
"timeslice-offer-number-consumed",
1093 .enabled = arrowAndResourceLimitingMetrics,
1095 .
kind = Kind::UInt64,
1096 .scope = Scope::DPL,
1097 .minPublishInterval = 0,
1098 .maxRefreshLatency = 10000,
1099 .sendInitialValue =
true},
1100 MetricSpec{.name =
"timeslices-expired",
1101 .enabled = arrowAndResourceLimitingMetrics,
1103 .
kind = Kind::UInt64,
1104 .scope = Scope::DPL,
1105 .minPublishInterval = 0,
1106 .maxRefreshLatency = 10000,
1107 .sendInitialValue =
true},
1108 MetricSpec{.name =
"timeslices-started",
1109 .enabled = arrowAndResourceLimitingMetrics,
1111 .
kind = Kind::UInt64,
1112 .scope = Scope::DPL,
1113 .minPublishInterval = 0,
1114 .maxRefreshLatency = 10000,
1115 .sendInitialValue =
true},
1116 MetricSpec{.name =
"timeslices-done",
1117 .enabled = arrowAndResourceLimitingMetrics,
1119 .
kind = Kind::UInt64,
1120 .scope = Scope::DPL,
1121 .minPublishInterval = 0,
1122 .maxRefreshLatency = 10000,
1123 .sendInitialValue =
true},
1124 MetricSpec{.name =
"resources-missing",
1125 .enabled = enableDebugMetrics,
1127 .
kind = Kind::UInt64,
1128 .scope = Scope::DPL,
1129 .minPublishInterval = 1000,
1130 .maxRefreshLatency = 1000,
1131 .sendInitialValue =
true},
1132 MetricSpec{.name =
"resources-insufficient",
1133 .enabled = enableDebugMetrics,
1135 .
kind = Kind::UInt64,
1136 .scope = Scope::DPL,
1137 .minPublishInterval = 1000,
1138 .maxRefreshLatency = 1000,
1139 .sendInitialValue =
true},
1140 MetricSpec{.name =
"resources-satisfactory",
1141 .enabled = enableDebugMetrics,
1143 .
kind = Kind::UInt64,
1144 .scope = Scope::DPL,
1145 .minPublishInterval = 1000,
1146 .maxRefreshLatency = 1000,
1147 .sendInitialValue =
true},
1148 MetricSpec{.name =
"resource-offer-expired",
1149 .enabled = arrowAndResourceLimitingMetrics,
1151 .
kind = Kind::UInt64,
1152 .scope = Scope::DPL,
1153 .minPublishInterval = 0,
1154 .maxRefreshLatency = 10000,
1155 .sendInitialValue =
true},
1156 MetricSpec{.name =
"ccdb-cache-hit",
1159 .
kind = Kind::UInt64,
1160 .scope = Scope::DPL,
1161 .minPublishInterval = 1000,
1162 .maxRefreshLatency = 10000,
1163 .sendInitialValue =
true},
1164 MetricSpec{.name =
"ccdb-cache-miss",
1167 .
kind = Kind::UInt64,
1168 .scope = Scope::DPL,
1169 .minPublishInterval = 1000,
1170 .maxRefreshLatency = 10000,
1171 .sendInitialValue =
true},
1172 MetricSpec{.name =
"ccdb-cache-failure",
1175 .
kind = Kind::UInt64,
1176 .scope = Scope::DPL,
1177 .minPublishInterval = 1000,
1178 .maxRefreshLatency = 10000,
1179 .sendInitialValue =
true}};
1181 for (
auto& metric :
metrics) {
1183 if (spec.name.compare(
"readout-proxy") == 0) {
1184 stats->hasAvailSHMMetric =
true;
1189 stats->registerMetric(metric);
1192 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStats>(), stats};
1200 sendRelayerMetrics(context.
services(), *stats);
1201 flushMetrics(context.
services(), *stats); },
1204 sendRelayerMetrics(context.
services(), *stats);
1205 flushMetrics(context.
services(), *stats); },
1208 sendRelayerMetrics(context.
services(), *stats);
1209 flushMetrics(context.
services(), *stats); },
1212 flushMetrics(
ref, *stats); },
1221 .
name =
"data-processing-states",
// Sample the realtime clock and refresh the libuv loop's cached time, then
// compute the difference between the wall clock (whole seconds scaled by
// 1000) and the loop time returned by uv_now.
// Only now.tv_sec is used; the tv_nsec part of the sample is ignored.
// NOTE(review): presumably both terms are in milliseconds — confirm against
// the libuv documentation for uv_now.
1224 clock_gettime(CLOCK_REALTIME, &now);
1225 uv_update_time(
state.loop);
1226 uint64_t
offset = now.tv_sec * 1000 - uv_now(
state.loop);
1235 states->processCommandQueue(); },
1254 .
name =
"gui-metrics",
1259 monitoring.send({(
int)spec.inputChannels.size(), fmt::format(
"oldest_possible_timeslice/h"), o2::monitoring::Verbosity::Debug});
1260 monitoring.send({(
int)1, fmt::format(
"oldest_possible_timeslice/w"), o2::monitoring::Verbosity::Debug});
1261 monitoring.send({(
int)spec.outputChannels.size(), fmt::format(
"oldest_possible_output/h"), o2::monitoring::Verbosity::Debug});
1262 monitoring.send({(
int)1, fmt::format(
"oldest_possible_output/w"), o2::monitoring::Verbosity::Debug});
1263 return ServiceHandle{TypeIdHelpers::uniqueId<GUIMetrics>(), stats};
1270 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
// Send one "oldest_possible_output/<index>" metric per output channel.
// The index is explicitly initialized to 0: the previous declaration left it
// uninitialized, so the loop condition read an indeterminate value (undefined
// behaviour) and could skip channels or emit arbitrary metric names.
1271 for (size_t ci = 0; ci < spec.outputChannels.size(); ++ci) {
1272 monitoring.send({(uint64_t)oldestPossibleOutput.timeslice.value, fmt::format(
"oldest_possible_output/{}", ci), o2::monitoring::Verbosity::Debug});
1276 monitoring.send({(uint64_t)timeslice, fmt::format(
"oldest_possible_timeslice/{}", channel.value), o2::monitoring::Verbosity::Debug}); },
1284 .
name =
"object-cache",
1287 return ServiceHandle{TypeIdHelpers::uniqueId<ObjectCache>(), cache};
1296 .
name =
"data-processing-context",
1308 .
name =
"data-allocator",
1309 .uniqueId = simpleServiceId<DataAllocator>(),
1312 .
hash = TypeIdHelpers::uniqueId<DataAllocator>(),
1315 .name =
"data-allocator",
1325 std::vector<ServiceSpec> specs{
1355 std::string loadableServicesStr = extraPlugins;
1358 if (loadableServicesStr.empty() ==
false) {
1359 loadableServicesStr +=
",";
1361 loadableServicesStr +=
"O2FrameworkDataTakingSupport:InfoLoggerContext,O2FrameworkDataTakingSupport:InfoLogger";
1364 std::vector<LoadablePlugin> loadablePlugins = {};
1365 char* loadableServicesEnv = getenv(
"DPL_LOAD_SERVICES");
1369 if (loadableServicesEnv) {
1370 if (loadableServicesStr.empty() ==
false) {
1371 loadableServicesStr +=
",";
1373 loadableServicesStr += loadableServicesEnv;
1376 PluginManager::loadFromPlugin<ServiceSpec, ServicePlugin>(loadablePlugins, specs);
std::vector< std::string > labels
std::vector< OutputRoute > routes
o2::monitoring::tags::Key Key
o2::monitoring::Metric Metric
o2::monitoring::tags::Value Value
#define MONITORING_QUEUE_SIZE
#define O2_BUILTIN_LIKELY(x)
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT_CRITICAL(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
o2::monitoring::Monitoring Monitoring
decltype(auto) make(const Output &spec, Args... args)
int countDeviceOutputs(bool excludeDPLOrigin=false)
Allow injecting policies on send.
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
A text based way of communicating with the driver.
OldestOutputInfo getOldestPossibleOutput() const
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
GLsizei const GLfloat * value
GLuint GLsizei const GLchar * label
Defining PrimaryVertex explicitly as messageable.
@ DPL
The channel is a normal input channel.
const DataProcessorLabel suppressDomainInfoLabel
std::pair< std::string, unsigned short > parse_websocket_url(char const *url)
auto decongestionCallback
@ NoTransition
No pending transitions.
@ AVAILABLE_MANAGED_SHM_BASE
@ SHM_OFFER_BYTES_CONSUMED
@ TIMESLICE_NUMBER_STARTED
@ TIMESLICE_NUMBER_EXPIRED
@ DROPPED_INCOMING_MESSAGES
@ ARROW_MESSAGES_DESTROYED
@ TIMESLICE_OFFER_NUMBER_CONSUMED
auto decongestionCallbackOrdered
std::string to_string(gsl::span< T, Size > span)
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static AsyncTaskId create(AsyncQueue &queue, AsyncTaskSpec spec)
static void post(AsyncQueue &queue, AsyncTask const &task)
static void reset(AsyncQueue &queue)
Reset the queue to its initial state.
An actual task to be executed.
static ServiceSpec fairMQDeviceProxy()
static ServiceSpec fairMQBackendSpec()
static ServiceSpec stringBackendSpec()
static ServiceSpec dataRelayer()
static ServiceSpec callbacksSpec()
static ServiceSpec monitoringSpec()
static ServiceSpec dataSender()
static ServiceSpec timesliceIndex()
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static ServiceSpec timingInfoSpec()
static ServiceConfigureCallback noConfiguration()
static ServiceSpec asyncQueue()
static ServiceSpec decongestionSpec()
static ServiceSpec dataProcessorContextSpec()
static ServiceSpec dataProcessingStats()
static std::vector< ServiceSpec > arrowServices()
static ServiceSpec rootFileSpec()
static ServiceSpec objectCache()
static ServiceSpec controlSpec()
static ServiceSpec configurationSpec()
static ServiceSpec ccdbSupportSpec()
static ServiceSpec datatakingContextSpec()
static ServiceSpec guiMetricsSpec()
static ServiceSpec dataProcessingStates()
static ServiceSpec tracingSpec()
static ServiceSpec dataAllocatorSpec()
static ServiceSpec driverClientSpec()
static ServiceSpec streamContextSpec()
static ServiceSpec threadPool(int numWorkers)
static ServiceSpec parallelSpec()
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
int64_t minOnlinePublishInterval
Helper struct to hold statistics about the data processing happening.
@ SetIfPositive
Set the value to the specified value.
@ InstantaneousRate
Update the rate of the metric given the cumulative value since last time it got published.
@ Add
Update the rate of the metric given the amount since the last time.
A label that can be associated to a DataProcessorSpec.
static std::string describe(InputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
DeploymentMode deploymentMode
Where we think this is running.
TimesliceIndex::OldestOutputInfo oldestPossibleOutput
static DeploymentMode deploymentMode()
static unsigned int pipelineLength(unsigned int minLength)
get max number of timeslices in the queue
static bool onlineDeploymentMode()
@true if running online
std::string name
The name of the associated DataProcessorSpec.
size_t inputTimesliceId
The time pipelining id of this particular device.
Running state information of a given device.
header::DataOrigin origin
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
Information about the running workflow.
ServiceKind kind
Kind of service.
unsigned int hash
Unique hash associated to the type of service.
std::string name
Name of the service.
ServicePostDispatching postDispatching
ServiceKind kind
Kind of service being specified.
static std::function< int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator(uv_loop_t *loop)
static std::function< void(int64_t &base, int64_t &offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t *loop)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"