52#include "../src/DataProcessingStatus.h"
59#include <Configuration/ConfigurationInterface.h>
60#include <Configuration/ConfigurationFactory.h>
61#include <Monitoring/MonitoringFactory.h>
64#include <fairmq/Device.h>
65#include <fairmq/shmem/Monitor.h>
66#include <fairmq/shmem/Common.h>
67#include <fairmq/ProgOptions.h>
73using o2::configuration::ConfigurationFactory;
74using o2::configuration::ConfigurationInterface;
75using o2::monitoring::Monitoring;
76using o2::monitoring::MonitoringFactory;
77using Metric = o2::monitoring::Metric;
78using Key = o2::monitoring::tags::Key;
79using Value = o2::monitoring::tags::Value;
89#define MONITORING_QUEUE_SIZE 100
95 void* service =
nullptr;
96 bool isWebsocket = strncmp(options.GetPropertyAsString(
"driver-client-backend").c_str(),
"ws://", 4) == 0;
97 bool isDefault = options.GetPropertyAsString(
"monitoring-backend") ==
"default";
98 bool useDPL = (isWebsocket && isDefault) || options.GetPropertyAsString(
"monitoring-backend") ==
"dpl://";
99 o2::monitoring::Monitoring* monitoring;
102 auto dplBackend = std::make_unique<DPLMonitoringBackend>(registry);
104 monitoring->addBackend(std::move(dplBackend));
106 auto backend = isDefault ?
"infologger://" : options.GetPropertyAsString(
"monitoring-backend");
107 monitoring = MonitoringFactory::Get(backend).release();
109 service = monitoring;
113 monitoring->addGlobalTag(
"dataprocessor_name", registry.
get<
DeviceSpec const>().
name);
114 monitoring->addGlobalTag(
"dpl_instance", options.GetPropertyAsString(
"shm-segment-id"));
115 return ServiceHandle{TypeIdHelpers::uniqueId<Monitoring>(), service};
119 auto* monitoring = (o2::monitoring::Monitoring*)service;
121 auto extRunNumber = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"runNumber",
"unspecified");
122 if (extRunNumber ==
"unspecified") {
126 monitoring->setRunNumber(std::stoul(extRunNumber));
130 auto* monitoring =
reinterpret_cast<Monitoring*
>(service);
131 monitoring->flushBuffer();
132 delete monitoring; },
140 .
name =
"async-queue",
141 .init = simpleServiceInit<AsyncQueue, AsyncQueue>(),
156 .
name =
"timing-info",
157 .uniqueId = simpleServiceId<TimingInfo>(),
158 .init = simpleServiceInit<TimingInfo, TimingInfo, ServiceKind::Stream>(),
166 .
name =
"stream-context",
167 .uniqueId = simpleServiceId<StreamContext>(),
168 .init = simpleServiceInit<StreamContext, StreamContext, ServiceKind::Stream>(),
177 stream->routeDPLCreated.resize(routes.size());
178 stream->routeCreated.resize(routes.size());
180 std::fill(
stream->routeDPLCreated.begin(),
stream->routeDPLCreated.end(),
false);
181 std::fill(
stream->routeCreated.begin(),
stream->routeCreated.end(),
false); },
190 bool userDidCreate =
false;
192 for (
size_t ri = 0; ri < routes.size(); ++ri) {
193 if (
stream->routeCreated[ri] ==
true &&
stream->routeDPLCreated[ri] ==
false) {
194 userDidCreate =
true;
198 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"userDidCreate == %d && didDispatch == %d",
200 messageContext.didDispatch());
201 if (userDidCreate ==
false && messageContext.didDispatch() ==
true) {
202 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"Data created out of band userDidCreate == %d && messageContext.didDispatch == %d",
204 messageContext.didDispatch());
207 if (userDidCreate ==
false && messageContext.didDispatch() ==
false) {
212 for (
size_t ri = 0; ri < routes.size(); ++ri) {
213 auto &route = routes[ri];
214 auto &matcher = route.matcher;
215 if (
stream->routeDPLCreated[ri] ==
true) {
216 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"Data created by DPL. ri = %" PRIu64
", %{public}s",
220 if (
stream->routeCreated[ri] ==
true) {
222 }
if ((timeslice % route.maxTimeslices) != route.timeslice) {
223 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"Route ri = %" PRIu64
", skipped because of pipelining.",
227 if (matcher.lifetime == Lifetime::Timeframe) {
229 "Expected Lifetime::Timeframe data %{public}s was not created for timeslice %" PRIu64
" and might result in dropped timeframes",
231 LOGP(error,
"Expected Lifetime::Timeframe data {} was not created for timeslice {} and might result in dropped timeframes",
DataSpecUtils::describe(matcher), timeslice);
245 stream->routeDPLCreated.resize(routes.size());
246 stream->routeCreated.resize(routes.size());
248 std::fill(
stream->routeCreated.begin(),
stream->routeCreated.end(),
false);
249 std::fill(
stream->routeDPLCreated.begin(),
stream->routeDPLCreated.end(),
false); },
256 .
name =
"datataking-contex",
257 .uniqueId = simpleServiceId<DataTakingContext>(),
258 .init = simpleServiceInit<DataTakingContext, DataTakingContext, ServiceKind::Stream>(),
262 for (
auto const&
ref : processingContext.
inputs()) {
264 const auto* dh = o2::header::get<o2::header::DataHeader*>(
ref.header);
268 context.runNumber = fmt::format(
"{}", dh->runNumber);
277 auto extRunNumber = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"runNumber",
"unspecified");
278 if (extRunNumber !=
"unspecified" || context.runNumber ==
"0") {
279 context.runNumber = extRunNumber;
281 auto extLHCPeriod = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"lhc_period",
"unspecified");
282 if (extLHCPeriod !=
"unspecified") {
283 context.lhcPeriod = extLHCPeriod;
285 static const char* months[12] = {
"JAN",
"FEB",
"MAR",
"APR",
"MAY",
"JUN",
"JUL",
"AUG",
"SEP",
"OCT",
"NOV",
"DEC"};
286 time_t now =
time(
nullptr);
287 auto ltm = gmtime(&now);
288 context.lhcPeriod = months[ltm->tm_mon];
289 LOG(info) <<
"LHCPeriod is not available, using current month " << context.lhcPeriod;
292 auto extRunType = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"run_type",
"unspecified");
293 if (extRunType !=
"unspecified") {
294 context.runType = extRunType;
296 auto extEnvId = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"environment_id",
"unspecified");
297 if (extEnvId !=
"unspecified") {
298 context.envId = extEnvId;
300 auto extDetectors = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"detectors",
"unspecified");
301 if (extDetectors !=
"unspecified") {
302 context.detectors = extDetectors;
304 auto forcedRaw = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"force_run_as_raw",
"false");
305 context.forcedRaw = forcedRaw ==
"true"; },
315 .
name =
"configuration",
317 auto backend = options.GetPropertyAsString(
"configuration");
318 if (backend ==
"command-line") {
321 return ServiceHandle{TypeIdHelpers::uniqueId<ConfigurationInterface>(),
322 ConfigurationFactory::getConfiguration(backend).release()};
326 if (dc.options.count(
"configuration") == 0) {
330 auto backend = dc.options[
"configuration"].as<std::string>();
332 ConfigurationFactory::getConfiguration(backend).release()}); },
339 .
name =
"driverClient",
341 auto backend = options.GetPropertyAsString(
"driver-client-backend");
342 if (backend ==
"stdout://") {
343 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
347 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
359 return ServiceHandle{TypeIdHelpers::uniqueId<ControlService>(),
369 .
name =
"localrootfile",
370 .init = simpleServiceInit<LocalRootFileService, LocalRootFileService>(),
381 return ServiceHandle{TypeIdHelpers::uniqueId<ParallelContext>(),
391 .
name =
"timesliceindex",
394 return ServiceHandle{TypeIdHelpers::uniqueId<TimesliceIndex>(),
405 .init = simpleServiceInit<CallbackService, CallbackService>(),
413 .
name =
"datarelayer",
429 .
name =
"datasender",
478 .
name =
"ccdb-support",
482 for (
auto&
output : spec.outputs) {
484 LOGP(
debug,
"Optional inputs support enabled");
496 LOGP(
debug,
"We are w/o outputs, do not automatically add DISTSUBTIMEFRAME to outgoing messages");
512 if (concrete.subSpec == 0) {
516 stfDist.
id = timingInfo.timeslice;
517 stfDist.firstOrbit = timingInfo.firstTForbit;
518 stfDist.runNumber = timingInfo.runNumber;
522 O2_SIGNPOST_EVENT_EMIT(stream_context, sid,
"finaliseOutputs",
"Route %" PRIu64
" (%{public}s) was created by DPL.", (uint64_t)oi,
524 streamContext.routeDPLCreated[oi] =
true;
543 if (decongestion.lastTimeslice >= oldestPossibleOutput.timeslice.value) {
544 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"oldest_possible_timeslice",
"Not sending already sent value: %" PRIu64
"> %" PRIu64,
545 decongestion.lastTimeslice, (uint64_t)oldestPossibleOutput.timeslice.value);
548 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"oldest_possible_timeslice",
"Running oldest possible timeslice %" PRIu64
" propagation.",
549 (uint64_t)oldestPossibleOutput.timeslice.value);
552 for (
int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
553 auto& info = proxy.getForwardChannelInfo(
ChannelIndex{fi});
556 if (info.channelType != ChannelAccountingType::DPL) {
557 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"oldest_possible_timeslice",
"Skipping channel %{public}s", info.name.c_str());
562 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64
", priority %d",
563 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
566 decongestion.lastTimeslice = oldestPossibleOutput.timeslice.value;
577 int64_t oldNextTimeslice = decongestion.nextTimeslice;
578 decongestion.nextTimeslice = std::max(decongestion.nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
579 if (oldNextTimeslice != decongestion.nextTimeslice) {
581 O2_SIGNPOST_EVENT_EMIT_WARN(async_queue, cid,
"oldest_possible_timeslice",
"Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
583 O2_SIGNPOST_EVENT_EMIT_CRITICAL(async_queue, cid,
"oldest_possible_timeslice",
"Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
585 timesliceIndex.rescan();
597 .
name =
"decongestion",
600 for (
auto& input : services.
get<
DeviceSpec const>().inputs) {
601 if (input.matcher.lifetime == Lifetime::Timeframe || input.matcher.lifetime == Lifetime::QA || input.matcher.lifetime == Lifetime::Sporadic || input.matcher.lifetime == Lifetime::Optional) {
602 LOGP(detail,
"Found a real data input, we cannot update the oldest possible timeslice when sending messages");
603 decongestion->isFirstInTopology =
false;
608 decongestion->oldestPossibleTimesliceTask =
AsyncQueueHelpers::create(queue, {.name =
"oldest-possible-timeslice", .score = 100});
617 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"postForwardingCallbacks",
"We are the first one in the topology, we need to update the oldest possible timeslice");
620 timesliceIndex.updateOldestPossibleOutput(decongestion->nextEnumerationTimesliceRewinded);
623 if (decongestion->nextEnumerationTimesliceRewinded && decongestion->nextEnumerationTimeslice < oldestPossibleOutput.timeslice.value) {
624 LOGP(detail,
"Not sending oldestPossible if nextEnumerationTimeslice was rewinded");
628 if (decongestion->lastTimeslice && oldestPossibleOutput.timeslice.value == decongestion->lastTimeslice) {
630 "Not sending already sent value for oldest possible timeslice: %" PRIu64,
631 (uint64_t)oldestPossibleOutput.timeslice.value);
634 if (oldestPossibleOutput.timeslice.value < decongestion->lastTimeslice) {
635 LOGP(error,
"We are trying to send an oldest possible timeslice {} that is older than the last one we already sent {}",
636 oldestPossibleOutput.timeslice.value, decongestion->lastTimeslice);
640 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Broadcasting oldest posssible output %" PRIu64
" due to %{public}s (%" PRIu64
")",
641 (uint64_t)oldestPossibleOutput.timeslice.value,
642 oldestPossibleOutput.slot.index == -1 ?
"channel" :
"slot",
643 (uint64_t)(oldestPossibleOutput.slot.index == -1 ? oldestPossibleOutput.channel.value : oldestPossibleOutput.slot.index));
644 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Ordered active %d", decongestion->orderedCompletionPolicyActive);
645 if (decongestion->orderedCompletionPolicyActive) {
646 auto oldNextTimeslice = decongestion->nextTimeslice;
647 decongestion->nextTimeslice = std::max(decongestion->nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
648 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Next timeslice %" PRIi64, decongestion->nextTimeslice);
649 if (oldNextTimeslice != decongestion->nextTimeslice) {
652 O2_SIGNPOST_EVENT_EMIT_WARN(data_processor_context, cid,
"oldest_possible_timeslice",
"Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
654 O2_SIGNPOST_EVENT_EMIT_CRITICAL(data_processor_context, cid,
"oldest_possible_timeslice",
"Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
661 for (
int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
662 auto& info = proxy.getForwardChannelInfo(
ChannelIndex{fi});
666 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Skipping channel %{public}s", info.name.c_str());
671 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64
", priority %d",
672 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
675 decongestion->lastTimeslice = oldestPossibleOutput.timeslice.value; },
679 decongestion->nextEnumerationTimeslice = 0;
680 decongestion->nextEnumerationTimesliceRewinded =
false;
681 decongestion->lastTimeslice = 0;
682 decongestion->nextTimeslice = 0;
683 decongestion->oldestPossibleTimesliceTask = {0};
685 for (
auto &channel :
state.inputChannelInfos) {
686 channel.oldestForChannel = {0};
693 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Received oldest possible timeframe %" PRIu64
" from channel %d",
694 (uint64_t)oldestPossibleTimeslice, channel.value);
695 relayer.setOldestPossibleInput({oldestPossibleTimeslice}, channel);
696 timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
697 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
699 if (oldestPossibleOutput.timeslice.value == decongestion.lastTimeslice) {
700 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Synchronous: Not sending already sent value: %" PRIu64, (uint64_t)oldestPossibleOutput.timeslice.value);
703 if (oldestPossibleOutput.timeslice.value < decongestion.lastTimeslice) {
704 LOGP(error,
"We are trying to send an oldest possible timeslice {} that is older than the last one we sent {}",
705 oldestPossibleOutput.timeslice.value, decongestion.lastTimeslice);
712 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Queueing oldest possible timeslice %" PRIu64
" propagation for execution.",
713 (uint64_t)oldestPossibleOutput.timeslice.value);
716 .id = decongestion.oldestPossibleTimesliceTask,
720 if (decongestion.orderedCompletionPolicyActive) {
724 .user<
DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
736 .
name =
"threadpool",
741 return ServiceHandle{TypeIdHelpers::uniqueId<ThreadPool>(), pool};
743 .configure = [](
InitContext&,
void* service) ->
void* {
744 auto* t =
reinterpret_cast<ThreadPool*
>(service);
752 setenv(
"UV_THREADPOOL_SIZE", numWorkersS.c_str(), 0);
765 using namespace fair::mq::shmem;
766 auto& spec = registry.get<
DeviceSpec const>();
769 if (stats.hasAvailSHMMetric) {
771 long freeMemory = -1;
773 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(ShmId{makeShmIdStr(device->fConfig->GetProperty<uint64_t>(
"shmid"))}, runningWorkflow.shmSegmentId);
776 if (freeMemory == -1) {
778 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(SessionId{device->fConfig->GetProperty<std::string>(
"session")}, runningWorkflow.shmSegmentId);
785 auto device = registry.get<RawDeviceService>().device();
787 int64_t totalBytesIn = 0;
788 int64_t totalBytesOut = 0;
790 for (
auto& channel : device->GetChannels()) {
791 totalBytesIn += channel.second[0].GetBytesRx();
792 totalBytesOut += channel.second[0].GetBytesTx();
802auto flushStates(ServiceRegistryRef registry, DataProcessingStates&
states) ->
void
804 if (!registry.get<DriverConfig
const>().driverHasGUI) {
807 states.flushChangedStates([&
states, registry](std::string
const& spec, int64_t timestamp, std::string_view
value)
mutable ->
void {
808 auto& client = registry.get<ControlService>();
809 client.push(spec,
value, timestamp);
816auto flushMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) ->
void
822 if (registry.isMainThread() ==
false) {
823 LOGP(fatal,
"Flushing metrics should only happen on the main thread.");
825 auto& monitoring = registry.get<
Monitoring>();
826 auto& relayer = registry.get<DataRelayer>();
829 stats.flushChangedMetrics([&monitoring](DataProcessingStats::MetricSpec
const& spec, int64_t timestamp, int64_t
value)
mutable ->
void {
831 auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(std::chrono::milliseconds(timestamp));
832 auto metric = o2::monitoring::Metric{spec.name, Metric::DefaultVerbosity, tp};
835 LOG(
debug) <<
"Value for " << spec.name <<
" is negative, setting to 0";
838 metric.addValue((uint64_t)
value,
"value");
840 if (
value > (int64_t)std::numeric_limits<int>::max()) {
841 LOG(warning) <<
"Value for " << spec.name <<
" is too large, setting to INT_MAX";
842 value = (int64_t)std::numeric_limits<int>::max();
844 if (
value < (int64_t)std::numeric_limits<int>::min()) {
845 value = (int64_t)std::numeric_limits<int>::min();
846 LOG(warning) <<
"Value for " << spec.name <<
" is too small, setting to INT_MIN";
848 metric.addValue((
int)
value,
"value");
851 metric.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL);
853 monitoring.send(std::move(metric));
855 relayer.sendContextState();
856 monitoring.flushBuffer();
857 O2_SIGNPOST_END(monitoring_service, sid,
"flush",
"done flushing metrics");
864 .
name =
"data-processing-stats",
867 clock_gettime(CLOCK_REALTIME, &now);
868 uv_update_time(
state.loop);
869 uint64_t
offset = now.tv_sec * 1000 - uv_now(
state.loop);
871 .
minOnlinePublishInterval = std::stoi(options.GetProperty<std::string>(
"dpl-stats-min-online-publishing-interval").c_str()) * 1000};
878 int quickUpdateInterval = 5000;
879 uint64_t quickRefreshInterval = 7000;
880 uint64_t onlineRefreshLatency = 60000;
886 bool enableDebugMetrics =
false;
888 bool enableDebugMetrics =
true;
890 bool arrowAndResourceLimitingMetrics =
false;
892 arrowAndResourceLimitingMetrics =
true;
895 int64_t consumedTimeframesPublishInterval = 0;
897 consumedTimeframesPublishInterval = 5000;
902 bool enableCPUUsageFraction =
true;
904 if (std::find_if(spec.labels.begin(), spec.labels.end(), isProxy) != spec.labels.end()) {
906 O2_SIGNPOST_EVENT_EMIT(policies, mid,
"metrics",
"Disabling cpu_usage_fraction metric for proxy %{public}s", spec.name.c_str());
907 enableCPUUsageFraction =
false;
910 std::vector<DataProcessingStats::MetricSpec>
metrics = {
911 MetricSpec{.name =
"errors",
913 .kind = Kind::UInt64,
914 .scope = Scope::Online,
915 .minPublishInterval = quickUpdateInterval,
916 .maxRefreshLatency = quickRefreshInterval},
917 MetricSpec{.name =
"exceptions",
919 .kind = Kind::UInt64,
920 .scope = Scope::Online,
921 .minPublishInterval = quickUpdateInterval},
922 MetricSpec{.name =
"inputs/relayed/pending",
924 .kind = Kind::UInt64,
925 .minPublishInterval = quickUpdateInterval},
926 MetricSpec{.name =
"inputs/relayed/incomplete",
928 .kind = Kind::UInt64,
929 .minPublishInterval = quickUpdateInterval},
930 MetricSpec{.name =
"inputs/relayed/total",
932 .kind = Kind::UInt64,
933 .minPublishInterval = quickUpdateInterval},
934 MetricSpec{.name =
"elapsed_time_ms",
936 .kind = Kind::UInt64,
937 .minPublishInterval = quickUpdateInterval},
938 MetricSpec{.name =
"total_wall_time_ms",
940 .kind = Kind::UInt64,
941 .minPublishInterval = quickUpdateInterval},
942 MetricSpec{.name =
"last_processed_input_size_byte",
944 .kind = Kind::UInt64,
945 .minPublishInterval = quickUpdateInterval},
946 MetricSpec{.name =
"total_processed_input_size_byte",
948 .kind = Kind::UInt64,
949 .scope = Scope::Online,
950 .minPublishInterval = quickUpdateInterval},
951 MetricSpec{.name =
"total_sigusr1",
953 .kind = Kind::UInt64,
954 .minPublishInterval = quickUpdateInterval},
955 MetricSpec{.name =
"consumed-timeframes",
957 .kind = Kind::UInt64,
958 .minPublishInterval = consumedTimeframesPublishInterval,
959 .maxRefreshLatency = quickRefreshInterval,
960 .sendInitialValue =
true},
961 MetricSpec{.name =
"min_input_latency_ms",
963 .kind = Kind::UInt64,
964 .scope = Scope::Online,
965 .minPublishInterval = quickUpdateInterval},
966 MetricSpec{.name =
"max_input_latency_ms",
968 .kind = Kind::UInt64,
969 .minPublishInterval = quickUpdateInterval},
970 MetricSpec{.name =
"total_rate_in_mb_s",
973 .scope = Scope::Online,
974 .minPublishInterval = quickUpdateInterval,
975 .maxRefreshLatency = onlineRefreshLatency,
976 .sendInitialValue =
true},
977 MetricSpec{.name =
"total_rate_out_mb_s",
980 .scope = Scope::Online,
981 .minPublishInterval = quickUpdateInterval,
982 .maxRefreshLatency = onlineRefreshLatency,
983 .sendInitialValue =
true},
984 MetricSpec{.name =
"processing_rate_hz",
987 .scope = Scope::Online,
988 .minPublishInterval = quickUpdateInterval,
989 .maxRefreshLatency = onlineRefreshLatency,
990 .sendInitialValue =
true},
991 MetricSpec{.name =
"cpu_usage_fraction",
992 .enabled = enableCPUUsageFraction,
995 .scope = Scope::Online,
996 .minPublishInterval = quickUpdateInterval,
997 .maxRefreshLatency = onlineRefreshLatency,
998 .sendInitialValue =
true},
999 MetricSpec{.name =
"performed_computations",
1001 .kind = Kind::UInt64,
1002 .scope = Scope::Online,
1003 .minPublishInterval = quickUpdateInterval,
1004 .maxRefreshLatency = onlineRefreshLatency,
1005 .sendInitialValue =
true},
1006 MetricSpec{.name =
"total_bytes_in",
1008 .kind = Kind::UInt64,
1009 .scope = Scope::Online,
1010 .minPublishInterval = quickUpdateInterval,
1011 .maxRefreshLatency = onlineRefreshLatency,
1012 .sendInitialValue =
true},
1013 MetricSpec{.name =
"total_bytes_out",
1015 .kind = Kind::UInt64,
1016 .scope = Scope::Online,
1017 .minPublishInterval = quickUpdateInterval,
1018 .maxRefreshLatency = onlineRefreshLatency,
1019 .sendInitialValue =
true},
1020 MetricSpec{.name = fmt::format(
"available_managed_shm_{}", runningWorkflow.shmSegmentId),
1022 .kind = Kind::UInt64,
1023 .scope = Scope::Online,
1024 .minPublishInterval = 500,
1025 .maxRefreshLatency = onlineRefreshLatency,
1026 .sendInitialValue =
true},
1027 MetricSpec{.name =
"malformed_inputs", .metricId =
static_cast<short>(
ProcessingStatsId::MALFORMED_INPUTS), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1030 MetricSpec{.name =
"relayed_messages", .metricId =
static_cast<short>(
ProcessingStatsId::RELAYED_MESSAGES), .kind = Kind::UInt64, .minPublishInterval = quickUpdateInterval},
1031 MetricSpec{.name =
"arrow-bytes-destroyed",
1032 .enabled = arrowAndResourceLimitingMetrics,
1034 .kind = Kind::UInt64,
1035 .scope = Scope::DPL,
1036 .minPublishInterval = 0,
1037 .maxRefreshLatency = 10000,
1038 .sendInitialValue =
true},
1039 MetricSpec{.name =
"arrow-messages-destroyed",
1040 .enabled = arrowAndResourceLimitingMetrics,
1042 .kind = Kind::UInt64,
1043 .scope = Scope::DPL,
1044 .minPublishInterval = 0,
1045 .maxRefreshLatency = 10000,
1046 .sendInitialValue =
true},
1047 MetricSpec{.name =
"arrow-bytes-created",
1048 .enabled = arrowAndResourceLimitingMetrics,
1050 .kind = Kind::UInt64,
1051 .scope = Scope::DPL,
1052 .minPublishInterval = 0,
1053 .maxRefreshLatency = 10000,
1054 .sendInitialValue =
true},
1055 MetricSpec{.name =
"arrow-messages-created",
1056 .enabled = arrowAndResourceLimitingMetrics,
1058 .kind = Kind::UInt64,
1059 .scope = Scope::DPL,
1060 .minPublishInterval = 0,
1061 .maxRefreshLatency = 10000,
1062 .sendInitialValue =
true},
1063 MetricSpec{.name =
"arrow-bytes-expired",
1064 .enabled = arrowAndResourceLimitingMetrics,
1066 .kind = Kind::UInt64,
1067 .scope = Scope::DPL,
1068 .minPublishInterval = 0,
1069 .maxRefreshLatency = 10000,
1070 .sendInitialValue =
true},
1071 MetricSpec{.name =
"shm-offer-bytes-consumed",
1072 .enabled = arrowAndResourceLimitingMetrics,
1074 .kind = Kind::UInt64,
1075 .scope = Scope::DPL,
1076 .minPublishInterval = 0,
1077 .maxRefreshLatency = 10000,
1078 .sendInitialValue =
true},
1079 MetricSpec{.name =
"resources-missing",
1080 .enabled = enableDebugMetrics,
1082 .kind = Kind::UInt64,
1083 .scope = Scope::DPL,
1084 .minPublishInterval = 1000,
1085 .maxRefreshLatency = 1000,
1086 .sendInitialValue =
true},
1087 MetricSpec{.name =
"resources-insufficient",
1088 .enabled = enableDebugMetrics,
1090 .kind = Kind::UInt64,
1091 .scope = Scope::DPL,
1092 .minPublishInterval = 1000,
1093 .maxRefreshLatency = 1000,
1094 .sendInitialValue =
true},
1095 MetricSpec{.name =
"resources-satisfactory",
1096 .enabled = enableDebugMetrics,
1098 .kind = Kind::UInt64,
1099 .scope = Scope::DPL,
1100 .minPublishInterval = 1000,
1101 .maxRefreshLatency = 1000,
1102 .sendInitialValue =
true},
1103 MetricSpec{.name =
"resource-offer-expired",
1104 .enabled = arrowAndResourceLimitingMetrics,
1106 .kind = Kind::UInt64,
1107 .scope = Scope::DPL,
1108 .minPublishInterval = 0,
1109 .maxRefreshLatency = 10000,
1110 .sendInitialValue =
true}};
1112 for (
auto& metric :
metrics) {
1114 if (spec.name.compare(
"readout-proxy") == 0) {
1115 stats->hasAvailSHMMetric =
true;
1120 stats->registerMetric(metric);
1123 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStats>(), stats};
1131 sendRelayerMetrics(context.
services(), *stats);
1132 flushMetrics(context.
services(), *stats); },
1135 sendRelayerMetrics(context.
services(), *stats);
1136 flushMetrics(context.
services(), *stats); },
1139 sendRelayerMetrics(context.
services(), *stats);
1140 flushMetrics(context.
services(), *stats); },
1143 flushMetrics(
ref, *stats); },
1152 .
name =
"data-processing-states",
1155 clock_gettime(CLOCK_REALTIME, &now);
1156 uv_update_time(
state.loop);
1157 uint64_t
offset = now.tv_sec * 1000 - uv_now(
state.loop);
1166 states->processCommandQueue(); },
1185 .
name =
"gui-metrics",
1190 monitoring.send({(
int)spec.inputChannels.size(), fmt::format(
"oldest_possible_timeslice/h"), o2::monitoring::Verbosity::Debug});
1191 monitoring.send({(
int)1, fmt::format(
"oldest_possible_timeslice/w"), o2::monitoring::Verbosity::Debug});
1192 monitoring.send({(
int)spec.outputChannels.size(), fmt::format(
"oldest_possible_output/h"), o2::monitoring::Verbosity::Debug});
1193 monitoring.send({(
int)1, fmt::format(
"oldest_possible_output/w"), o2::monitoring::Verbosity::Debug});
1194 return ServiceHandle{TypeIdHelpers::uniqueId<GUIMetrics>(), stats};
1201 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
1202 for (
size_t ci; ci < spec.outputChannels.size(); ++ci) {
1203 monitoring.send({(uint64_t)oldestPossibleOutput.timeslice.value, fmt::format(
"oldest_possible_output/{}", ci), o2::monitoring::Verbosity::Debug});
1207 monitoring.send({(uint64_t)timeslice, fmt::format(
"oldest_possible_timeslice/{}", channel.value), o2::monitoring::Verbosity::Debug}); },
1215 .
name =
"object-cache",
1218 return ServiceHandle{TypeIdHelpers::uniqueId<ObjectCache>(), cache};
1227 .
name =
"data-processing-context",
1239 .
name =
"device-context",
1250 .
name =
"data-allocator",
1251 .uniqueId = simpleServiceId<DataAllocator>(),
1254 .
hash = TypeIdHelpers::uniqueId<DataAllocator>(),
1257 .name =
"data-allocator",
1267 std::vector<ServiceSpec> specs{
1297 std::string loadableServicesStr = extraPlugins;
1300 if (loadableServicesStr.empty() ==
false) {
1301 loadableServicesStr +=
",";
1303 loadableServicesStr +=
"O2FrameworkDataTakingSupport:InfoLoggerContext,O2FrameworkDataTakingSupport:InfoLogger";
1306 std::vector<LoadablePlugin> loadablePlugins = {};
1307 char* loadableServicesEnv = getenv(
"DPL_LOAD_SERVICES");
1311 if (loadableServicesEnv) {
1312 if (loadableServicesStr.empty() ==
false) {
1313 loadableServicesStr +=
",";
1315 loadableServicesStr += loadableServicesEnv;
1318 PluginManager::loadFromPlugin<ServiceSpec, ServicePlugin>(loadablePlugins, specs);
o2::monitoring::tags::Key Key
o2::monitoring::Metric Metric
o2::monitoring::tags::Value Value
#define MONITORING_QUEUE_SIZE
#define O2_BUILTIN_LIKELY(x)
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT_CRITICAL(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
o2::monitoring::Monitoring Monitoring
decltype(auto) make(const Output &spec, Args... args)
int countDeviceOutputs(bool excludeDPLOrigin=false)
Allow injecting policies on send.
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
A text based way of communicating with the driver.
OldestOutputInfo getOldestPossibleOutput() const
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
GLsizei const GLfloat * value
GLuint GLsizei const GLchar * label
Defining PrimaryVertex explicitly as messageable.
@ DPL
The channel is a normal input channel.
std::pair< std::string, unsigned short > parse_websocket_url(char const *url)
auto decongestionCallback
@ NoTransition
No pending transitions.
@ AVAILABLE_MANAGED_SHM_BASE
@ SHM_OFFER_BYTES_CONSUMED
@ DROPPED_INCOMING_MESSAGES
@ ARROW_MESSAGES_DESTROYED
auto decongestionCallbackOrdered
std::string to_string(gsl::span< T, Size > span)
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static AsyncTaskId create(AsyncQueue &queue, AsyncTaskSpec spec)
static void post(AsyncQueue &queue, AsyncTask const &task)
static void reset(AsyncQueue &queue)
Reset the queue to its initial state.
An actual task to be executed.
static ServiceSpec fairMQDeviceProxy()
static ServiceSpec fairMQBackendSpec()
static ServiceSpec stringBackendSpec()
static ServiceSpec dataRelayer()
static ServiceSpec callbacksSpec()
static ServiceSpec monitoringSpec()
static ServiceSpec dataSender()
static ServiceSpec timesliceIndex()
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static ServiceSpec timingInfoSpec()
static ServiceConfigureCallback noConfiguration()
static ServiceSpec asyncQueue()
static ServiceSpec decongestionSpec()
static ServiceSpec dataProcessorContextSpec()
static ServiceSpec dataProcessingStats()
static std::vector< ServiceSpec > arrowServices()
static ServiceSpec rootFileSpec()
static ServiceSpec objectCache()
static ServiceSpec controlSpec()
static ServiceSpec deviceContextSpec()
static ServiceSpec configurationSpec()
static ServiceSpec ccdbSupportSpec()
static ServiceSpec datatakingContextSpec()
static ServiceSpec guiMetricsSpec()
static ServiceSpec dataProcessingStates()
static ServiceSpec tracingSpec()
static ServiceSpec dataAllocatorSpec()
static ServiceSpec driverClientSpec()
static ServiceSpec streamContextSpec()
static ServiceSpec threadPool(int numWorkers)
static ServiceSpec parallelSpec()
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
int64_t minOnlinePublishInterval
Helper struct to hold statistics about the data processing happening.
@ SetIfPositive
Set the value to the specified value.
@ InstantaneousRate
Update the rate of the metric given the cumulative value since last time it got published.
@ Add
Update the rate of the metric given the amount since the last time.
A label that can be associated to a DataProcessorSpec.
static std::string describe(InputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
DeploymentMode deploymentMode
Where we think this is running.
TimesliceIndex::OldestOutputInfo oldestPossibleOutput
static DeploymentMode deploymentMode()
static bool onlineDeploymentMode()
@true if running online
std::string name
The name of the associated DataProcessorSpec.
size_t inputTimesliceId
The time pipelining id of this particular device.
Running state information of a given device.
header::DataOrigin origin
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
Information about the running workflow.
ServiceKind kind
Kind of service.
unsigned int hash
Unique hash associated to the type of service.
std::string name
Name of the service.
ServicePostDispatching postDispatching
ServiceKind kind
Kind of service being specified.
static std::function< int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator(uv_loop_t *loop)
static std::function< void(int64_t &base, int64_t &offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t *loop)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"