51#include "../src/DataProcessingStatus.h"
58#include <Configuration/ConfigurationInterface.h>
59#include <Configuration/ConfigurationFactory.h>
60#include <Monitoring/MonitoringFactory.h>
63#include <fairmq/Device.h>
64#include <fairmq/shmem/Monitor.h>
65#include <fairmq/shmem/Common.h>
66#include <fairmq/ProgOptions.h>
72using o2::configuration::ConfigurationFactory;
73using o2::configuration::ConfigurationInterface;
74using o2::monitoring::Monitoring;
75using o2::monitoring::MonitoringFactory;
76using Metric = o2::monitoring::Metric;
77using Key = o2::monitoring::tags::Key;
78using Value = o2::monitoring::tags::Value;
88#define MONITORING_QUEUE_SIZE 100
// Decide which monitoring backend this device uses.
//   isWebsocket - the "driver-client-backend" option starts with the "ws://" scheme.
//   isDefault   - the "monitoring-backend" option is left at "default".
//   useDPL      - either the default is kept while the driver client is on a
//                 websocket, or "dpl://" was requested explicitly.
94 void* service = nullptr;
// Compare the full 5-character scheme; a length of 4 would also accept "ws:/".
95 bool isWebsocket = strncmp(options.GetPropertyAsString("driver-client-backend").c_str(), "ws://", 5) == 0;
96 bool isDefault = options.GetPropertyAsString("monitoring-backend") == "default";
97 bool useDPL = (isWebsocket && isDefault) || options.GetPropertyAsString("monitoring-backend") == "dpl://";
// Start from nullptr so the pointer never holds an indeterminate value before it is assigned below.
98 o2::monitoring::Monitoring* monitoring = nullptr;
101 auto dplBackend = std::make_unique<DPLMonitoringBackend>(registry);
103 monitoring->addBackend(std::move(dplBackend));
105 auto backend = isDefault ?
"infologger://" : options.GetPropertyAsString(
"monitoring-backend");
106 monitoring = MonitoringFactory::Get(backend).release();
108 service = monitoring;
112 monitoring->addGlobalTag(
"dataprocessor_name", registry.
get<
DeviceSpec const>().
name);
113 monitoring->addGlobalTag(
"dpl_instance", options.GetPropertyAsString(
"shm-segment-id"));
114 return ServiceHandle{TypeIdHelpers::uniqueId<Monitoring>(), service};
118 auto* monitoring = (o2::monitoring::Monitoring*)service;
120 auto extRunNumber = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"runNumber",
"unspecified");
121 if (extRunNumber ==
"unspecified") {
125 monitoring->setRunNumber(std::stoul(extRunNumber));
129 auto* monitoring =
reinterpret_cast<Monitoring*
>(service);
130 monitoring->flushBuffer();
131 delete monitoring; },
139 .
name =
"async-queue",
140 .init = simpleServiceInit<AsyncQueue, AsyncQueue>(),
155 .
name =
"timing-info",
156 .uniqueId = simpleServiceId<TimingInfo>(),
157 .init = simpleServiceInit<TimingInfo, TimingInfo, ServiceKind::Stream>(),
165 .
name =
"stream-context",
166 .uniqueId = simpleServiceId<StreamContext>(),
167 .init = simpleServiceInit<StreamContext, StreamContext, ServiceKind::Stream>(),
176 stream->routeDPLCreated.resize(routes.size());
177 stream->routeCreated.resize(routes.size());
179 std::fill(
stream->routeDPLCreated.begin(),
stream->routeDPLCreated.end(),
false);
180 std::fill(
stream->routeCreated.begin(),
stream->routeCreated.end(),
false); },
189 bool userDidCreate =
false;
191 for (
size_t ri = 0; ri < routes.size(); ++ri) {
192 if (
stream->routeCreated[ri] ==
true &&
stream->routeDPLCreated[ri] ==
false) {
193 userDidCreate =
true;
197 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"userDidCreate == %d && didDispatch == %d",
199 messageContext.didDispatch());
200 if (userDidCreate ==
false && messageContext.didDispatch() ==
true) {
201 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"Data created out of band userDidCreate == %d && messageContext.didDispatch == %d",
203 messageContext.didDispatch());
206 if (userDidCreate ==
false && messageContext.didDispatch() ==
false) {
211 for (
size_t ri = 0; ri < routes.size(); ++ri) {
212 auto &route = routes[ri];
213 auto &matcher = route.matcher;
214 if (
stream->routeDPLCreated[ri] ==
true) {
215 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"Data created by DPL. ri = %" PRIu64
", %{public}s",
219 if (
stream->routeCreated[ri] ==
true) {
221 }
if ((timeslice % route.maxTimeslices) != route.timeslice) {
222 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"Route ri = %" PRIu64
", skipped because of pipelining.",
226 if (matcher.lifetime == Lifetime::Timeframe) {
228 "Expected Lifetime::Timeframe data %{public}s was not created for timeslice %" PRIu64
" and might result in dropped timeframes",
230 LOGP(error,
"Expected Lifetime::Timeframe data {} was not created for timeslice {} and might result in dropped timeframes",
DataSpecUtils::describe(matcher), timeslice);
244 stream->routeDPLCreated.resize(routes.size());
245 stream->routeCreated.resize(routes.size());
247 std::fill(
stream->routeCreated.begin(),
stream->routeCreated.end(),
false);
248 std::fill(
stream->routeDPLCreated.begin(),
stream->routeDPLCreated.end(),
false); },
255 .
name =
"datataking-contex",
256 .uniqueId = simpleServiceId<DataTakingContext>(),
257 .init = simpleServiceInit<DataTakingContext, DataTakingContext, ServiceKind::Stream>(),
261 for (
auto const&
ref : processingContext.
inputs()) {
263 const auto* dh = o2::header::get<o2::header::DataHeader*>(
ref.header);
267 context.runNumber = fmt::format(
"{}", dh->runNumber);
276 auto extRunNumber = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"runNumber",
"unspecified");
277 if (extRunNumber !=
"unspecified" || context.runNumber ==
"0") {
278 context.runNumber = extRunNumber;
280 auto extLHCPeriod = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"lhc_period",
"unspecified");
281 if (extLHCPeriod !=
"unspecified") {
282 context.lhcPeriod = extLHCPeriod;
// Judging by the log message below, this is the fallback taken when no LHC
// period is available: the context is labelled with the three-letter name of
// the current month, derived from the UTC broken-down time of `now`.
// NOTE(review): gmtime() returns a pointer to storage shared between calls and
// the result is dereferenced without a null check - confirm nothing else can
// call gmtime()/localtime() concurrently here, or consider gmtime_r().
284 static const char* months[12] = {
"JAN",
"FEB",
"MAR",
"APR",
"MAY",
"JUN",
"JUL",
"AUG",
"SEP",
"OCT",
"NOV",
"DEC"};
285 time_t now =
time(
nullptr);
286 auto ltm = gmtime(&now);
287 context.lhcPeriod = months[ltm->tm_mon];
288 LOG(info) <<
"LHCPeriod is not available, using current month " << context.lhcPeriod;
291 auto extRunType = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"run_type",
"unspecified");
292 if (extRunType !=
"unspecified") {
293 context.runType = extRunType;
295 auto extEnvId = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"environment_id",
"unspecified");
296 if (extEnvId !=
"unspecified") {
297 context.envId = extEnvId;
299 auto extDetectors = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"detectors",
"unspecified");
300 if (extDetectors !=
"unspecified") {
301 context.detectors = extDetectors;
303 auto forcedRaw = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"force_run_as_raw",
"false");
304 context.forcedRaw = forcedRaw ==
"true"; },
314 .
name =
"configuration",
316 auto backend = options.GetPropertyAsString(
"configuration");
317 if (backend ==
"command-line") {
320 return ServiceHandle{TypeIdHelpers::uniqueId<ConfigurationInterface>(),
321 ConfigurationFactory::getConfiguration(backend).release()};
325 if (dc.options.count(
"configuration") == 0) {
329 auto backend = dc.options[
"configuration"].as<std::string>();
331 ConfigurationFactory::getConfiguration(backend).release()}); },
338 .
name =
"driverClient",
340 auto backend = options.GetPropertyAsString(
"driver-client-backend");
341 if (backend ==
"stdout://") {
342 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
346 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
358 return ServiceHandle{TypeIdHelpers::uniqueId<ControlService>(),
368 .
name =
"localrootfile",
369 .init = simpleServiceInit<LocalRootFileService, LocalRootFileService>(),
380 return ServiceHandle{TypeIdHelpers::uniqueId<ParallelContext>(),
390 .
name =
"timesliceindex",
393 return ServiceHandle{TypeIdHelpers::uniqueId<TimesliceIndex>(),
404 .init = simpleServiceInit<CallbackService, CallbackService>(),
412 .
name =
"datarelayer",
428 .
name =
"datasender",
477 .
name =
"ccdb-support",
481 for (
auto&
output : spec.outputs) {
483 LOGP(
debug,
"Optional inputs support enabled");
495 LOGP(
debug,
"We are w/o outputs, do not automatically add DISTSUBTIMEFRAME to outgoing messages");
511 if (concrete.subSpec == 0) {
515 stfDist.
id = timingInfo.timeslice;
516 stfDist.firstOrbit = timingInfo.firstTForbit;
517 stfDist.runNumber = timingInfo.runNumber;
521 O2_SIGNPOST_EVENT_EMIT(stream_context, sid,
"finaliseOutputs",
"Route %" PRIu64
" (%{public}s) was created by DPL.", (uint64_t)oi,
523 streamContext.routeDPLCreated[oi] =
true;
542 if (decongestion.lastTimeslice >= oldestPossibleOutput.timeslice.value) {
543 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"oldest_possible_timeslice",
"Not sending already sent value: %" PRIu64
"> %" PRIu64,
544 decongestion.lastTimeslice, (uint64_t)oldestPossibleOutput.timeslice.value);
547 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"oldest_possible_timeslice",
"Running oldest possible timeslice %" PRIu64
" propagation.",
548 (uint64_t)oldestPossibleOutput.timeslice.value);
551 for (
int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
552 auto& info = proxy.getForwardChannelInfo(
ChannelIndex{fi});
555 if (info.channelType != ChannelAccountingType::DPL) {
556 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"oldest_possible_timeslice",
"Skipping channel %{public}s", info.name.c_str());
561 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64
", priority %d",
562 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
565 decongestion.lastTimeslice = oldestPossibleOutput.timeslice.value;
576 int64_t oldNextTimeslice = decongestion.nextTimeslice;
577 decongestion.nextTimeslice = std::max(decongestion.nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
578 if (oldNextTimeslice != decongestion.nextTimeslice) {
580 O2_SIGNPOST_EVENT_EMIT_WARN(async_queue, cid,
"oldest_possible_timeslice",
"Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
582 O2_SIGNPOST_EVENT_EMIT_CRITICAL(async_queue, cid,
"oldest_possible_timeslice",
"Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
584 timesliceIndex.rescan();
596 .
name =
"decongestion",
599 for (
auto& input : services.
get<
DeviceSpec const>().inputs) {
600 if (input.matcher.lifetime == Lifetime::Timeframe || input.matcher.lifetime == Lifetime::QA || input.matcher.lifetime == Lifetime::Sporadic || input.matcher.lifetime == Lifetime::Optional) {
601 LOGP(detail,
"Found a real data input, we cannot update the oldest possible timeslice when sending messages");
602 decongestion->isFirstInTopology =
false;
607 decongestion->oldestPossibleTimesliceTask =
AsyncQueueHelpers::create(queue, {.name =
"oldest-possible-timeslice", .score = 100});
616 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"postForwardingCallbacks",
"We are the first one in the topology, we need to update the oldest possible timeslice");
619 timesliceIndex.updateOldestPossibleOutput(decongestion->nextEnumerationTimesliceRewinded);
622 if (decongestion->nextEnumerationTimesliceRewinded && decongestion->nextEnumerationTimeslice < oldestPossibleOutput.timeslice.value) {
623 LOGP(detail,
"Not sending oldestPossible if nextEnumerationTimeslice was rewinded");
627 if (decongestion->lastTimeslice && oldestPossibleOutput.timeslice.value == decongestion->lastTimeslice) {
629 "Not sending already sent value for oldest possible timeslice: %" PRIu64,
630 (uint64_t)oldestPossibleOutput.timeslice.value);
633 if (oldestPossibleOutput.timeslice.value < decongestion->lastTimeslice) {
634 LOGP(error,
"We are trying to send an oldest possible timeslice {} that is older than the last one we already sent {}",
635 oldestPossibleOutput.timeslice.value, decongestion->lastTimeslice);
639 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Broadcasting oldest posssible output %" PRIu64
" due to %{public}s (%" PRIu64
")",
640 (uint64_t)oldestPossibleOutput.timeslice.value,
641 oldestPossibleOutput.slot.index == -1 ?
"channel" :
"slot",
642 (uint64_t)(oldestPossibleOutput.slot.index == -1 ? oldestPossibleOutput.channel.value : oldestPossibleOutput.slot.index));
643 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Ordered active %d", decongestion->orderedCompletionPolicyActive);
644 if (decongestion->orderedCompletionPolicyActive) {
645 auto oldNextTimeslice = decongestion->nextTimeslice;
646 decongestion->nextTimeslice = std::max(decongestion->nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
647 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Next timeslice %" PRIi64, decongestion->nextTimeslice);
648 if (oldNextTimeslice != decongestion->nextTimeslice) {
651 O2_SIGNPOST_EVENT_EMIT_WARN(data_processor_context, cid,
"oldest_possible_timeslice",
"Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
653 O2_SIGNPOST_EVENT_EMIT_CRITICAL(data_processor_context, cid,
"oldest_possible_timeslice",
"Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
660 for (
int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
661 auto& info = proxy.getForwardChannelInfo(
ChannelIndex{fi});
665 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Skipping channel %{public}s", info.name.c_str());
670 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64
", priority %d",
671 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
674 decongestion->lastTimeslice = oldestPossibleOutput.timeslice.value; },
678 decongestion->nextEnumerationTimeslice = 0;
679 decongestion->nextEnumerationTimesliceRewinded =
false;
680 decongestion->lastTimeslice = 0;
681 decongestion->nextTimeslice = 0;
682 decongestion->oldestPossibleTimesliceTask = {0};
684 for (
auto &channel :
state.inputChannelInfos) {
685 channel.oldestForChannel = {0};
692 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Received oldest possible timeframe %" PRIu64
" from channel %d",
693 (uint64_t)oldestPossibleTimeslice, channel.value);
694 relayer.setOldestPossibleInput({oldestPossibleTimeslice}, channel);
695 timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
696 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
698 if (oldestPossibleOutput.timeslice.value == decongestion.lastTimeslice) {
699 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Synchronous: Not sending already sent value: %" PRIu64, (uint64_t)oldestPossibleOutput.timeslice.value);
702 if (oldestPossibleOutput.timeslice.value < decongestion.lastTimeslice) {
703 LOGP(error,
"We are trying to send an oldest possible timeslice {} that is older than the last one we sent {}",
704 oldestPossibleOutput.timeslice.value, decongestion.lastTimeslice);
711 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Queueing oldest possible timeslice %" PRIu64
" propagation for execution.",
712 (uint64_t)oldestPossibleOutput.timeslice.value);
715 .id = decongestion.oldestPossibleTimesliceTask,
719 if (decongestion.orderedCompletionPolicyActive) {
723 .user<
DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
735 .
name =
"threadpool",
740 return ServiceHandle{TypeIdHelpers::uniqueId<ThreadPool>(), pool};
742 .configure = [](
InitContext&,
void* service) ->
void* {
743 auto* t =
reinterpret_cast<ThreadPool*
>(service);
751 setenv(
"UV_THREADPOOL_SIZE", numWorkersS.c_str(), 0);
764 using namespace fair::mq::shmem;
765 auto& spec = registry.get<
DeviceSpec const>();
768 if (stats.hasAvailSHMMetric) {
770 long freeMemory = -1;
772 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(ShmId{makeShmIdStr(device->fConfig->GetProperty<uint64_t>(
"shmid"))}, runningWorkflow.shmSegmentId);
775 if (freeMemory == -1) {
777 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(SessionId{device->fConfig->GetProperty<std::string>(
"session")}, runningWorkflow.shmSegmentId);
784 auto device = registry.get<RawDeviceService>().device();
786 int64_t totalBytesIn = 0;
787 int64_t totalBytesOut = 0;
789 for (
auto& channel : device->GetChannels()) {
790 totalBytesIn += channel.second[0].GetBytesRx();
791 totalBytesOut += channel.second[0].GetBytesTx();
801auto flushStates(ServiceRegistryRef registry, DataProcessingStates&
states) ->
void
803 states.flushChangedStates([&
states, registry](std::string
const& spec, int64_t timestamp, std::string_view
value)
mutable ->
void {
804 auto& client = registry.get<ControlService>();
805 client.push(spec,
value, timestamp);
812auto flushMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) ->
void
818 if (registry.isMainThread() ==
false) {
819 LOGP(fatal,
"Flushing metrics should only happen on the main thread.");
821 auto& monitoring = registry.get<
Monitoring>();
822 auto& relayer = registry.get<DataRelayer>();
825 stats.flushChangedMetrics([&monitoring](DataProcessingStats::MetricSpec
const& spec, int64_t timestamp, int64_t
value)
mutable ->
void {
827 auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(std::chrono::milliseconds(timestamp));
828 auto metric = o2::monitoring::Metric{spec.name, Metric::DefaultVerbosity, tp};
831 LOG(
debug) <<
"Value for " << spec.name <<
" is negative, setting to 0";
834 metric.addValue((uint64_t)
value,
"value");
// Saturate `value` to the range of `int` before it is attached to the metric
// as an int field named "value": anything above INT_MAX or below INT_MIN is
// replaced by the respective limit and a warning naming the metric is logged.
// NOTE(review): the closing braces of both checks sit on lines missing from
// this listing; the final addValue() is assumed to follow them - confirm
// against the full file.
836 if (
value > (int64_t)std::numeric_limits<int>::max()) {
837 LOG(warning) <<
"Value for " << spec.name <<
" is too large, setting to INT_MAX";
838 value = (int64_t)std::numeric_limits<int>::max();
840 if (
value < (int64_t)std::numeric_limits<int>::min()) {
841 value = (int64_t)std::numeric_limits<int>::min();
842 LOG(warning) <<
"Value for " << spec.name <<
" is too small, setting to INT_MIN";
844 metric.addValue((
int)
value,
"value");
847 metric.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL);
849 monitoring.send(std::move(metric));
851 relayer.sendContextState();
852 monitoring.flushBuffer();
853 O2_SIGNPOST_END(monitoring_service, sid,
"flush",
"done flushing metrics");
860 .
name =
"data-processing-stats",
863 clock_gettime(CLOCK_REALTIME, &now);
864 uv_update_time(
state.loop);
865 uint64_t
offset = now.tv_sec * 1000 - uv_now(
state.loop);
867 .
minOnlinePublishInterval = std::stoi(options.GetProperty<std::string>(
"dpl-stats-min-online-publishing-interval").c_str()) * 1000};
874 int quickUpdateInterval = 5000;
875 uint64_t quickRefreshInterval = 7000;
876 uint64_t onlineRefreshLatency = 60000;
882 bool enableDebugMetrics =
false;
884 bool enableDebugMetrics =
true;
886 bool arrowAndResourceLimitingMetrics =
false;
888 arrowAndResourceLimitingMetrics =
true;
893 bool enableCPUUsageFraction =
true;
895 if (std::find_if(spec.labels.begin(), spec.labels.end(), isProxy) != spec.labels.end()) {
897 O2_SIGNPOST_EVENT_EMIT(policies, mid,
"metrics",
"Disabling cpu_usage_fraction metric for proxy %{public}s", spec.name.c_str());
898 enableCPUUsageFraction =
false;
901 std::vector<DataProcessingStats::MetricSpec>
metrics = {
902 MetricSpec{.name =
"errors",
904 .kind = Kind::UInt64,
905 .scope = Scope::Online,
906 .minPublishInterval = quickUpdateInterval,
907 .maxRefreshLatency = quickRefreshInterval},
908 MetricSpec{.name =
"exceptions",
910 .kind = Kind::UInt64,
911 .scope = Scope::Online,
912 .minPublishInterval = quickUpdateInterval},
913 MetricSpec{.name =
"inputs/relayed/pending",
915 .kind = Kind::UInt64,
916 .minPublishInterval = quickUpdateInterval},
917 MetricSpec{.name =
"inputs/relayed/incomplete",
919 .kind = Kind::UInt64,
920 .minPublishInterval = quickUpdateInterval},
921 MetricSpec{.name =
"inputs/relayed/total",
923 .kind = Kind::UInt64,
924 .minPublishInterval = quickUpdateInterval},
925 MetricSpec{.name =
"elapsed_time_ms",
927 .kind = Kind::UInt64,
928 .minPublishInterval = quickUpdateInterval},
929 MetricSpec{.name =
"total_wall_time_ms",
931 .kind = Kind::UInt64,
932 .minPublishInterval = quickUpdateInterval},
933 MetricSpec{.name =
"last_processed_input_size_byte",
935 .kind = Kind::UInt64,
936 .minPublishInterval = quickUpdateInterval},
937 MetricSpec{.name =
"total_processed_input_size_byte",
939 .kind = Kind::UInt64,
940 .scope = Scope::Online,
941 .minPublishInterval = quickUpdateInterval},
942 MetricSpec{.name =
"total_sigusr1",
944 .kind = Kind::UInt64,
945 .minPublishInterval = quickUpdateInterval},
946 MetricSpec{.name =
"consumed-timeframes",
948 .kind = Kind::UInt64,
949 .minPublishInterval = 0,
950 .maxRefreshLatency = quickRefreshInterval,
951 .sendInitialValue =
true},
952 MetricSpec{.name =
"min_input_latency_ms",
954 .kind = Kind::UInt64,
955 .scope = Scope::Online,
956 .minPublishInterval = quickUpdateInterval},
957 MetricSpec{.name =
"max_input_latency_ms",
959 .kind = Kind::UInt64,
960 .minPublishInterval = quickUpdateInterval},
961 MetricSpec{.name =
"total_rate_in_mb_s",
964 .scope = Scope::Online,
965 .minPublishInterval = quickUpdateInterval,
966 .maxRefreshLatency = onlineRefreshLatency,
967 .sendInitialValue =
true},
968 MetricSpec{.name =
"total_rate_out_mb_s",
971 .scope = Scope::Online,
972 .minPublishInterval = quickUpdateInterval,
973 .maxRefreshLatency = onlineRefreshLatency,
974 .sendInitialValue =
true},
975 MetricSpec{.name =
"processing_rate_hz",
978 .scope = Scope::Online,
979 .minPublishInterval = quickUpdateInterval,
980 .maxRefreshLatency = onlineRefreshLatency,
981 .sendInitialValue =
true},
982 MetricSpec{.name =
"cpu_usage_fraction",
983 .enabled = enableCPUUsageFraction,
986 .scope = Scope::Online,
987 .minPublishInterval = quickUpdateInterval,
988 .maxRefreshLatency = onlineRefreshLatency,
989 .sendInitialValue =
true},
990 MetricSpec{.name =
"performed_computations",
992 .kind = Kind::UInt64,
993 .scope = Scope::Online,
994 .minPublishInterval = quickUpdateInterval,
995 .maxRefreshLatency = onlineRefreshLatency,
996 .sendInitialValue =
true},
997 MetricSpec{.name =
"total_bytes_in",
999 .kind = Kind::UInt64,
1000 .scope = Scope::Online,
1001 .minPublishInterval = quickUpdateInterval,
1002 .maxRefreshLatency = onlineRefreshLatency,
1003 .sendInitialValue =
true},
1004 MetricSpec{.name =
"total_bytes_out",
1006 .kind = Kind::UInt64,
1007 .scope = Scope::Online,
1008 .minPublishInterval = quickUpdateInterval,
1009 .maxRefreshLatency = onlineRefreshLatency,
1010 .sendInitialValue =
true},
1011 MetricSpec{.name = fmt::format(
"available_managed_shm_{}", runningWorkflow.shmSegmentId),
1013 .kind = Kind::UInt64,
1014 .scope = Scope::Online,
1015 .minPublishInterval = 500,
1016 .maxRefreshLatency = onlineRefreshLatency,
1017 .sendInitialValue =
true},
1018 MetricSpec{.name =
"malformed_inputs", .metricId =
static_cast<short>(
ProcessingStatsId::MALFORMED_INPUTS), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1021 MetricSpec{.name =
"relayed_messages", .metricId =
static_cast<short>(
ProcessingStatsId::RELAYED_MESSAGES), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1022 MetricSpec{.name =
"arrow-bytes-destroyed",
1023 .enabled = arrowAndResourceLimitingMetrics,
1025 .kind = Kind::UInt64,
1026 .scope = Scope::DPL,
1027 .minPublishInterval = 0,
1028 .maxRefreshLatency = 10000,
1029 .sendInitialValue =
true},
1030 MetricSpec{.name =
"arrow-messages-destroyed",
1031 .enabled = arrowAndResourceLimitingMetrics,
1033 .kind = Kind::UInt64,
1034 .scope = Scope::DPL,
1035 .minPublishInterval = 0,
1036 .maxRefreshLatency = 10000,
1037 .sendInitialValue =
true},
1038 MetricSpec{.name =
"arrow-bytes-created",
1039 .enabled = arrowAndResourceLimitingMetrics,
1041 .kind = Kind::UInt64,
1042 .scope = Scope::DPL,
1043 .minPublishInterval = 0,
1044 .maxRefreshLatency = 10000,
1045 .sendInitialValue =
true},
1046 MetricSpec{.name =
"arrow-messages-created",
1047 .enabled = arrowAndResourceLimitingMetrics,
1049 .kind = Kind::UInt64,
1050 .scope = Scope::DPL,
1051 .minPublishInterval = 0,
1052 .maxRefreshLatency = 10000,
1053 .sendInitialValue =
true},
1054 MetricSpec{.name =
"arrow-bytes-expired",
1055 .enabled = arrowAndResourceLimitingMetrics,
1057 .kind = Kind::UInt64,
1058 .scope = Scope::DPL,
1059 .minPublishInterval = 0,
1060 .maxRefreshLatency = 10000,
1061 .sendInitialValue =
true},
1062 MetricSpec{.name =
"shm-offer-bytes-consumed",
1063 .enabled = arrowAndResourceLimitingMetrics,
1065 .kind = Kind::UInt64,
1066 .scope = Scope::DPL,
1067 .minPublishInterval = 0,
1068 .maxRefreshLatency = 10000,
1069 .sendInitialValue =
true},
1070 MetricSpec{.name =
"resources-missing",
1071 .enabled = enableDebugMetrics,
1073 .kind = Kind::UInt64,
1074 .scope = Scope::DPL,
1075 .minPublishInterval = 1000,
1076 .maxRefreshLatency = 1000,
1077 .sendInitialValue =
true},
1078 MetricSpec{.name =
"resources-insufficient",
1079 .enabled = enableDebugMetrics,
1081 .kind = Kind::UInt64,
1082 .scope = Scope::DPL,
1083 .minPublishInterval = 1000,
1084 .maxRefreshLatency = 1000,
1085 .sendInitialValue =
true},
1086 MetricSpec{.name =
"resources-satisfactory",
1087 .enabled = enableDebugMetrics,
1089 .kind = Kind::UInt64,
1090 .scope = Scope::DPL,
1091 .minPublishInterval = 1000,
1092 .maxRefreshLatency = 1000,
1093 .sendInitialValue =
true},
1094 MetricSpec{.name =
"resource-offer-expired",
1095 .enabled = arrowAndResourceLimitingMetrics,
1097 .kind = Kind::UInt64,
1098 .scope = Scope::DPL,
1099 .minPublishInterval = 0,
1100 .maxRefreshLatency = 10000,
1101 .sendInitialValue =
true}};
1103 for (
auto& metric :
metrics) {
1105 if (spec.name.compare(
"readout-proxy") == 0) {
1106 stats->hasAvailSHMMetric =
true;
1111 stats->registerMetric(metric);
1114 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStats>(), stats};
1122 sendRelayerMetrics(context.
services(), *stats);
1123 flushMetrics(context.
services(), *stats); },
1126 sendRelayerMetrics(context.
services(), *stats);
1127 flushMetrics(context.
services(), *stats); },
1130 sendRelayerMetrics(context.
services(), *stats);
1131 flushMetrics(context.
services(), *stats); },
1134 flushMetrics(
ref, *stats); },
1143 .
name =
"data-processing-states",
1146 clock_gettime(CLOCK_REALTIME, &now);
1147 uv_update_time(
state.loop);
1148 uint64_t
offset = now.tv_sec * 1000 - uv_now(
state.loop);
1157 states->processCommandQueue(); },
1176 .
name =
"gui-metrics",
1181 monitoring.send({(
int)spec.inputChannels.size(), fmt::format(
"oldest_possible_timeslice/h"), o2::monitoring::Verbosity::Debug});
1182 monitoring.send({(
int)1, fmt::format(
"oldest_possible_timeslice/w"), o2::monitoring::Verbosity::Debug});
1183 monitoring.send({(
int)spec.outputChannels.size(), fmt::format(
"oldest_possible_output/h"), o2::monitoring::Verbosity::Debug});
1184 monitoring.send({(
int)1, fmt::format(
"oldest_possible_output/w"), o2::monitoring::Verbosity::Debug});
1185 return ServiceHandle{TypeIdHelpers::uniqueId<GUIMetrics>(), stats};
// Send the relayer's current oldest possible output timeslice once per output
// channel, under the metric name "oldest_possible_output/<channel index>".
1192 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
// The index has to start at 0: it was previously declared without an
// initializer, so the loop condition read an indeterminate value (undefined
// behaviour) and could skip channels or emit arbitrary indices.
1193 for (size_t ci = 0; ci < spec.outputChannels.size(); ++ci) {
1194 monitoring.send({(uint64_t)oldestPossibleOutput.timeslice.value, fmt::format("oldest_possible_output/{}", ci), o2::monitoring::Verbosity::Debug});
1198 monitoring.send({(uint64_t)timeslice, fmt::format(
"oldest_possible_timeslice/{}", channel.value), o2::monitoring::Verbosity::Debug}); },
1206 .
name =
"object-cache",
1209 return ServiceHandle{TypeIdHelpers::uniqueId<ObjectCache>(), cache};
1218 .
name =
"data-processing-context",
1230 .
name =
"device-context",
1241 .
name =
"data-allocator",
1242 .uniqueId = simpleServiceId<DataAllocator>(),
1245 .
hash = TypeIdHelpers::uniqueId<DataAllocator>(),
1248 .name =
"data-allocator",
1258 std::vector<ServiceSpec> specs{
// Assemble the comma-separated "<library>:<plugin-name>" list of services to
// load as plugins. In the lines visible here the list starts from
// `extraPlugins`, gets the two O2FrameworkDataTakingSupport InfoLogger entries
// appended, and then the content of the DPL_LOAD_SERVICES environment variable
// when it is set. A "," is inserted before each addition only if the list is
// already non-empty, so no leading separator is produced.
// NOTE(review): several lines of the original are missing from this listing
// (closing braces, possibly a guard around the InfoLogger entries, and the
// step that fills `loadablePlugins` from the string) - confirm against the
// full file before relying on this description.
1288 std::string loadableServicesStr = extraPlugins;
1291 if (loadableServicesStr.empty() ==
false) {
1292 loadableServicesStr +=
",";
1294 loadableServicesStr +=
"O2FrameworkDataTakingSupport:InfoLoggerContext,O2FrameworkDataTakingSupport:InfoLogger";
1297 std::vector<LoadablePlugin> loadablePlugins = {};
1298 char* loadableServicesEnv = getenv(
"DPL_LOAD_SERVICES");
1302 if (loadableServicesEnv) {
1303 if (loadableServicesStr.empty() ==
false) {
1304 loadableServicesStr +=
",";
1306 loadableServicesStr += loadableServicesEnv;
1309 PluginManager::loadFromPlugin<ServiceSpec, ServicePlugin>(loadablePlugins, specs);
o2::monitoring::tags::Key Key
o2::monitoring::Metric Metric
o2::monitoring::tags::Value Value
#define MONITORING_QUEUE_SIZE
#define O2_BUILTIN_LIKELY(x)
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT_CRITICAL(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
o2::monitoring::Monitoring Monitoring
decltype(auto) make(const Output &spec, Args... args)
int countDeviceOutputs(bool excludeDPLOrigin=false)
Allow injecting policies on send.
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
A text based way of communicating with the driver.
OldestOutputInfo getOldestPossibleOutput() const
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
GLsizei const GLfloat * value
GLuint GLsizei const GLchar * label
Defining PrimaryVertex explicitly as messageable.
@ DPL
The channel is a normal input channel.
std::pair< std::string, unsigned short > parse_websocket_url(char const *url)
auto decongestionCallback
@ NoTransition
No pending transitions.
@ AVAILABLE_MANAGED_SHM_BASE
@ SHM_OFFER_BYTES_CONSUMED
@ DROPPED_INCOMING_MESSAGES
@ ARROW_MESSAGES_DESTROYED
auto decongestionCallbackOrdered
std::string to_string(gsl::span< T, Size > span)
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static AsyncTaskId create(AsyncQueue &queue, AsyncTaskSpec spec)
static void post(AsyncQueue &queue, AsyncTask const &task)
static void reset(AsyncQueue &queue)
Reset the queue to its initial state.
An actual task to be executed.
static ServiceSpec fairMQDeviceProxy()
static ServiceSpec fairMQBackendSpec()
static ServiceSpec stringBackendSpec()
static ServiceSpec dataRelayer()
static ServiceSpec callbacksSpec()
static ServiceSpec monitoringSpec()
static ServiceSpec dataSender()
static ServiceSpec timesliceIndex()
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static ServiceSpec timingInfoSpec()
static ServiceConfigureCallback noConfiguration()
static ServiceSpec asyncQueue()
static ServiceSpec decongestionSpec()
static ServiceSpec dataProcessorContextSpec()
static ServiceSpec dataProcessingStats()
static std::vector< ServiceSpec > arrowServices()
static ServiceSpec rootFileSpec()
static ServiceSpec objectCache()
static ServiceSpec controlSpec()
static ServiceSpec deviceContextSpec()
static ServiceSpec configurationSpec()
static ServiceSpec ccdbSupportSpec()
static ServiceSpec datatakingContextSpec()
static ServiceSpec guiMetricsSpec()
static ServiceSpec dataProcessingStates()
static ServiceSpec tracingSpec()
static ServiceSpec dataAllocatorSpec()
static ServiceSpec driverClientSpec()
static ServiceSpec streamContextSpec()
static ServiceSpec threadPool(int numWorkers)
static ServiceSpec parallelSpec()
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
int64_t minOnlinePublishInterval
Helper struct to hold statistics about the data processing happening.
@ SetIfPositive
Set the value to the specified value.
@ InstantaneousRate
Update the rate of the metric given the cumulative value since last time it got published.
@ Add
Update the rate of the metric given the amount since the last time.
A label that can be associated to a DataProcessorSpec.
static std::string describe(InputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
DeploymentMode deploymentMode
Where we think this is running.
TimesliceIndex::OldestOutputInfo oldestPossibleOutput
static DeploymentMode deploymentMode()
static bool onlineDeploymentMode()
@true if running online
std::string name
The name of the associated DataProcessorSpec.
size_t inputTimesliceId
The time pipelining id of this particular device.
Running state information of a given device.
header::DataOrigin origin
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
Information about the running workflow.
ServiceKind kind
Kind of service.
unsigned int hash
Unique hash associated to the type of service.
std::string name
Name of the service.
ServicePostDispatching postDispatching
ServiceKind kind
Kind of service being specified.
static std::function< int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator(uv_loop_t *loop)
static std::function< void(int64_t &base, int64_t &offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t *loop)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"