53#include "../src/DataProcessingStatus.h"
60#include <Configuration/ConfigurationInterface.h>
61#include <Configuration/ConfigurationFactory.h>
62#include <Monitoring/MonitoringFactory.h>
65#include <fairmq/Device.h>
66#include <fairmq/shmem/Monitor.h>
67#include <fairmq/shmem/Common.h>
68#include <fairmq/ProgOptions.h>
74using o2::configuration::ConfigurationFactory;
75using o2::configuration::ConfigurationInterface;
76using o2::monitoring::Monitoring;
77using o2::monitoring::MonitoringFactory;
78using Metric = o2::monitoring::Metric;
79using Key = o2::monitoring::tags::Key;
80using Value = o2::monitoring::tags::Value;
90#define MONITORING_QUEUE_SIZE 100
96 void* service =
nullptr;
// "ws://" is five characters long: compare the whole scheme prefix so that a
// malformed "ws:/..." value is not treated as a websocket backend.
97 bool isWebsocket = strncmp(options.GetPropertyAsString(
"driver-client-backend").c_str(),
"ws://", 5) == 0;
98 bool isDefault = options.GetPropertyAsString(
"monitoring-backend") ==
"default";
99 bool useDPL = (isWebsocket && isDefault) || options.GetPropertyAsString(
"monitoring-backend") ==
"dpl://";
100 o2::monitoring::Monitoring* monitoring;
103 auto dplBackend = std::make_unique<DPLMonitoringBackend>(registry);
105 monitoring->addBackend(std::move(dplBackend));
107 auto backend = isDefault ?
"infologger://" : options.GetPropertyAsString(
"monitoring-backend");
108 monitoring = MonitoringFactory::Get(backend).release();
110 service = monitoring;
114 monitoring->addGlobalTag(
"dataprocessor_name", registry.
get<
DeviceSpec const>().
name);
115 monitoring->addGlobalTag(
"dpl_instance", options.GetPropertyAsString(
"shm-segment-id"));
116 return ServiceHandle{TypeIdHelpers::uniqueId<Monitoring>(), service};
120 auto* monitoring = (o2::monitoring::Monitoring*)service;
122 auto extRunNumber = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"runNumber",
"unspecified");
123 if (extRunNumber ==
"unspecified") {
127 monitoring->setRunNumber(std::stoul(extRunNumber));
131 auto* monitoring =
reinterpret_cast<Monitoring*
>(service);
132 monitoring->flushBuffer();
133 delete monitoring; },
141 .
name =
"async-queue",
142 .init = simpleServiceInit<AsyncQueue, AsyncQueue>(),
157 .
name =
"timing-info",
158 .uniqueId = simpleServiceId<TimingInfo>(),
159 .init = simpleServiceInit<TimingInfo, TimingInfo, ServiceKind::Stream>(),
167 .
name =
"stream-context",
168 .uniqueId = simpleServiceId<StreamContext>(),
169 .init = simpleServiceInit<StreamContext, StreamContext, ServiceKind::Stream>(),
181 std::fill(
stream->routeDPLCreated.begin(),
stream->routeDPLCreated.end(),
false);
182 std::fill(
stream->routeCreated.begin(),
stream->routeCreated.end(),
false); },
191 bool userDidCreate =
false;
193 for (
size_t ri = 0; ri <
routes.size(); ++ri) {
194 if (
stream->routeCreated[ri] ==
true &&
stream->routeDPLCreated[ri] ==
false) {
195 userDidCreate =
true;
199 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"userDidCreate == %d && didDispatch == %d",
201 messageContext.didDispatch());
202 if (userDidCreate ==
false && messageContext.didDispatch() ==
true) {
203 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"Data created out of band userDidCreate == %d && messageContext.didDispatch == %d",
205 messageContext.didDispatch());
208 if (userDidCreate ==
false && messageContext.didDispatch() ==
false) {
213 for (
size_t ri = 0; ri <
routes.size(); ++ri) {
215 auto &matcher = route.matcher;
216 if (
stream->routeDPLCreated[ri] ==
true) {
217 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"Data created by DPL. ri = %" PRIu64
", %{public}s",
221 if (
stream->routeCreated[ri] ==
true) {
223 }
if ((timeslice % route.maxTimeslices) != route.timeslice) {
224 O2_SIGNPOST_EVENT_EMIT(stream_context, cid,
"postProcessingCallbacks",
"Route ri = %" PRIu64
", skipped because of pipelining.",
228 if (matcher.lifetime == Lifetime::Timeframe) {
230 "Expected Lifetime::Timeframe data %{public}s was not created for timeslice %" PRIu64
" and might result in dropped timeframes",
232 LOGP(error,
"Expected Lifetime::Timeframe data {} was not created for timeslice {} and might result in dropped timeframes",
DataSpecUtils::describe(matcher), timeslice);
249 std::fill(
stream->routeCreated.begin(),
stream->routeCreated.end(),
false);
250 std::fill(
stream->routeDPLCreated.begin(),
stream->routeDPLCreated.end(),
false); },
257 .
name =
"datataking-contex",
258 .uniqueId = simpleServiceId<DataTakingContext>(),
259 .init = simpleServiceInit<DataTakingContext, DataTakingContext, ServiceKind::Stream>(),
263 for (
auto const&
ref : processingContext.
inputs()) {
265 const auto* dh = o2::header::get<o2::header::DataHeader*>(
ref.header);
269 context.runNumber = fmt::format(
"{}", dh->runNumber);
278 auto extRunNumber = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"runNumber",
"unspecified");
279 if (extRunNumber !=
"unspecified" || context.runNumber ==
"0") {
280 context.runNumber = extRunNumber;
282 auto extLHCPeriod = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"lhc_period",
"unspecified");
283 if (extLHCPeriod !=
"unspecified") {
284 context.lhcPeriod = extLHCPeriod;
286 static const char* months[12] = {
"JAN",
"FEB",
"MAR",
"APR",
"MAY",
"JUN",
"JUL",
"AUG",
"SEP",
"OCT",
"NOV",
"DEC"};
287 time_t now =
time(
nullptr);
288 auto ltm = gmtime(&now);
289 context.lhcPeriod = months[ltm->tm_mon];
290 LOG(info) <<
"LHCPeriod is not available, using current month " << context.lhcPeriod;
293 auto extRunType = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"run_type",
"unspecified");
294 if (extRunType !=
"unspecified") {
295 context.runType = extRunType;
297 auto extEnvId = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"environment_id",
"unspecified");
298 if (extEnvId !=
"unspecified") {
299 context.envId = extEnvId;
301 auto extDetectors = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"detectors",
"unspecified");
302 if (extDetectors !=
"unspecified") {
303 context.detectors = extDetectors;
305 auto forcedRaw = services.
get<
RawDeviceService>().device()->fConfig->GetProperty<std::string>(
"force_run_as_raw",
"false");
306 context.forcedRaw = forcedRaw ==
"true"; },
316 .
name =
"configuration",
318 auto backend = options.GetPropertyAsString(
"configuration");
319 if (backend ==
"command-line") {
322 return ServiceHandle{TypeIdHelpers::uniqueId<ConfigurationInterface>(),
323 ConfigurationFactory::getConfiguration(backend).release()};
327 if (dc.options.count(
"configuration") == 0) {
331 auto backend = dc.options[
"configuration"].as<std::string>();
333 ConfigurationFactory::getConfiguration(backend).release()}); },
340 .
name =
"driverClient",
342 auto backend = options.GetPropertyAsString(
"driver-client-backend");
343 if (backend ==
"stdout://") {
344 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
348 return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
360 return ServiceHandle{TypeIdHelpers::uniqueId<ControlService>(),
370 .
name =
"localrootfile",
371 .init = simpleServiceInit<LocalRootFileService, LocalRootFileService>(),
382 return ServiceHandle{TypeIdHelpers::uniqueId<ParallelContext>(),
392 .
name =
"timesliceindex",
395 return ServiceHandle{TypeIdHelpers::uniqueId<TimesliceIndex>(),
406 .init = simpleServiceInit<CallbackService, CallbackService>(),
414 .
name =
"datarelayer",
430 .
name =
"datasender",
479 .
name =
"ccdb-support",
483 for (
auto&
output : spec.outputs) {
485 LOGP(
debug,
"Optional inputs support enabled");
497 LOGP(
debug,
"We are w/o outputs, do not automatically add DISTSUBTIMEFRAME to outgoing messages");
513 if (concrete.subSpec == 0) {
517 stfDist.
id = timingInfo.timeslice;
518 stfDist.firstOrbit = timingInfo.firstTForbit;
519 stfDist.runNumber = timingInfo.runNumber;
523 O2_SIGNPOST_EVENT_EMIT(stream_context, sid,
"finaliseOutputs",
"Route %" PRIu64
" (%{public}s) was created by DPL.", (uint64_t)oi,
525 streamContext.routeDPLCreated[oi] =
true;
544 if (decongestion.lastTimeslice >= oldestPossibleOutput.timeslice.value) {
545 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"oldest_possible_timeslice",
"Not sending already sent value: %" PRIu64
"> %" PRIu64,
546 decongestion.lastTimeslice, (uint64_t)oldestPossibleOutput.timeslice.value);
549 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"oldest_possible_timeslice",
"Running oldest possible timeslice %" PRIu64
" propagation.",
550 (uint64_t)oldestPossibleOutput.timeslice.value);
553 for (
int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
554 auto& info = proxy.getForwardChannelInfo(
ChannelIndex{fi});
557 if (info.channelType != ChannelAccountingType::DPL) {
558 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"oldest_possible_timeslice",
"Skipping channel %{public}s", info.name.c_str());
563 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64
", priority %d",
564 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
567 decongestion.lastTimeslice = oldestPossibleOutput.timeslice.value;
578 int64_t oldNextTimeslice = decongestion.nextTimeslice;
579 decongestion.nextTimeslice = std::max(decongestion.nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
580 if (oldNextTimeslice != decongestion.nextTimeslice) {
582 O2_SIGNPOST_EVENT_EMIT_WARN(async_queue, cid,
"oldest_possible_timeslice",
"Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
584 O2_SIGNPOST_EVENT_EMIT_CRITICAL(async_queue, cid,
"oldest_possible_timeslice",
"Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
586 timesliceIndex.rescan();
598 .
name =
"decongestion",
601 for (
auto& input : services.
get<
DeviceSpec const>().inputs) {
602 if (input.matcher.lifetime == Lifetime::Timeframe || input.matcher.lifetime == Lifetime::QA || input.matcher.lifetime == Lifetime::Sporadic || input.matcher.lifetime == Lifetime::Optional) {
603 LOGP(detail,
"Found a real data input, we cannot update the oldest possible timeslice when sending messages");
604 decongestion->isFirstInTopology =
false;
610 decongestion->suppressDomainInfo =
true;
615 decongestion->oldestPossibleTimesliceTask =
AsyncQueueHelpers::create(queue, {.name =
"oldest-possible-timeslice", .score = 100});
624 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"postForwardingCallbacks",
"We are the first one in the topology, we need to update the oldest possible timeslice");
627 timesliceIndex.updateOldestPossibleOutput(decongestion->nextEnumerationTimesliceRewinded);
630 if (decongestion->nextEnumerationTimesliceRewinded && decongestion->nextEnumerationTimeslice < oldestPossibleOutput.timeslice.value) {
631 LOGP(detail,
"Not sending oldestPossible if nextEnumerationTimeslice was rewinded");
635 if (decongestion->lastTimeslice && oldestPossibleOutput.timeslice.value == decongestion->lastTimeslice) {
637 "Not sending already sent value for oldest possible timeslice: %" PRIu64,
638 (uint64_t)oldestPossibleOutput.timeslice.value);
641 if (oldestPossibleOutput.timeslice.value < decongestion->lastTimeslice) {
642 LOGP(error,
"We are trying to send an oldest possible timeslice {} that is older than the last one we already sent {}",
643 oldestPossibleOutput.timeslice.value, decongestion->lastTimeslice);
647 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Broadcasting oldest posssible output %" PRIu64
" due to %{public}s (%" PRIu64
")",
648 (uint64_t)oldestPossibleOutput.timeslice.value,
649 oldestPossibleOutput.slot.index == -1 ?
"channel" :
"slot",
650 (uint64_t)(oldestPossibleOutput.slot.index == -1 ? oldestPossibleOutput.channel.value : oldestPossibleOutput.slot.index));
651 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Ordered active %d", decongestion->orderedCompletionPolicyActive);
652 if (decongestion->orderedCompletionPolicyActive) {
653 auto oldNextTimeslice = decongestion->nextTimeslice;
654 decongestion->nextTimeslice = std::max(decongestion->nextTimeslice, (int64_t)oldestPossibleOutput.timeslice.value);
655 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Next timeslice %" PRIi64, decongestion->nextTimeslice);
656 if (oldNextTimeslice != decongestion->nextTimeslice) {
659 O2_SIGNPOST_EVENT_EMIT_WARN(data_processor_context, cid,
"oldest_possible_timeslice",
"Stop transition requested. Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
661 O2_SIGNPOST_EVENT_EMIT_CRITICAL(data_processor_context, cid,
"oldest_possible_timeslice",
"Some Lifetime::Timeframe data got dropped starting at %" PRIi64, oldNextTimeslice);
668 for (
int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
669 auto& info = proxy.getForwardChannelInfo(
ChannelIndex{fi});
673 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Skipping channel %{public}s", info.name.c_str());
678 "Forwarding to channel %{public}s oldest possible timeslice %" PRIu64
", priority %d",
679 info.name.c_str(), (uint64_t)oldestPossibleOutput.timeslice.value, 20);
682 decongestion->lastTimeslice = oldestPossibleOutput.timeslice.value; },
686 decongestion->nextEnumerationTimeslice = 0;
687 decongestion->nextEnumerationTimesliceRewinded =
false;
688 decongestion->lastTimeslice = 0;
689 decongestion->nextTimeslice = 0;
690 decongestion->oldestPossibleTimesliceTask = {0};
692 for (
auto &channel :
state.inputChannelInfos) {
693 channel.oldestForChannel = {0};
700 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Received oldest possible timeframe %" PRIu64
" from channel %d",
701 (uint64_t)oldestPossibleTimeslice, channel.value);
702 relayer.setOldestPossibleInput({oldestPossibleTimeslice}, channel);
703 timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
704 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
706 if (oldestPossibleOutput.timeslice.value == decongestion.lastTimeslice) {
707 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Synchronous: Not sending already sent value: %" PRIu64, (uint64_t)oldestPossibleOutput.timeslice.value);
710 if (oldestPossibleOutput.timeslice.value < decongestion.lastTimeslice) {
711 LOGP(error,
"We are trying to send an oldest possible timeslice {} that is older than the last one we sent {}",
712 oldestPossibleOutput.timeslice.value, decongestion.lastTimeslice);
719 O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid,
"oldest_possible_timeslice",
"Queueing oldest possible timeslice %" PRIu64
" propagation for execution.",
720 (uint64_t)oldestPossibleOutput.timeslice.value);
723 .id = decongestion.oldestPossibleTimesliceTask,
727 if (decongestion.orderedCompletionPolicyActive) {
731 .user<
DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
743 .
name =
"threadpool",
748 return ServiceHandle{TypeIdHelpers::uniqueId<ThreadPool>(), pool};
750 .configure = [](
InitContext&,
void* service) ->
void* {
751 auto* t =
reinterpret_cast<ThreadPool*
>(service);
759 setenv(
"UV_THREADPOOL_SIZE", numWorkersS.c_str(), 0);
772 using namespace fair::mq::shmem;
773 auto& spec = registry.get<
DeviceSpec const>();
776 if (stats.hasAvailSHMMetric) {
778 long freeMemory = -1;
780 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(ShmId{makeShmIdStr(device->fConfig->GetProperty<uint64_t>(
"shmid"))}, runningWorkflow.shmSegmentId);
783 if (freeMemory == -1) {
785 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(SessionId{device->fConfig->GetProperty<std::string>(
"session")}, runningWorkflow.shmSegmentId);
792 auto device = registry.get<RawDeviceService>().device();
794 int64_t totalBytesIn = 0;
795 int64_t totalBytesOut = 0;
797 for (
auto& channel : device->GetChannels()) {
798 totalBytesIn += channel.second[0].GetBytesRx();
799 totalBytesOut += channel.second[0].GetBytesTx();
809auto flushStates(ServiceRegistryRef registry, DataProcessingStates&
states) ->
void
811 if (!registry.get<DriverConfig
const>().driverHasGUI) {
814 states.flushChangedStates([&
states, registry](std::string
const& spec, int64_t timestamp, std::string_view
value)
mutable ->
void {
815 auto& client = registry.get<ControlService>();
816 client.push(spec,
value, timestamp);
823auto flushMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) ->
void
829 if (registry.isMainThread() ==
false) {
830 LOGP(fatal,
"Flushing metrics should only happen on the main thread.");
832 auto& monitoring = registry.get<
Monitoring>();
833 auto& relayer = registry.get<DataRelayer>();
836 stats.flushChangedMetrics([&monitoring, sid](DataProcessingStats::MetricSpec
const& spec, int64_t timestamp, int64_t
value)
mutable ->
void {
838 auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(std::chrono::milliseconds(timestamp));
839 auto metric = o2::monitoring::Metric{spec.name, Metric::DefaultVerbosity, tp};
842 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid,
"flushChangedMetrics",
"Value for %{public}s is negative, setting to 0",
846 metric.addValue((uint64_t)
value,
"value");
848 if (
value > (int64_t)std::numeric_limits<int>::max()) {
849 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid,
"flushChangedMetrics",
"Value for %{public}s is too large, setting to INT_MAX",
851 value = (int64_t)std::numeric_limits<int>::max();
853 if (
value < (int64_t)std::numeric_limits<int>::min()) {
854 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid,
"flushChangedMetrics",
"Value for %{public}s is too small, setting to INT_MIN",
856 value = (int64_t)std::numeric_limits<int>::min();
858 metric.addValue((
int)
value,
"value");
861 metric.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL);
863 O2_SIGNPOST_EVENT_EMIT(monitoring_service, sid,
"flushChangedMetrics",
"Flushing metric %{public}s", spec.name.c_str());
864 monitoring.send(std::move(metric));
866 relayer.sendContextState();
867 monitoring.flushBuffer();
868 O2_SIGNPOST_END(monitoring_service, sid,
"flush",
"done flushing metrics");
875 .
name =
"data-processing-stats",
878 clock_gettime(CLOCK_REALTIME, &now);
879 uv_update_time(
state.loop);
880 uint64_t
offset = now.tv_sec * 1000 - uv_now(
state.loop);
882 .
minOnlinePublishInterval = std::stoi(options.GetProperty<std::string>(
"dpl-stats-min-online-publishing-interval").c_str()) * 1000};
889 int quickUpdateInterval = 5000;
890 uint64_t quickRefreshInterval = 7000;
891 uint64_t onlineRefreshLatency = 60000;
897 bool enableDebugMetrics =
false;
899 bool enableDebugMetrics =
true;
901 bool arrowAndResourceLimitingMetrics =
false;
903 arrowAndResourceLimitingMetrics =
true;
906 int64_t consumedTimeframesPublishInterval = 0;
908 consumedTimeframesPublishInterval = 5000;
913 bool enableCPUUsageFraction =
true;
915 if (std::find_if(spec.labels.begin(), spec.labels.end(), isProxy) != spec.labels.end()) {
917 O2_SIGNPOST_EVENT_EMIT(policies, mid,
"metrics",
"Disabling cpu_usage_fraction metric for proxy %{public}s", spec.name.c_str());
918 enableCPUUsageFraction =
false;
921 std::vector<DataProcessingStats::MetricSpec>
metrics = {
922 MetricSpec{.name =
"errors",
924 .
kind = Kind::UInt64,
925 .scope = Scope::Online,
926 .minPublishInterval = quickUpdateInterval,
927 .maxRefreshLatency = quickRefreshInterval},
928 MetricSpec{.name =
"exceptions",
930 .
kind = Kind::UInt64,
931 .scope = Scope::Online,
932 .minPublishInterval = quickUpdateInterval},
933 MetricSpec{.name =
"inputs/relayed/pending",
935 .
kind = Kind::UInt64,
936 .minPublishInterval = quickUpdateInterval},
937 MetricSpec{.name =
"inputs/relayed/incomplete",
939 .
kind = Kind::UInt64,
940 .minPublishInterval = quickUpdateInterval},
941 MetricSpec{.name =
"inputs/relayed/total",
943 .
kind = Kind::UInt64,
944 .minPublishInterval = quickUpdateInterval},
945 MetricSpec{.name =
"elapsed_time_ms",
947 .
kind = Kind::UInt64,
948 .minPublishInterval = quickUpdateInterval},
949 MetricSpec{.name =
"total_wall_time_ms",
951 .
kind = Kind::UInt64,
952 .minPublishInterval = quickUpdateInterval},
953 MetricSpec{.name =
"last_processed_input_size_byte",
955 .
kind = Kind::UInt64,
956 .minPublishInterval = quickUpdateInterval},
957 MetricSpec{.name =
"total_processed_input_size_byte",
959 .
kind = Kind::UInt64,
960 .scope = Scope::Online,
961 .minPublishInterval = quickUpdateInterval},
962 MetricSpec{.name =
"total_sigusr1",
964 .
kind = Kind::UInt64,
965 .minPublishInterval = quickUpdateInterval},
966 MetricSpec{.name =
"consumed-timeframes",
968 .
kind = Kind::UInt64,
969 .minPublishInterval = consumedTimeframesPublishInterval,
970 .maxRefreshLatency = quickRefreshInterval,
971 .sendInitialValue =
true},
972 MetricSpec{.name =
"min_input_latency_ms",
974 .
kind = Kind::UInt64,
975 .scope = Scope::Online,
976 .minPublishInterval = quickUpdateInterval},
977 MetricSpec{.name =
"max_input_latency_ms",
979 .
kind = Kind::UInt64,
980 .minPublishInterval = quickUpdateInterval},
981 MetricSpec{.name =
"total_rate_in_mb_s",
984 .scope = Scope::Online,
985 .minPublishInterval = quickUpdateInterval,
986 .maxRefreshLatency = onlineRefreshLatency,
987 .sendInitialValue =
true},
988 MetricSpec{.name =
"total_rate_out_mb_s",
991 .scope = Scope::Online,
992 .minPublishInterval = quickUpdateInterval,
993 .maxRefreshLatency = onlineRefreshLatency,
994 .sendInitialValue =
true},
995 MetricSpec{.name =
"processing_rate_hz",
998 .scope = Scope::Online,
999 .minPublishInterval = quickUpdateInterval,
1000 .maxRefreshLatency = onlineRefreshLatency,
1001 .sendInitialValue =
true},
1002 MetricSpec{.name =
"cpu_usage_fraction",
1003 .enabled = enableCPUUsageFraction,
1006 .scope = Scope::Online,
1007 .minPublishInterval = quickUpdateInterval,
1008 .maxRefreshLatency = onlineRefreshLatency,
1009 .sendInitialValue =
true},
1010 MetricSpec{.name =
"performed_computations",
1012 .
kind = Kind::UInt64,
1013 .scope = Scope::Online,
1014 .minPublishInterval = quickUpdateInterval,
1015 .maxRefreshLatency = onlineRefreshLatency,
1016 .sendInitialValue =
true},
1017 MetricSpec{.name =
"total_bytes_in",
1019 .
kind = Kind::UInt64,
1020 .scope = Scope::Online,
1021 .minPublishInterval = quickUpdateInterval,
1022 .maxRefreshLatency = onlineRefreshLatency,
1023 .sendInitialValue =
true},
1024 MetricSpec{.name =
"total_bytes_out",
1026 .
kind = Kind::UInt64,
1027 .scope = Scope::Online,
1028 .minPublishInterval = quickUpdateInterval,
1029 .maxRefreshLatency = onlineRefreshLatency,
1030 .sendInitialValue =
true},
1031 MetricSpec{.name = fmt::format(
"available_managed_shm_{}", runningWorkflow.shmSegmentId),
1033 .kind = Kind::UInt64,
1034 .scope = Scope::Online,
1035 .minPublishInterval = 500,
1036 .maxRefreshLatency = onlineRefreshLatency,
1037 .sendInitialValue =
true},
1042 MetricSpec{.name =
"arrow-bytes-destroyed",
1043 .enabled = arrowAndResourceLimitingMetrics,
1045 .
kind = Kind::UInt64,
1046 .scope = Scope::DPL,
1047 .minPublishInterval = 0,
1048 .maxRefreshLatency = 10000,
1049 .sendInitialValue =
true},
1050 MetricSpec{.name =
"arrow-messages-destroyed",
1051 .enabled = arrowAndResourceLimitingMetrics,
1053 .
kind = Kind::UInt64,
1054 .scope = Scope::DPL,
1055 .minPublishInterval = 0,
1056 .maxRefreshLatency = 10000,
1057 .sendInitialValue =
true},
1058 MetricSpec{.name =
"arrow-bytes-created",
1059 .enabled = arrowAndResourceLimitingMetrics,
1061 .
kind = Kind::UInt64,
1062 .scope = Scope::DPL,
1063 .minPublishInterval = 0,
1064 .maxRefreshLatency = 10000,
1065 .sendInitialValue =
true},
1066 MetricSpec{.name =
"arrow-messages-created",
1067 .enabled = arrowAndResourceLimitingMetrics,
1069 .
kind = Kind::UInt64,
1070 .scope = Scope::DPL,
1071 .minPublishInterval = 0,
1072 .maxRefreshLatency = 10000,
1073 .sendInitialValue =
true},
1074 MetricSpec{.name =
"arrow-bytes-expired",
1075 .enabled = arrowAndResourceLimitingMetrics,
1077 .
kind = Kind::UInt64,
1078 .scope = Scope::DPL,
1079 .minPublishInterval = 0,
1080 .maxRefreshLatency = 10000,
1081 .sendInitialValue =
true},
1082 MetricSpec{.name =
"shm-offer-bytes-consumed",
1083 .enabled = arrowAndResourceLimitingMetrics,
1085 .
kind = Kind::UInt64,
1086 .scope = Scope::DPL,
1087 .minPublishInterval = 0,
1088 .maxRefreshLatency = 10000,
1089 .sendInitialValue =
true},
1090 MetricSpec{.name =
"timeslice-offer-number-consumed",
1091 .enabled = arrowAndResourceLimitingMetrics,
1093 .
kind = Kind::UInt64,
1094 .scope = Scope::DPL,
1095 .minPublishInterval = 0,
1096 .maxRefreshLatency = 10000,
1097 .sendInitialValue =
true},
1098 MetricSpec{.name =
"timeslices-expired",
1099 .enabled = arrowAndResourceLimitingMetrics,
1101 .
kind = Kind::UInt64,
1102 .scope = Scope::DPL,
1103 .minPublishInterval = 0,
1104 .maxRefreshLatency = 10000,
1105 .sendInitialValue =
true},
1106 MetricSpec{.name =
"timeslices-started",
1107 .enabled = arrowAndResourceLimitingMetrics,
1109 .
kind = Kind::UInt64,
1110 .scope = Scope::DPL,
1111 .minPublishInterval = 0,
1112 .maxRefreshLatency = 10000,
1113 .sendInitialValue =
true},
1114 MetricSpec{.name =
"timeslices-done",
1115 .enabled = arrowAndResourceLimitingMetrics,
1117 .
kind = Kind::UInt64,
1118 .scope = Scope::DPL,
1119 .minPublishInterval = 0,
1120 .maxRefreshLatency = 10000,
1121 .sendInitialValue =
true},
1122 MetricSpec{.name =
"resources-missing",
1123 .enabled = enableDebugMetrics,
1125 .
kind = Kind::UInt64,
1126 .scope = Scope::DPL,
1127 .minPublishInterval = 1000,
1128 .maxRefreshLatency = 1000,
1129 .sendInitialValue =
true},
1130 MetricSpec{.name =
"resources-insufficient",
1131 .enabled = enableDebugMetrics,
1133 .
kind = Kind::UInt64,
1134 .scope = Scope::DPL,
1135 .minPublishInterval = 1000,
1136 .maxRefreshLatency = 1000,
1137 .sendInitialValue =
true},
1138 MetricSpec{.name =
"resources-satisfactory",
1139 .enabled = enableDebugMetrics,
1141 .
kind = Kind::UInt64,
1142 .scope = Scope::DPL,
1143 .minPublishInterval = 1000,
1144 .maxRefreshLatency = 1000,
1145 .sendInitialValue =
true},
1146 MetricSpec{.name =
"resource-offer-expired",
1147 .enabled = arrowAndResourceLimitingMetrics,
1149 .
kind = Kind::UInt64,
1150 .scope = Scope::DPL,
1151 .minPublishInterval = 0,
1152 .maxRefreshLatency = 10000,
1153 .sendInitialValue =
true}};
1155 for (
auto& metric :
metrics) {
1157 if (spec.name.compare(
"readout-proxy") == 0) {
1158 stats->hasAvailSHMMetric =
true;
1163 stats->registerMetric(metric);
1166 return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStats>(), stats};
1174 sendRelayerMetrics(context.
services(), *stats);
1175 flushMetrics(context.
services(), *stats); },
1178 sendRelayerMetrics(context.
services(), *stats);
1179 flushMetrics(context.
services(), *stats); },
1182 sendRelayerMetrics(context.
services(), *stats);
1183 flushMetrics(context.
services(), *stats); },
1186 flushMetrics(
ref, *stats); },
1195 .
name =
"data-processing-states",
1198 clock_gettime(CLOCK_REALTIME, &now);
1199 uv_update_time(
state.loop);
1200 uint64_t
offset = now.tv_sec * 1000 - uv_now(
state.loop);
1209 states->processCommandQueue(); },
1228 .
name =
"gui-metrics",
1233 monitoring.send({(
int)spec.inputChannels.size(), fmt::format(
"oldest_possible_timeslice/h"), o2::monitoring::Verbosity::Debug});
1234 monitoring.send({(
int)1, fmt::format(
"oldest_possible_timeslice/w"), o2::monitoring::Verbosity::Debug});
1235 monitoring.send({(
int)spec.outputChannels.size(), fmt::format(
"oldest_possible_output/h"), o2::monitoring::Verbosity::Debug});
1236 monitoring.send({(
int)1, fmt::format(
"oldest_possible_output/w"), o2::monitoring::Verbosity::Debug});
1237 return ServiceHandle{TypeIdHelpers::uniqueId<GUIMetrics>(), stats};
1244 auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
// Start from the first output channel: the index must be initialised, since
// reading an indeterminate value is undefined behaviour.
1245 for (
size_t ci = 0; ci < spec.outputChannels.size(); ++ci) {
1246 monitoring.send({(uint64_t)oldestPossibleOutput.timeslice.value, fmt::format(
"oldest_possible_output/{}", ci), o2::monitoring::Verbosity::Debug});
1250 monitoring.send({(uint64_t)timeslice, fmt::format(
"oldest_possible_timeslice/{}", channel.value), o2::monitoring::Verbosity::Debug}); },
1258 .
name =
"object-cache",
1261 return ServiceHandle{TypeIdHelpers::uniqueId<ObjectCache>(), cache};
1270 .
name =
"data-processing-context",
1282 .
name =
"data-allocator",
1283 .uniqueId = simpleServiceId<DataAllocator>(),
1286 .
hash = TypeIdHelpers::uniqueId<DataAllocator>(),
1289 .name =
"data-allocator",
1299 std::vector<ServiceSpec> specs{
1329 std::string loadableServicesStr = extraPlugins;
1332 if (loadableServicesStr.empty() ==
false) {
1333 loadableServicesStr +=
",";
1335 loadableServicesStr +=
"O2FrameworkDataTakingSupport:InfoLoggerContext,O2FrameworkDataTakingSupport:InfoLogger";
1338 std::vector<LoadablePlugin> loadablePlugins = {};
1339 char* loadableServicesEnv = getenv(
"DPL_LOAD_SERVICES");
1343 if (loadableServicesEnv) {
1344 if (loadableServicesStr.empty() ==
false) {
1345 loadableServicesStr +=
",";
1347 loadableServicesStr += loadableServicesEnv;
1350 PluginManager::loadFromPlugin<ServiceSpec, ServicePlugin>(loadablePlugins, specs);
std::vector< std::string > labels
std::vector< OutputRoute > routes
o2::monitoring::tags::Key Key
o2::monitoring::Metric Metric
o2::monitoring::tags::Value Value
#define MONITORING_QUEUE_SIZE
#define O2_BUILTIN_LIKELY(x)
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT_CRITICAL(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
o2::monitoring::Monitoring Monitoring
decltype(auto) make(const Output &spec, Args... args)
int countDeviceOutputs(bool excludeDPLOrigin=false)
Allow injecting policies on send.
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
A text based way of communicating with the driver.
OldestOutputInfo getOldestPossibleOutput() const
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
GLsizei const GLfloat * value
GLuint GLsizei const GLchar * label
Defining PrimaryVertex explicitly as messageable.
@ DPL
The channel is a normal input channel.
const DataProcessorLabel suppressDomainInfoLabel
std::pair< std::string, unsigned short > parse_websocket_url(char const *url)
auto decongestionCallback
@ NoTransition
No pending transitions.
@ AVAILABLE_MANAGED_SHM_BASE
@ SHM_OFFER_BYTES_CONSUMED
@ TIMESLICE_NUMBER_STARTED
@ TIMESLICE_NUMBER_EXPIRED
@ DROPPED_INCOMING_MESSAGES
@ ARROW_MESSAGES_DESTROYED
@ TIMESLICE_OFFER_NUMBER_CONSUMED
auto decongestionCallbackOrdered
std::string to_string(gsl::span< T, Size > span)
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static AsyncTaskId create(AsyncQueue &queue, AsyncTaskSpec spec)
static void post(AsyncQueue &queue, AsyncTask const &task)
static void reset(AsyncQueue &queue)
Reset the queue to its initial state.
An actual task to be executed.
static ServiceSpec fairMQDeviceProxy()
static ServiceSpec fairMQBackendSpec()
static ServiceSpec stringBackendSpec()
static ServiceSpec dataRelayer()
static ServiceSpec callbacksSpec()
static ServiceSpec monitoringSpec()
static ServiceSpec dataSender()
static ServiceSpec timesliceIndex()
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static ServiceSpec timingInfoSpec()
static ServiceConfigureCallback noConfiguration()
static ServiceSpec asyncQueue()
static ServiceSpec decongestionSpec()
static ServiceSpec dataProcessorContextSpec()
static ServiceSpec dataProcessingStats()
static std::vector< ServiceSpec > arrowServices()
static ServiceSpec rootFileSpec()
static ServiceSpec objectCache()
static ServiceSpec controlSpec()
static ServiceSpec configurationSpec()
static ServiceSpec ccdbSupportSpec()
static ServiceSpec datatakingContextSpec()
static ServiceSpec guiMetricsSpec()
static ServiceSpec dataProcessingStates()
static ServiceSpec tracingSpec()
static ServiceSpec dataAllocatorSpec()
static ServiceSpec driverClientSpec()
static ServiceSpec streamContextSpec()
static ServiceSpec threadPool(int numWorkers)
static ServiceSpec parallelSpec()
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
int64_t minOnlinePublishInterval
Helper struct to hold statistics about the data processing happening.
@ SetIfPositive
Set the value to the specified value.
@ InstantaneousRate
Update the rate of the metric given the cumulative value since last time it got published.
@ Add
Update the rate of the metric given the amount since the last time.
A label that can be associated to a DataProcessorSpec.
static std::string describe(InputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
DeploymentMode deploymentMode
Where we think this is running.
TimesliceIndex::OldestOutputInfo oldestPossibleOutput
static DeploymentMode deploymentMode()
static bool onlineDeploymentMode()
@return true if running online
std::string name
The name of the associated DataProcessorSpec.
size_t inputTimesliceId
The time pipelining id of this particular device.
Running state information of a given device.
header::DataOrigin origin
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
Information about the running workflow.
ServiceKind kind
Kind of service.
unsigned int hash
Unique hash associated to the type of service.
std::string name
Name of the service.
ServicePostDispatching postDispatching
ServiceKind kind
Kind of service being specified.
static std::function< int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator(uv_loop_t *loop)
static std::function< void(int64_t &base, int64_t &offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t *loop)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"