34#if defined(__APPLE__) || defined(NDEBUG)
35#define O2_SIGNPOST_IMPLEMENTATION
59#include <fairmq/Parts.h>
60#include <fairmq/Socket.h>
61#include <fairmq/ProgOptions.h>
62#if __has_include(<fairmq/shmem/Message.h>)
63#include <fairmq/shmem/Message.h>
65#include <Configuration/ConfigurationInterface.h>
66#include <Configuration/ConfigurationFactory.h>
67#include <Monitoring/Monitoring.h>
69#include <TClonesArray.h>
71#include <fmt/ostream.h>
79#include <boost/property_tree/json_parser.hpp>
130 return std::all_of(spec.
inputs.cbegin(), spec.
inputs.cend(), [](
InputRoute const& route) ->
bool { return route.matcher.lifetime == Lifetime::Timer; });
135 return (spec.
inputChannels.size() == 1) && (spec.
inputs[0].matcher.lifetime == Lifetime::Timer || spec.
inputs[0].matcher.lifetime == Lifetime::Enumeration);
146 std::string messageOnExpire =
hasOnlyGenerated(spec) ?
"DPL exit transition grace period for source expired. Exiting." : fmt::format(
"DPL exit transition grace period for {} expired. Exiting.",
state.allowedProcessing ==
DeviceState::CalibrationOnly ?
"calibration" :
"data & calibration").c_str();
161 O2_SIGNPOST_START(device, dpid,
"state",
"Starting processing state %d", (
int)newState);
162 state.streaming = newState;
177 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid,
"callback",
"Grace period for data processing expired. Switching to EndOfStreaming.");
180 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid,
"callback",
"Grace period for data processing expired. Only calibrations from this point onwards.");
194 return devices[running.
index];
204 : mRunningDevice{running},
205 mConfigRegistry{nullptr},
206 mServiceRegistry{registry}
// Register a std::string property callback with "dpl" as the first Subscribe
// argument; the service registry is captured by reference as `registry`.
208 GetConfig()->Subscribe<std::string>(
"dpl", [&registry = mServiceRegistry](
const std::string&
key, std::string
value) {
209 if (
key ==
"cleanup") {
213 int64_t newCleanupCount = std::stoll(
value);
214 if (newCleanupCount <= cleanupCount) {
217 deviceState.cleanupCount.store(newCleanupCount);
218 for (
auto& info : deviceState.inputChannelInfos) {
219 fair::mq::Parts parts;
220 while (info.channel->Receive(parts, 0)) {
221 LOGP(
debug,
"Dropping {} parts", parts.Size());
222 if (parts.Size() == 0) {
// Callback taking a fair::mq::State; it captures `this` and, by reference, the
// service registry under the name `registry`.
230 std::function<
void(
const fair::mq::State)> stateWatcher = [
this, &registry = mServiceRegistry](
const fair::mq::State
state) ->
void {
235 control.notifyDeviceState(fair::mq::GetStateName(
state));
238 if (deviceState.nextFairMQState.empty() ==
false) {
239 auto state = deviceState.nextFairMQState.back();
241 deviceState.nextFairMQState.pop_back();
246 this->SubscribeToStateChange(
"99-dpl", stateWatcher);
258 mAwakeHandle->data = &
state;
260 LOG(
error) <<
"Unable to initialise subscription";
264 SubscribeToNewTransition(
"dpl", [wakeHandle = mAwakeHandle](fair::mq::Transition t) {
265 int res = uv_async_send(wakeHandle);
267 LOG(
error) <<
"Unable to notify subscription";
269 LOG(
debug) <<
"State transition requested";
283 O2_SIGNPOST_START(device, sid,
"run_callback",
"Starting run callback on stream %d", task->id.index);
286 O2_SIGNPOST_END(device, sid,
"run_callback",
"Done processing data for stream %d", task->id.index);
299 using o2::monitoring::Metric;
300 using o2::monitoring::Monitoring;
301 using o2::monitoring::tags::Key;
302 using o2::monitoring::tags::Value;
306 stats.totalConsumedBytes += accumulatedConsumed.
sharedMemory;
307 stats.totalConsumedTimeslices += accumulatedConsumed.
timeslices;
311 dpStats.processCommandQueue();
321 dpStats.processCommandQueue();
324 for (
auto& consumer :
state.offerConsumers) {
325 quotaEvaluator.consume(task->id.index, consumer, reportConsumedOffer);
327 state.offerConsumers.clear();
328 quotaEvaluator.handleExpired(reportExpiredOffer);
329 quotaEvaluator.dispose(task->id.index);
330 task->running =
false;
358 O2_SIGNPOST_EVENT_EMIT(sockets, sid,
"socket_state",
"Data pending on socket for channel %{public}s", context->name);
362 O2_SIGNPOST_END(sockets, sid,
"socket_state",
"Socket connected for channel %{public}s", context->name);
364 O2_SIGNPOST_START(sockets, sid,
"socket_state",
"Socket connected for read in context %{public}s", context->name);
365 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &
on_socket_polled);
368 O2_SIGNPOST_START(sockets, sid,
"socket_state",
"Socket connected for write for channel %{public}s", context->name);
376 case UV_DISCONNECT: {
377 O2_SIGNPOST_END(sockets, sid,
"socket_state",
"Socket disconnected in context %{public}s", context->name);
379 case UV_PRIORITIZED: {
380 O2_SIGNPOST_EVENT_EMIT(sockets, sid,
"socket_state",
"Socket prioritized for context %{public}s", context->name);
392 LOGP(fatal,
"Error while polling {}: {}", context->name, status);
397 O2_SIGNPOST_EVENT_EMIT(sockets, sid,
"socket_state",
"Data pending on socket for channel %{public}s", context->name);
399 assert(context->channelInfo);
400 context->channelInfo->readPolled =
true;
403 O2_SIGNPOST_END(sockets, sid,
"socket_state",
"OOB socket connected for channel %{public}s", context->name);
405 O2_SIGNPOST_START(sockets, sid,
"socket_state",
"OOB socket connected for read in context %{public}s", context->name);
408 O2_SIGNPOST_START(sockets, sid,
"socket_state",
"OOB socket connected for write for channel %{public}s", context->name);
412 case UV_DISCONNECT: {
413 O2_SIGNPOST_END(sockets, sid,
"socket_state",
"OOB socket disconnected in context %{public}s", context->name);
416 case UV_PRIORITIZED: {
417 O2_SIGNPOST_EVENT_EMIT(sockets, sid,
"socket_state",
"OOB socket prioritized for context %{public}s", context->name);
438 context.statelessProcess = spec.algorithm.onProcess;
440 context.error = spec.algorithm.onError;
441 context.
initError = spec.algorithm.onInitError;
444 if (configStore ==
nullptr) {
445 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
446 retrievers.emplace_back(std::make_unique<FairOptionsRetriever>(GetConfig()));
447 configStore = std::make_unique<ConfigParamStore>(spec.options, std::move(retrievers));
448 configStore->preload();
449 configStore->activate();
452 using boost::property_tree::ptree;
455 for (
auto&
entry : configStore->store()) {
456 std::stringstream ss;
458 if (
entry.second.empty() ==
false) {
459 boost::property_tree::json_parser::write_json(ss,
entry.second,
false);
463 str =
entry.second.get_value<std::string>();
465 std::string configString = fmt::format(
"[CONFIG] {}={} 1 {}",
entry.first,
str, configStore->provenance(
entry.first.c_str())).c_str();
469 mConfigRegistry = std::make_unique<ConfigParamRegistry>(std::move(configStore));
472 if (context.initError) {
473 context.initErrorHandling = [&errorCallback = context.initError,
486 errorCallback(errorContext);
489 context.initErrorHandling = [&serviceRegistry = mServiceRegistry](
RuntimeErrorRef e) {
504 context.expirationHandlers.clear();
505 context.init = spec.algorithm.onInit;
507 static bool noCatch = getenv(
"O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv(
"O2_NO_CATCHALL_EXCEPTIONS"),
"0");
508 InitContext initContext{*mConfigRegistry, mServiceRegistry};
512 context.statefulProcess = context.init(initContext);
514 if (context.initErrorHandling) {
515 (context.initErrorHandling)(e);
520 context.statefulProcess = context.init(initContext);
521 }
catch (std::exception& ex) {
526 (context.initErrorHandling)(e);
528 (context.initErrorHandling)(e);
533 state.inputChannelInfos.resize(spec.inputChannels.size());
537 int validChannelId = 0;
538 for (
size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
539 auto&
name = spec.inputChannels[ci].name;
540 if (
name.find(spec.channelPrefix +
"from_internal-dpl-clock") == 0) {
545 state.inputChannelInfos[ci].id = {validChannelId++};
550 if (spec.callbacksPolicy.policy !=
nullptr) {
551 InitContext initContext{*mConfigRegistry, mServiceRegistry};
556 auto* options = GetConfig();
557 for (
size_t si = 0; si < mStreams.size(); ++si) {
571 O2_SIGNPOST_END(device, sid,
"signal_state",
"No registry active. Ignoring signal.");
580 while (ri != quotaEvaluator.mOffers.size()) {
581 auto& offer = quotaEvaluator.mOffers[ri];
587 if (offer.valid && offer.sharedMemory != 0) {
588 O2_SIGNPOST_END(device, sid,
"signal_state",
"Memory already offered.");
594 for (
auto& offer : quotaEvaluator.mOffers) {
595 if (offer.valid ==
false) {
598 offer.sharedMemory = 1000000000;
605 O2_SIGNPOST_END(device, sid,
"signal_state",
"Done processing signals.");
608static auto toBeForwardedHeader = [](
void* header) ->
bool {
613 if (header ==
nullptr) {
616 auto sih = o2::header::get<SourceInfoHeader*>(header);
621 auto dih = o2::header::get<DomainInfoHeader*>(header);
626 auto dh = o2::header::get<DataHeader*>(header);
630 auto dph = o2::header::get<DataProcessingHeader*>(header);
637static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
639 std::unique_ptr<fair::mq::Message>& header,
640 std::unique_ptr<fair::mq::Message>& payload,
643 if (header.get() ==
nullptr) {
650 if (payload.get() ==
nullptr && consume ==
true) {
654 header.reset(
nullptr);
658 auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
659 if (fdph ==
nullptr) {
660 LOG(error) <<
"Data is missing DataProcessingHeader";
663 auto fdh = o2::header::get<DataHeader*>(header->GetData());
664 if (fdh ==
nullptr) {
665 LOG(error) <<
"Data is missing DataHeader";
672 if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
673 proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
675 return cachedForwardingChoices.empty() ==
false;
689 if (oldestTimeslice.timeslice.value <= decongestion.lastTimeslice) {
690 LOG(
debug) <<
"Not sending already sent oldest possible timeslice " << oldestTimeslice.timeslice.value;
693 for (
int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
694 auto& info = proxy.getForwardChannelInfo(
ChannelIndex{fi});
699 O2_SIGNPOST_EVENT_EMIT(async_queue, aid,
"forwardInputsCallback",
"Skipping channel %{public}s because it's not a DPL channel",
705 O2_SIGNPOST_EVENT_EMIT(async_queue, aid,
"forwardInputsCallback",
"Forwarding to channel %{public}s oldest possible timeslice %zu, prio 20",
706 info.name.c_str(), oldestTimeslice.timeslice.value);
719 std::vector<fair::mq::Parts> forwardedParts;
720 forwardedParts.resize(proxy.getNumForwards());
721 std::vector<ChannelIndex> cachedForwardingChoices{};
723 O2_SIGNPOST_START(forwarding, sid,
"forwardInputs",
"Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
724 slot.index, oldestTimeslice.timeslice.value, copy ?
"with copy" :
"", copy && consume ?
" and " :
"", consume ?
"with consume" :
"");
726 for (
size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
727 auto& messageSet = currentSetOfInputs[ii];
729 if (messageSet.size() == 0) {
732 if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
735 cachedForwardingChoices.clear();
737 for (
size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
738 auto& messageSet = currentSetOfInputs[ii];
739 auto& header = messageSet.header(pi);
740 auto& payload = messageSet.payload(pi);
741 auto total = messageSet.getNumberOfPayloads(pi);
743 if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
749 if (cachedForwardingChoices.size() > 1) {
752 auto* dh = o2::header::get<DataHeader*>(header->GetData());
753 auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
756 for (
auto& cachedForwardingChoice : cachedForwardingChoices) {
757 auto&& newHeader = header->GetTransport()->CreateMessage();
759 fmt::format(
"{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
760 newHeader->Copy(*header);
761 forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
763 for (
size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
764 auto&& newPayload = header->GetTransport()->CreateMessage();
765 newPayload->Copy(*messageSet.payload(pi, payloadIndex));
766 forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
771 fmt::format(
"{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
772 forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
773 for (
size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
774 forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
779 O2_SIGNPOST_EVENT_EMIT(forwarding, sid,
"forwardInputs",
"Forwarding %zu messages", forwardedParts.size());
780 for (
int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
781 if (forwardedParts[fi].
Size() == 0) {
785 auto& parts = forwardedParts[fi];
786 if (info.
policy ==
nullptr) {
797 O2_SIGNPOST_EVENT_EMIT(async_queue, aid,
"forwardInputs",
"Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
809 if (infos.empty() ==
false) {
810 std::vector<fair::mq::RegionInfo> toBeNotified;
811 toBeNotified.swap(infos);
812 static bool dummyRead = getenv(
"DPL_DEBUG_MAP_ALL_SHM_REGIONS") && atoi(getenv(
"DPL_DEBUG_MAP_ALL_SHM_REGIONS"));
813 for (
auto const& info : toBeNotified) {
833void DataProcessingDevice::initPollers()
841 if ((context.statefulProcess !=
nullptr) || (context.statelessProcess !=
nullptr)) {
842 for (
auto& [channelName, channel] : GetChannels()) {
844 for (
size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
845 auto& channelSpec = spec.inputChannels[ci];
846 channelInfo = &
state.inputChannelInfos[ci];
847 if (channelSpec.name != channelName) {
850 channelInfo->
channel = &this->GetChannel(channelName, 0);
853 if ((
channelName.rfind(
"from_internal-dpl", 0) == 0) &&
854 (
channelName.rfind(
"from_internal-dpl-aod", 0) != 0) &&
855 (
channelName.rfind(
"from_internal-dpl-ccdb-backend", 0) != 0) &&
856 (
channelName.rfind(
"from_internal-dpl-injected", 0)) != 0) {
857 LOGP(detail,
"{} is an internal channel. Skipping as no input will come from there.", channelName);
861 if (
channelName.rfind(
"from_" + spec.name +
"_", 0) == 0) {
862 LOGP(detail,
"{} is to send data. Not polling.", channelName);
867 LOGP(detail,
"{} is not a DPL socket. Not polling.", channelName);
873 size_t zmq_fd_len =
sizeof(zmq_fd);
876 channel[0].GetSocket().GetOption(
"fd", &zmq_fd, &zmq_fd_len);
881 LOGP(detail,
"Polling socket for {}", channelName);
884 pCtx->loop =
state.loop;
886 pCtx->state = &
state;
888 assert(channelInfo !=
nullptr);
889 pCtx->channelInfo = channelInfo;
890 pCtx->socket = &channel[0].GetSocket();
893 uv_poll_init(
state.loop, poller, zmq_fd);
895 LOGP(detail,
"{} is an out of band channel.", channelName);
896 state.activeOutOfBandPollers.push_back(poller);
899 state.activeInputPollers.push_back(poller);
905 if (
state.activeInputPollers.empty() &&
906 state.activeOutOfBandPollers.empty() &&
907 state.activeTimers.empty() &&
908 state.activeSignals.empty()) {
912 if (
state.inputChannelInfos.empty()) {
913 LOGP(detail,
"No input channels. Setting exit transition timeout to 0.");
914 deviceContext.exitTransitionTimeout = 0;
916 for (
auto& [channelName, channel] : GetChannels()) {
917 if (
channelName.rfind(spec.channelPrefix +
"from_internal-dpl", 0) == 0) {
918 LOGP(detail,
"{} is an internal channel. Not polling.", channelName);
921 if (
channelName.rfind(spec.channelPrefix +
"from_" + spec.name +
"_", 0) == 0) {
922 LOGP(detail,
"{} is an out of band channel. Not polling for output.", channelName);
927 size_t zmq_fd_len =
sizeof(zmq_fd);
930 channel[0].GetSocket().GetOption(
"fd", &zmq_fd, &zmq_fd_len);
932 LOGP(
error,
"Cannot get file descriptor for channel {}", channelName);
935 LOG(detail) <<
"Polling socket for " << channel[0].GetName();
939 pCtx->loop =
state.loop;
941 pCtx->state = &
state;
945 uv_poll_init(
state.loop, poller, zmq_fd);
946 state.activeOutputPollers.push_back(poller);
950 LOGP(detail,
"This is a fake device so we exit after the first iteration.");
951 deviceContext.exitTransitionTimeout = 0;
957 uv_timer_init(
state.loop, timer);
958 timer->data = &
state;
959 uv_update_time(
state.loop);
961 state.activeTimers.push_back(timer);
965void DataProcessingDevice::startPollers()
971 for (
auto* poller :
state.activeInputPollers) {
973 O2_SIGNPOST_START(device, sid,
"socket_state",
"Input socket waiting for connection.");
977 for (
auto& poller :
state.activeOutOfBandPollers) {
981 for (
auto* poller :
state.activeOutputPollers) {
983 O2_SIGNPOST_START(device, sid,
"socket_state",
"Output socket waiting for connection.");
990 uv_timer_init(
state.loop, deviceContext.gracePeriodTimer);
993 deviceContext.dataProcessingGracePeriodTimer->data =
new ServiceRegistryRef(mServiceRegistry);
994 uv_timer_init(
state.loop, deviceContext.dataProcessingGracePeriodTimer);
// Stops the input, out-of-band and output pollers, then stops and frees the two
// grace-period timers and resets their pointers to nullptr.
997void DataProcessingDevice::stopPollers()
1002 LOGP(detail,
"Stopping {} input pollers",
state.activeInputPollers.size());
1003 for (
auto* poller :
state.activeInputPollers) {
// These are the input pollers, so the end-of-interval message names the input
// socket (startPollers reports "Input socket waiting for connection." for them).
1005 O2_SIGNPOST_END(device, sid,
"socket_state",
"Input socket closed.");
1006 uv_poll_stop(poller);
1009 LOGP(detail,
"Stopping {} out of band pollers",
state.activeOutOfBandPollers.size());
1010 for (
auto* poller :
state.activeOutOfBandPollers) {
1011 uv_poll_stop(poller);
// Report the size of activeOutputPollers, the container iterated right below.
1014 LOGP(detail,
"Stopping {} output pollers",
state.activeOutputPollers.size());
1015 for (
auto* poller :
state.activeOutputPollers) {
1017 O2_SIGNPOST_END(device, sid,
"socket_state",
"Output socket closed.");
1018 uv_poll_stop(poller);
// Timers are stopped before their storage is released with free().
1022 uv_timer_stop(deviceContext.gracePeriodTimer);
1024 free(deviceContext.gracePeriodTimer);
1025 deviceContext.gracePeriodTimer =
nullptr;
1027 uv_timer_stop(deviceContext.dataProcessingGracePeriodTimer);
1029 free(deviceContext.dataProcessingGracePeriodTimer);
1030 deviceContext.dataProcessingGracePeriodTimer =
nullptr;
1045 for (
auto&
di : distinct) {
1046 auto& route = spec.inputs[
di];
1047 if (route.configurator.has_value() ==
false) {
1052 .
name = route.configurator->name,
1054 .lifetime = route.matcher.lifetime,
1055 .creator = route.configurator->creatorConfigurator(
state, mServiceRegistry, *mConfigRegistry),
1056 .checker = route.configurator->danglingConfigurator(
state, *mConfigRegistry),
1057 .handler = route.configurator->expirationConfigurator(
state, *mConfigRegistry)};
1058 context.expirationHandlers.emplace_back(std::move(handler));
1061 if (
state.awakeMainThread ==
nullptr) {
1067 deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue<std::string>(
"expected-region-callbacks"));
1068 deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue<std::string>(
"exit-transition-timeout"));
1069 deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue<std::string>(
"data-processing-timeout"));
1071 for (
auto& channel : GetChannels()) {
// Region-event callback: captures the device context, service registry, pending
// region list and its mutex by reference under local names.
1072 channel.second.at(0).Transport()->SubscribeToRegionEvents([&context = deviceContext,
1073 &registry = mServiceRegistry,
1074 &pendingRegionInfos = mPendingRegionInfos,
1075 &regionInfoMutex = mRegionInfoMutex](fair::mq::RegionInfo info) {
1076 std::lock_guard<std::mutex> lock(regionInfoMutex);
1077 LOG(detail) <<
">>> Region info event" << info.event;
1078 LOG(detail) <<
"id: " << info.id;
1079 LOG(detail) <<
"ptr: " << info.ptr;
1080 LOG(detail) <<
"size: " << info.size;
1081 LOG(detail) <<
"flags: " << info.flags;
1084 pendingRegionInfos.push_back(info);
1097 if (deviceContext.sigusr1Handle ==
nullptr) {
1099 deviceContext.sigusr1Handle->data = &mServiceRegistry;
1100 uv_signal_init(
state.loop, deviceContext.sigusr1Handle);
1104 for (
auto& handle :
state.activeSignals) {
1105 handle->data = &
state;
1108 deviceContext.sigusr1Handle->data = &mServiceRegistry;
1111 DataProcessingDevice::initPollers();
1119 LOG(
error) <<
"DataProcessor " <<
state.lastActiveDataProcessor.load()->spec->name <<
" was unexpectedly active";
1131 O2_SIGNPOST_END(device, cid,
"InitTask",
"Exiting InitTask callback waiting for the remaining region callbacks.");
1133 auto hasPendingEvents = [&mutex = mRegionInfoMutex, &pendingRegionInfos = mPendingRegionInfos](
DeviceContext& deviceContext) {
1134 std::lock_guard<std::mutex> lock(mutex);
1135 return (pendingRegionInfos.empty() ==
false) || deviceContext.expectedRegionCallbacks > 0;
1142 while (hasPendingEvents(deviceContext)) {
1144 uv_run(
state.loop, UV_RUN_ONCE);
1148 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1152 O2_SIGNPOST_END(device, cid,
"InitTask",
"Done waiting for registration events.");
1159 bool enableRateLimiting = std::stoi(fConfig->GetValue<std::string>(
"timeframes-rate-limit"));
1168 if (enableRateLimiting ==
false && spec.name.find(
"internal-dpl-injected-dummy-sink") != std::string::npos) {
1171 if (enableRateLimiting) {
1172 for (
auto& spec : spec.outputs) {
1173 if (spec.matcher.binding.value ==
"dpl-summary") {
1180 context.
registry = &mServiceRegistry;
1183 if (context.
error !=
nullptr) {
1197 errorCallback(errorContext);
1211 switch (deviceContext.processingPolicies.
error) {
1222 auto decideEarlyForward = [&context, &deviceContext, &spec,
this]() ->
bool {
1226 bool onlyConditions =
true;
1227 bool overriddenEarlyForward =
false;
1228 for (
auto& forwarded : spec.forwards) {
1229 if (forwarded.matcher.lifetime != Lifetime::Condition) {
1230 onlyConditions =
false;
1232#if !__has_include(<fairmq/shmem/Message.h>)
1235 overriddenEarlyForward =
true;
1242 overriddenEarlyForward =
true;
1246 if (forwarded.matcher.lifetime == Lifetime::Optional) {
1248 overriddenEarlyForward =
true;
1253 if (!overriddenEarlyForward && onlyConditions) {
1255 LOG(detail) <<
"Enabling early forwarding because only conditions to be forwarded";
1257 return canForwardEarly;
1269 state.quitRequested =
false;
1272 for (
auto& info :
state.inputChannelInfos) {
1284 for (
size_t i = 0;
i < mStreams.size(); ++
i) {
1287 context.preStartStreamCallbacks(streamRef);
1289 }
catch (std::exception& e) {
1290 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid,
"PreRun",
"Exception of type std::exception caught in PreRun: %{public}s. Rethrowing.", e.what());
1291 O2_SIGNPOST_END(device, cid,
"PreRun",
"Exiting PreRun due to exception thrown.");
1295 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid,
"PreRun",
"Exception of type o2::framework::RuntimeErrorRef caught in PreRun: %{public}s. Rethrowing.", err.what);
1296 O2_SIGNPOST_END(device, cid,
"PreRun",
"Exiting PreRun due to exception thrown.");
1299 O2_SIGNPOST_END(device, cid,
"PreRun",
"Unknown exception being thrown. Rethrowing.");
1307 using o2::monitoring::Metric;
1308 using o2::monitoring::Monitoring;
1309 using o2::monitoring::tags::Key;
1310 using o2::monitoring::tags::Value;
1313 monitoring.send(
Metric{(uint64_t)1,
"device_state"}.addTag(Key::Subsystem, Value::DPL));
1321 using o2::monitoring::Metric;
1322 using o2::monitoring::Monitoring;
1323 using o2::monitoring::tags::Key;
1324 using o2::monitoring::tags::Value;
1327 monitoring.send(
Metric{(uint64_t)0,
"device_state"}.addTag(Key::Subsystem, Value::DPL));
1346 return state.transitionHandling;
1357 if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
1358 uv_update_time(
state.loop);
1359 O2_SIGNPOST_EVENT_EMIT(calibration, lid,
"timer_setup",
"Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
1360 uv_timer_start(deviceContext.dataProcessingGracePeriodTimer,
on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
1364 uv_update_time(
state.loop);
1365 O2_SIGNPOST_EVENT_EMIT(calibration, lid,
"timer_setup",
"Starting %d s timer for exitTransitionTimeout.",
1366 deviceContext.exitTransitionTimeout);
1369 int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
1374 "New state requested. Waiting for %d seconds before %{public}s",
1376 onlyGenerated ?
"dropping remaining input and switching to READY state." :
"switching to READY state.");
1381 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid,
"run_loop",
"New state requested. No timeout set, quitting immediately as per --completion-policy");
1383 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid,
"run_loop",
"New state requested. No timeout set, switching to READY state immediately");
1385 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid,
"run_loop",
"New state pending and we are already idle, quitting immediately as per --completion-policy");
1387 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid,
"run_loop",
"New state pending and we are already idle, switching to READY immediately.");
1398 bool firstLoop =
true;
1400 O2_SIGNPOST_START(device, lid,
"device_state",
"First iteration of the device loop");
1402 bool dplEnableMultithreding = getenv(
"DPL_THREADPOOL_SIZE") !=
nullptr;
1403 if (dplEnableMultithreding) {
1404 setenv(
"UV_THREADPOOL_SIZE",
"1", 1);
1408 if (
state.nextFairMQState.empty() ==
false) {
1409 (
void)this->ChangeState(
state.nextFairMQState.back());
1410 state.nextFairMQState.pop_back();
1415 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1428 state.lastActiveDataProcessor.compare_exchange_strong(lastActive,
nullptr);
1430 auto shouldNotWait = (lastActive !=
nullptr &&
1434 shouldNotWait =
true;
1437 if (lastActive !=
nullptr) {
1440 if (NewStatePending()) {
1442 shouldNotWait =
true;
1448 O2_SIGNPOST_EVENT_EMIT(device, lid,
"run_loop",
"State transition requested and we are now in Idle. We can consider it to be completed.");
1451 if (
state.severityStack.empty() ==
false) {
1452 fair::Logger::SetConsoleSeverity((fair::Severity)
state.severityStack.back());
1453 state.severityStack.pop_back();
1459 state.firedTimers.clear();
1461 state.severityStack.push_back((
int)fair::Logger::GetConsoleSeverity());
1462 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1469 O2_SIGNPOST_START(device, lid,
"run_loop",
"Dropping message from slot %" PRIu64
". Forwarding as needed.", (uint64_t)slot.index);
1477 forwardInputs(registry, slot, dropped, oldestOutputInfo,
false,
true);
1482 auto oldestPossibleTimeslice = relayer.getOldestPossibleOutput();
1484 if (shouldNotWait ==
false) {
// Closes the "run_loop" signpost; the string placeholder uses the same
// %{public}s annotation as the other signposts in this file.
1488 O2_SIGNPOST_END(device, lid,
"run_loop",
"Run loop completed. %{public}s", shouldNotWait ?
"Will immediately schedule a new one" :
"Waiting for next event.");
1489 uv_run(
state.loop, shouldNotWait ? UV_RUN_NOWAIT : UV_RUN_ONCE);
1491 if ((
state.loopReason &
state.tracingFlags) != 0) {
1492 state.severityStack.push_back((
int)fair::Logger::GetConsoleSeverity());
1493 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1494 }
else if (
state.severityStack.empty() ==
false) {
1495 fair::Logger::SetConsoleSeverity((fair::Severity)
state.severityStack.back());
1496 state.severityStack.pop_back();
1501 O2_SIGNPOST_EVENT_EMIT(device, lid,
"run_loop",
"Out of band activity detected. Rescanning everything.");
1505 if (!
state.pendingOffers.empty()) {
1506 O2_SIGNPOST_EVENT_EMIT(device, lid,
"run_loop",
"Pending %" PRIu64
" offers. updating the ComputingQuotaEvaluator.", (uint64_t)
state.pendingOffers.size());
1518 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1522 assert(mStreams.size() == mHandles.size());
1525 for (
size_t ti = 0; ti < mStreams.size(); ti++) {
1526 auto& taskInfo = mStreams[ti];
1527 if (taskInfo.running) {
1531 streamRef.index = ti;
1533 using o2::monitoring::Metric;
1534 using o2::monitoring::Monitoring;
1535 using o2::monitoring::tags::Key;
1536 using o2::monitoring::tags::Value;
1539 if (streamRef.index != -1) {
1542 uv_work_t& handle = mHandles[streamRef.index];
1544 handle.data = &mStreams[streamRef.index];
1552 dpStats.processCommandQueue();
// Atomic counters tracking how often a computation was or was not scheduled.
1562 struct SchedulingStats {
// Value of uv_now() on the event loop when a computation was last scheduled.
1563 std::atomic<size_t> lastScheduled = 0;
// Skipped scheduling attempts since the last successful one; reset to 0 on success.
1564 std::atomic<size_t> numberOfUnscheduledSinceLastScheduled = 0;
// Running total of skipped scheduling attempts.
1565 std::atomic<size_t> numberOfUnscheduled = 0;
// Running total of successful scheduling attempts.
1566 std::atomic<size_t> numberOfScheduled = 0;
// Static instance, so the counters persist between uses.
// NOTE(review): the enclosing scope is not visible here — confirm it is function-local.
1568 static SchedulingStats schedulingStats;
1573 stream.registry = &mServiceRegistry;
1574 schedulingStats.lastScheduled = uv_now(
state.loop);
1575 schedulingStats.numberOfScheduled++;
1576 schedulingStats.numberOfUnscheduledSinceLastScheduled = 0;
1577 O2_SIGNPOST_EVENT_EMIT(scheduling, sid,
"Run",
"Enough resources to schedule computation on stream %d", streamRef.index);
1578 if (dplEnableMultithreding) [[unlikely]] {
1586 if (schedulingStats.numberOfUnscheduledSinceLastScheduled > 100 ||
1587 (uv_now(
state.loop) - schedulingStats.lastScheduled) > 30000) {
1589 "Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu.",
1590 schedulingStats.numberOfUnscheduledSinceLastScheduled.load(),
1591 schedulingStats.lastScheduled.load());
1594 "Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu.",
1595 schedulingStats.numberOfUnscheduledSinceLastScheduled.load(),
1596 schedulingStats.lastScheduled.load());
1598 schedulingStats.numberOfUnscheduled++;
1599 schedulingStats.numberOfUnscheduledSinceLastScheduled++;
1606 O2_SIGNPOST_END(device, lid,
"run_loop",
"Run loop completed. Transition handling state %d.", (
int)
state.transitionHandling);
1609 for (
size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
1610 auto& info =
state.inputChannelInfos[ci];
1611 info.parts.fParts.clear();
1622 O2_SIGNPOST_START(device, dpid,
"do_prepare",
"Starting DataProcessorContext::doPrepare.");
1640 context.allDone = std::any_of(
state.inputChannelInfos.begin(),
state.inputChannelInfos.end(), [cid](
const auto& info) {
1642 O2_SIGNPOST_EVENT_EMIT(device, cid,
"do_prepare",
"Input channel %{public}s%{public}s has %zu parts left and is in state %d.",
1643 info.channel->GetName().c_str(), (info.id.value == ChannelIndex::INVALID ?
" (non DPL)" :
""), info.parts.fParts.size(), (int)info.state);
1645 O2_SIGNPOST_EVENT_EMIT(device, cid,
"do_prepare",
"External channel %d is in state %d.", info.id.value, (int)info.state);
1650 O2_SIGNPOST_EVENT_EMIT(device, dpid,
"do_prepare",
"Processing %zu input channels.", spec.inputChannels.size());
// Index list over state.inputChannelInfos: resized to the number of channel
// infos, filled with 0..N-1, then ordered by ascending oldestForChannel value.
1653 static std::vector<int> pollOrder;
1654 pollOrder.resize(
state.inputChannelInfos.size());
1655 std::iota(pollOrder.begin(), pollOrder.end(), 0);
1656 std::sort(pollOrder.begin(), pollOrder.end(), [&infos =
state.inputChannelInfos](
int a,
int b) {
// The channel whose oldestForChannel value is smaller sorts first.
1657 return infos[a].oldestForChannel.value < infos[b].oldestForChannel.value;
1661 if (pollOrder.empty()) {
1662 O2_SIGNPOST_END(device, dpid,
"do_prepare",
"Nothing to poll. Waiting for next iteration.");
1665 auto currentOldest =
state.inputChannelInfos[pollOrder.front()].oldestForChannel;
1666 auto currentNewest =
state.inputChannelInfos[pollOrder.back()].oldestForChannel;
1667 auto delta = currentNewest.value - currentOldest.value;
// Logs the oldest/newest "oldest possible" values and their difference. PRIu64
// expects unsigned 64-bit arguments, so the values are cast to uint64_t.
1668 O2_SIGNPOST_EVENT_EMIT(device, dpid,
"do_prepare",
"Oldest possible timeframe range %" PRIu64
" => %" PRIu64
" delta %" PRIu64,
1669 (uint64_t)currentOldest.value, (uint64_t)currentNewest.value, (uint64_t)delta);
1670 auto& infos =
state.inputChannelInfos;
1672 if (context.balancingInputs) {
1674 static uint64_t ahead = getenv(
"DPL_MAX_CHANNEL_AHEAD") ? std::atoll(getenv(
"DPL_MAX_CHANNEL_AHEAD")) :
std::
max(8,
std::
min(pipelineLength - 48, pipelineLength / 2));
1675 auto newEnd = std::remove_if(pollOrder.begin(), pollOrder.end(), [&infos, limitNew = currentOldest.value + ahead](
int a) ->
bool {
1676 return infos[a].oldestForChannel.value > limitNew;
1678 for (
auto it = pollOrder.begin(); it < pollOrder.end(); it++) {
1679 const auto& channelInfo =
state.inputChannelInfos[*it];
1685 bool shouldBeRunning = it < newEnd;
1686 if (running != shouldBeRunning) {
1687 uv_poll_start(poller, shouldBeRunning ? UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED : 0, &
on_socket_polled);
1693 pollOrder.erase(newEnd, pollOrder.end());
1695 O2_SIGNPOST_END(device, dpid,
"do_prepare",
"%zu channels pass the channel inbalance balance check.", pollOrder.size());
1697 for (
auto sci : pollOrder) {
1698 auto& info =
state.inputChannelInfos[sci];
1699 auto& channelSpec = spec.inputChannels[sci];
1701 O2_SIGNPOST_START(device, cid,
"channels",
"Processing channel %s", channelSpec.name.c_str());
1704 context.allDone =
false;
1709 if (info.parts.Size()) {
1712 O2_SIGNPOST_END(device, cid,
"channels",
"Flushing channel %s which is in state %d and has %zu parts still pending.",
1713 channelSpec.name.c_str(), (
int)info.state, info.parts.Size());
1716 if (info.
channel ==
nullptr) {
1717 O2_SIGNPOST_END(device, cid,
"channels",
"Channel %s which is in state %d is nullptr and has %zu parts still pending.",
1718 channelSpec.name.c_str(), (
int)info.state, info.parts.Size());
1723 O2_SIGNPOST_END(device, cid,
"channels",
"Channel %s which is in state %d is not a DPL channel and has %zu parts still pending.",
1724 channelSpec.name.c_str(), (
int)info.state, info.parts.Size());
1727 auto& socket = info.
channel->GetSocket();
1732 if (info.hasPendingEvents == 0) {
1733 socket.Events(&info.hasPendingEvents);
1735 if ((info.hasPendingEvents & 1) == 0 && (info.parts.Size() == 0)) {
1736 O2_SIGNPOST_END(device, cid,
"channels",
"No pending events and no remaining parts to process for channel %{public}s", channelSpec.name.c_str());
1742 info.readPolled =
false;
1751 bool newMessages =
false;
1753 O2_SIGNPOST_EVENT_EMIT(device, cid,
"channels",
"Receiving loop called for channel %{public}s (%d) with oldest possible timeslice %zu",
1754 channelSpec.name.c_str(), info.id.value, info.oldestForChannel.value);
1755 if (info.parts.Size() < 64) {
1756 fair::mq::Parts parts;
1757 info.
channel->Receive(parts, 0);
1759 O2_SIGNPOST_EVENT_EMIT(device, cid,
"channels",
"Received %zu parts from channel %{public}s (%d).", parts.Size(), channelSpec.name.c_str(), info.id.value);
1761 for (
auto&& part : parts) {
1762 info.parts.fParts.emplace_back(std::move(part));
1764 newMessages |=
true;
1767 if (info.parts.Size() >= 0) {
1779 socket.Events(&info.hasPendingEvents);
1780 if (info.hasPendingEvents) {
1781 info.readPolled =
false;
1784 state.lastActiveDataProcessor.store(&context);
1787 O2_SIGNPOST_END(device, cid,
"channels",
"Done processing channel %{public}s (%d).",
1788 channelSpec.name.c_str(), info.id.value);
1803 context.completed.clear();
1804 context.completed.reserve(16);
1806 state.lastActiveDataProcessor.store(&context);
1810 context.preDanglingCallbacks(danglingContext);
1811 if (
state.lastActiveDataProcessor.load() ==
nullptr) {
1814 auto activity =
ref.get<
DataRelayer>().processDanglingInputs(context.expirationHandlers, *context.registry,
true);
1815 if (activity.expiredSlots > 0) {
1816 state.lastActiveDataProcessor = &context;
1819 context.completed.clear();
1821 state.lastActiveDataProcessor = &context;
1824 context.postDanglingCallbacks(danglingContext);
1832 state.lastActiveDataProcessor = &context;
1855 timingInfo.timeslice = relayer.getOldestPossibleOutput().timeslice.value;
1856 timingInfo.tfCounter = -1;
1857 timingInfo.firstTForbit = -1;
1859 timingInfo.creation = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
1860 O2_SIGNPOST_EVENT_EMIT(calibration, dpid,
"calibration",
"TimingInfo.keepAtEndOfStream %d", timingInfo.keepAtEndOfStream);
1864 context.preEOSCallbacks(eosContext);
1868 streamContext.postEOSCallbacks(eosContext);
1869 context.postEOSCallbacks(eosContext);
1871 for (
auto& channel : spec.outputChannels) {
1872 O2_SIGNPOST_EVENT_EMIT(device, dpid,
"state",
"Sending end of stream to %{public}s.", channel.name.c_str());
1879 if (shouldProcess) {
1880 state.lastActiveDataProcessor = &context;
1884 for (
auto& poller :
state.activeOutputPollers) {
1885 uv_poll_stop(poller);
1893 for (
auto& poller :
state.activeOutputPollers) {
1894 uv_poll_stop(poller);
1910 if (deviceContext.sigusr1Handle) {
1916 handle->data =
nullptr;
1945 auto getInputTypes = [&info, &context]() -> std::optional<std::vector<InputInfo>> {
1950 auto& parts = info.
parts;
1953 std::vector<InputInfo> results;
1955 results.reserve(parts.Size() / 2);
1956 size_t nTotalPayloads = 0;
1960 if (
type != InputType::Invalid &&
length > 1) {
1961 nTotalPayloads +=
length - 1;
1965 for (
size_t pi = 0; pi < parts.Size(); pi += 2) {
1966 auto* headerData = parts.At(pi)->GetData();
1967 auto sih = o2::header::get<SourceInfoHeader*>(headerData);
1969 O2_SIGNPOST_EVENT_EMIT(device, cid,
"handle_data",
"Got SourceInfoHeader with state %d", (
int)sih->state);
1970 info.
state = sih->state;
1971 insertInputInfo(pi, 2, InputType::SourceInfo, info.
id);
1972 state.lastActiveDataProcessor = &context;
1975 auto dih = o2::header::get<DomainInfoHeader*>(headerData);
1977 O2_SIGNPOST_EVENT_EMIT(device, cid,
"handle_data",
"Got DomainInfoHeader with oldestPossibleTimeslice %d", (
int)dih->oldestPossibleTimeslice);
1978 insertInputInfo(pi, 2, InputType::DomainInfo, info.
id);
1979 state.lastActiveDataProcessor = &context;
1982 auto dh = o2::header::get<DataHeader*>(headerData);
1984 insertInputInfo(pi, 0, InputType::Invalid, info.
id);
1988 if (dh->payloadSize > parts.At(pi + 1)->GetSize()) {
1989 insertInputInfo(pi, 0, InputType::Invalid, info.
id);
1993 auto dph = o2::header::get<DataProcessingHeader*>(headerData);
1998 O2_SIGNPOST_START(parts,
pid,
"parts",
"Processing DataHeader %{public}-4s/%{public}-16s/%d with splitPayloadParts %d and splitPayloadIndex %d",
1999 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->splitPayloadParts, dh->splitPayloadIndex);
2001 insertInputInfo(pi, 2, InputType::Invalid, info.
id);
2005 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
2008 insertInputInfo(pi, dh->splitPayloadParts + 1, InputType::Data, info.
id);
2009 pi += dh->splitPayloadParts - 1;
2015 size_t finalSplitPayloadIndex = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
2016 if (finalSplitPayloadIndex > parts.Size()) {
2018 insertInputInfo(pi, 0, InputType::Invalid, info.
id);
2021 insertInputInfo(pi, 2, InputType::Data, info.
id);
2022 for (; pi + 2 < finalSplitPayloadIndex; pi += 2) {
2023 insertInputInfo(pi + 2, 2, InputType::Data, info.
id);
2027 if (results.size() + nTotalPayloads != parts.Size()) {
2028 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid,
"handle_data",
"inconsistent number of inputs extracted. %zu vs parts (%zu)", results.size() + nTotalPayloads, parts.Size());
2029 return std::nullopt;
2034 auto reportError = [
ref](
const char*
message) {
2039 auto handleValidMessages = [&info,
ref, &reportError](std::vector<InputInfo>
const& inputInfos) {
2043 auto& parts = info.
parts;
2046 bool hasBackpressure =
false;
2047 size_t minBackpressureTimeslice = -1;
2049 size_t oldestPossibleTimeslice = -1;
2050 static std::vector<int> ordering;
2052 ordering.resize(inputInfos.size());
2053 std::iota(ordering.begin(), ordering.end(), 0);
2055 std::stable_sort(ordering.begin(), ordering.end(), [&inputInfos](
int const&
a,
int const&
b) {
2056 auto const& ai = inputInfos[a];
2057 auto const& bi = inputInfos[b];
2058 if (ai.type != bi.type) {
2059 return ai.type < bi.type;
2061 return ai.position < bi.position;
2063 for (
size_t ii = 0; ii < inputInfos.size(); ++ii) {
2064 auto const& input = inputInfos[ordering[ii]];
2065 switch (input.type) {
2066 case InputType::Data: {
2068 auto headerIndex = input.position;
2070 auto nPayloadsPerHeader = 0;
2071 if (input.size > 2) {
2073 nMessages = input.size;
2074 nPayloadsPerHeader = nMessages - 1;
2077 auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
2078 nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2;
2079 nPayloadsPerHeader = 1;
2080 ii += (nMessages / 2) - 1;
2084 O2_SIGNPOST_EVENT_EMIT(async_queue, cid,
"onDrop",
"Dropping message from slot %zu. Forwarding as needed. Timeslice %zu",
2085 slot.
index, oldestOutputInfo.timeslice.value);
2092 forwardInputs(
ref, slot, dropped, oldestOutputInfo,
false,
true);
2094 auto relayed = relayer.relay(parts.At(headerIndex)->GetData(),
2095 &parts.At(headerIndex),
2100 switch (relayed.type) {
2103 LOGP(alarm,
"Backpressure on channel {}. Waiting.", info.
channel->GetName());
2104 auto& monitoring =
ref.get<o2::monitoring::Monitoring>();
2105 monitoring.send(o2::monitoring::Metric{1, fmt::format(
"backpressure_{}", info.
channel->GetName())});
2109 policy.backpressure(info);
2110 hasBackpressure =
true;
2111 minBackpressureTimeslice = std::min<size_t>(minBackpressureTimeslice, relayed.timeslice.value);
2117 LOGP(info,
"Back to normal on channel {}.", info.
channel->GetName());
2118 auto& monitoring =
ref.get<o2::monitoring::Monitoring>();
2119 monitoring.send(o2::monitoring::Metric{0, fmt::format(
"backpressure_{}", info.
channel->GetName())});
2126 case InputType::SourceInfo: {
2127 LOGP(detail,
"Received SourceInfo");
2129 state.lastActiveDataProcessor = &context;
2130 auto headerIndex = input.position;
2131 auto payloadIndex = input.position + 1;
2132 assert(payloadIndex < parts.Size());
2135 parts.At(headerIndex).reset(
nullptr);
2136 parts.At(payloadIndex).reset(
nullptr);
2143 case InputType::DomainInfo: {
2147 state.lastActiveDataProcessor = &context;
2148 auto headerIndex = input.position;
2149 auto payloadIndex = input.position + 1;
2150 assert(payloadIndex < parts.Size());
2154 auto dih = o2::header::get<DomainInfoHeader*>(parts.At(headerIndex)->GetData());
2155 if (hasBackpressure && dih->oldestPossibleTimeslice >= minBackpressureTimeslice) {
2158 oldestPossibleTimeslice = std::min(oldestPossibleTimeslice, dih->oldestPossibleTimeslice);
2159 LOGP(
debug,
"Got DomainInfoHeader, new oldestPossibleTimeslice {} on channel {}", oldestPossibleTimeslice, info.
id.
value);
2160 parts.At(headerIndex).reset(
nullptr);
2161 parts.At(payloadIndex).reset(
nullptr);
2163 case InputType::Invalid: {
2164 reportError(
"Invalid part found.");
2170 if (oldestPossibleTimeslice != (
size_t)-1) {
2173 context.domainInfoUpdatedCallback(*context.registry, oldestPossibleTimeslice, info.
id);
2175 state.lastActiveDataProcessor = &context;
2177 auto it = std::remove_if(parts.fParts.begin(), parts.fParts.end(), [](
auto&
msg) ->
bool { return msg.get() == nullptr; });
2178 parts.fParts.erase(it, parts.end());
2179 if (parts.fParts.size()) {
2180 LOG(
debug) << parts.fParts.size() <<
" messages backpressured";
2192 auto inputTypes = getInputTypes();
2193 if (
bool(inputTypes) ==
false) {
2194 reportError(
"Parts should come in couples. Dropping it.");
2197 handleValidMessages(*inputTypes);
2203struct InputLatency {
2208auto calculateInputRecordLatency(
InputRecord const& record, uint64_t currentTime) -> InputLatency
2212 for (
auto& item : record) {
2213 auto* header = o2::header::get<DataProcessingHeader*>(item.header);
2214 if (header ==
nullptr) {
2217 int64_t partLatency = (0x7fffffffffffffff & currentTime) - (0x7fffffffffffffff & header->creation);
2218 if (partLatency < 0) {
2221 result.minLatency = std::min(
result.minLatency, (uint64_t)partLatency);
2222 result.maxLatency = std::max(
result.maxLatency, (uint64_t)partLatency);
2227auto calculateTotalInputRecordSize(
InputRecord const& record) ->
int
2229 size_t totalInputSize = 0;
2230 for (
auto& item : record) {
2231 auto* header = o2::header::get<DataHeader*>(item.header);
2232 if (header ==
nullptr) {
2235 totalInputSize += header->payloadSize;
2237 return totalInputSize;
2240template <
typename T>
2241void update_maximum(std::atomic<T>& maximum_value, T
const&
value)
noexcept
2243 T prev_value = maximum_value;
2244 while (prev_value <
value &&
2245 !maximum_value.compare_exchange_weak(prev_value,
value)) {
2253 LOGP(
debug,
"DataProcessingDevice::tryDispatchComputation");
2258 std::vector<MessageSet> currentSetOfInputs;
2261 auto getInputSpan = [
ref, ¤tSetOfInputs](
TimesliceSlot slot,
bool consume =
true) {
2266 currentSetOfInputs = relayer.consumeExistingInputsForTimeslice(slot);
2268 auto getter = [¤tSetOfInputs](
size_t i,
size_t partindex) ->
DataRef {
2269 if (currentSetOfInputs[
i].getNumberOfPairs() > partindex) {
2270 const char* headerptr =
nullptr;
2271 const char* payloadptr =
nullptr;
2272 size_t payloadSize = 0;
2278 auto const& headerMsg = currentSetOfInputs[
i].associatedHeader(partindex);
2279 auto const& payloadMsg = currentSetOfInputs[
i].associatedPayload(partindex);
2280 headerptr =
static_cast<char const*
>(headerMsg->GetData());
2281 payloadptr = payloadMsg ?
static_cast<char const*
>(payloadMsg->GetData()) :
nullptr;
2282 payloadSize = payloadMsg ? payloadMsg->GetSize() : 0;
2283 return DataRef{
nullptr, headerptr, payloadptr, payloadSize};
2287 auto nofPartsGetter = [¤tSetOfInputs](
size_t i) ->
size_t {
2288 return currentSetOfInputs[
i].getNumberOfPairs();
2290#if __has_include(<fairmq/shmem/Message.h>)
2291 auto refCountGetter = [¤tSetOfInputs](
size_t idx) ->
int {
2292 auto& header =
static_cast<const fair::mq::shmem::Message&
>(*currentSetOfInputs[idx].header(0));
2293 return header.GetRefCount();
2296 std::function<
int(
size_t)> refCountGetter =
nullptr;
2298 return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.
size()};
2313 auto timeslice = relayer.getTimesliceForSlot(
i);
2315 timingInfo.timeslice = timeslice.value;
2325 auto timeslice = relayer.getTimesliceForSlot(
i);
2327 timingInfo.globalRunNumberChanged = !
TimingInfo::timesliceIsTimer(timeslice.value) && dataProcessorContext.lastRunNumberProcessed != timingInfo.runNumber;
2329 timingInfo.globalRunNumberChanged &= (dataProcessorContext.lastRunNumberProcessed == -1 || timingInfo.runNumber != 0);
2333 timingInfo.streamRunNumberChanged = timingInfo.globalRunNumberChanged;
2341 assert(record.size() == currentSetOfInputs.size());
2342 for (
size_t ii = 0, ie = record.size(); ii < ie; ++ii) {
2346 DataRef input = record.getByPos(ii);
2350 if (input.
header ==
nullptr) {
2354 currentSetOfInputs[ii].clear();
2365 for (
size_t pi = 0, pe = record.size(); pi < pe; ++pi) {
2366 DataRef input = record.getByPos(pi);
2367 if (input.
header ==
nullptr) {
2370 auto sih = o2::header::get<SourceInfoHeader*>(input.
header);
2375 auto dh = o2::header::get<DataHeader*>(input.
header);
2385 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
2388 pi += dh->splitPayloadParts - 1;
2390 size_t pi = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
2396 if (completed.empty() ==
true) {
2397 LOGP(
debug,
"No computations available for dispatching.");
2404 std::atomic_thread_fence(std::memory_order_release);
2405 char relayerSlotState[1024];
2407 char*
buffer = relayerSlotState + written;
2408 for (
size_t ai = 0; ai != record.size(); ai++) {
2409 buffer[ai] = record.isValid(ai) ?
'3' :
'0';
2411 buffer[record.size()] = 0;
2413 .size = (
int)(record.size() +
buffer - relayerSlotState),
2414 .
data = relayerSlotState});
2415 uint64_t tEnd = uv_hrtime();
2417 int64_t wallTimeMs = (tEnd - tStart) / 1000000;
2425 auto latency = calculateInputRecordLatency(record, tStartMilli);
2428 static int count = 0;
2435 std::atomic_thread_fence(std::memory_order_release);
2436 char relayerSlotState[1024];
2438 char*
buffer = strchr(relayerSlotState,
' ') + 1;
2439 for (
size_t ai = 0; ai != record.size(); ai++) {
2440 buffer[ai] = record.isValid(ai) ?
'2' :
'0';
2442 buffer[record.size()] = 0;
2460 switch (spec.completionPolicy.order) {
2462 std::sort(completed.begin(), completed.end(), [](
auto const&
a,
auto const&
b) { return a.timeslice.value < b.timeslice.value; });
2465 std::sort(completed.begin(), completed.end(), [](
auto const&
a,
auto const&
b) { return a.slot.index < b.slot.index; });
2472 for (
auto action : completed) {
2474 O2_SIGNPOST_START(device, aid,
"device",
"Processing action on slot %lu for action %{public}s", action.
slot.
index, fmt::format(
"{}", action.
op).c_str());
2498 dpContext.preProcessingCallbacks(processContext);
2501 context.postDispatchingCallbacks(processContext);
2502 if (spec.forwards.empty() ==
false) {
2504 forwardInputs(
ref, action.
slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(),
false);
2505 O2_SIGNPOST_END(device, aid,
"device",
"Forwarding inputs consume: %d.",
false);
2513 bool hasForwards = spec.forwards.empty() ==
false;
2516 if (context.canForwardEarly && hasForwards && consumeSomething) {
2517 O2_SIGNPOST_EVENT_EMIT(device, aid,
"device",
"Early forwainding: %{public}s.", fmt::format(
"{}", action.
op).c_str());
2521 markInputsAsDone(action.
slot);
2523 uint64_t tStart = uv_hrtime();
2525 preUpdateStats(action, record, tStart);
2527 static bool noCatch = getenv(
"O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv(
"O2_NO_CATCHALL_EXCEPTIONS"),
"0");
2535 switch (action.
op) {
2546 if (
state.quitRequested ==
false) {
2550 streamContext.preProcessingCallbacks(processContext);
2556 if (context.statefulProcess && shouldProcess(action)) {
2560 (context.statefulProcess)(processContext);
2562 }
else if (context.statelessProcess && shouldProcess(action)) {
2564 (context.statelessProcess)(processContext);
2566 }
else if (context.statelessProcess || context.statefulProcess) {
2569 O2_SIGNPOST_EVENT_EMIT(device, pcid,
"device",
"No processing callback provided. Switching to %{public}s.",
"Idle");
2572 if (shouldProcess(action)) {
2574 if (timingInfo.globalRunNumberChanged) {
2575 context.lastRunNumberProcessed = timingInfo.runNumber;
2592 streamContext.finaliseOutputsCallbacks(processContext);
2598 streamContext.postProcessingCallbacks(processContext);
2604 state.severityStack.push_back((
int)fair::Logger::GetConsoleSeverity());
2605 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
2611 (context.errorHandling)(e, record);
2616 }
catch (std::exception& ex) {
2621 (context.errorHandling)(e, record);
2623 (context.errorHandling)(e, record);
2626 if (
state.severityStack.empty() ==
false) {
2627 fair::Logger::SetConsoleSeverity((fair::Severity)
state.severityStack.back());
2628 state.severityStack.pop_back();
2631 postUpdateStats(action, record, tStart, tStartMilli);
2635 cleanupRecord(record);
2636 context.postDispatchingCallbacks(processContext);
2639 if ((context.canForwardEarly ==
false) && hasForwards && consumeSomething) {
2644 context.postForwardingCallbacks(processContext);
2646 cleanTimers(action.
slot, record);
2648 O2_SIGNPOST_END(device, aid,
"device",
"Done processing action on slot %lu for action %{public}s", action.
slot.
index, fmt::format(
"{}", action.
op).c_str());
2650 O2_SIGNPOST_END(device, sid,
"device",
"Start processing ready actions");
2654 LOGP(detail,
"Broadcasting end of stream");
2655 for (
auto& channel : spec.outputChannels) {
2678 cfg.getRecursive(
name);
2679 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
2680 retrievers.emplace_back(std::make_unique<ConfigurationOptionsRetriever>(&cfg,
name));
2681 auto configStore = std::make_unique<ConfigParamStore>(options, std::move(retrievers));
2682 configStore->preload();
2683 configStore->activate();
struct uv_timer_s uv_timer_t
struct uv_signal_s uv_signal_t
struct uv_async_s uv_async_t
struct uv_poll_s uv_poll_t
struct uv_loop_s uv_loop_t
o2::monitoring::Metric Metric
o2::configuration::ConfigurationInterface ConfigurationInterface
constexpr int DEFAULT_MAX_CHANNEL_AHEAD
std::enable_if_t< std::is_signed< T >::value, bool > hasData(const CalArray< T > &cal)
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_LOG_ENABLED(log)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
constexpr uint32_t runtime_hash(char const *str)
o2::monitoring::Monitoring Monitoring
@ DeviceStateChanged
Invoked the device undergoes a state change.
decltype(auto) make(const Output &spec, Args... args)
static void doRun(ServiceRegistryRef)
void fillContext(DataProcessorContext &context, DeviceContext &deviceContext)
void error(const char *msg)
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry &)
static void doPrepare(ServiceRegistryRef)
static bool tryDispatchComputation(ServiceRegistryRef ref, std::vector< DataRelayer::RecordAction > &completed)
static void handleData(ServiceRegistryRef, InputChannelInfo &)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associate to a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associate to a given slot.
A service API to communicate with the driver.
virtual fair::mq::Device * device()=0
bool active() const
Check if service of type T is currently active.
GLuint const GLchar * name
GLboolean GLboolean GLboolean b
GLsizei const GLfloat * value
GLint GLint GLsizei GLint GLenum GLenum type
GLuint GLsizei GLsizei * length
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint GLsizei const GLchar * message
GLboolean GLboolean GLboolean GLboolean a
GLbitfield GLuint64 timeout
Defining PrimaryVertex explicitly as messageable.
auto decongestionCallbackLate
RuntimeErrorRef runtime_error(const char *)
ServiceKind
The kind of service we are asking for.
void on_idle_timer(uv_timer_t *handle)
@ DPL
The channel is a normal input channel.
void run_completion(uv_work_t *handle, int status)
bool hasOnlyGenerated(DeviceSpec const &spec)
void on_socket_polled(uv_poll_t *poller, int status, int events)
void on_transition_requested_expired(uv_timer_t *handle)
void run_callback(uv_work_t *handle)
volatile int region_read_global_dummy_variable
void handleRegionCallbacks(ServiceRegistryRef registry, std::vector< fair::mq::RegionInfo > &infos)
Invoke the callbacks for the mPendingRegionInfos.
void on_out_of_band_polled(uv_poll_t *poller, int status, int events)
DeviceSpec const & getRunningDevice(RunningDeviceRef const &running, ServiceRegistryRef const &services)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Streaming
Data is being processed.
@ Idle
End of streaming notified.
void on_communication_requested(uv_async_t *s)
@ Expired
A transition needs to be fullfilled ASAP.
@ NoTransition
No pending transitions.
@ Requested
A transition was notified to be requested.
RuntimeError & error_from_ref(RuntimeErrorRef)
auto switchState(ServiceRegistryRef &ref, StreamingState newState) -> void
void on_awake_main_thread(uv_async_t *handle)
@ SHM_OFFER_BYTES_CONSUMED
@ TIMESLICE_NUMBER_EXPIRED
@ TIMESLICE_OFFER_NUMBER_CONSUMED
@ Completed
The channel was signaled it will not receive any data.
@ Running
The channel is actively receiving data.
void on_signal_callback(uv_signal_t *handle, int signum)
@ Me
Only quit this data processor.
TransitionHandlingState updateStateTransition(ServiceRegistryRef &ref, ProcessingPolicies const &policies)
void on_data_processing_expired(uv_timer_t *handle)
bool hasOnlyTimers(DeviceSpec const &spec)
constexpr const char * channelName(int channel)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
Defining DataPointCompositeObject explicitly as copiable.
static void run(AsyncQueue &queue, TimesliceId oldestPossibleTimeslice)
static void post(AsyncQueue &queue, AsyncTask const &task)
An actuatual task to be executed.
static void demangled_backtrace_symbols(void **backtrace, unsigned int total, int fd)
static constexpr int INVALID
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
int64_t timeslices
How many timeslices it can process without giving back control.
int64_t sharedMemory
How much shared memory it can allocate.
Statistics on the offers consumed, expired.
static void sendEndOfStream(ServiceRegistryRef const &ref, OutputChannelSpec const &channel)
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
Helper struct to hold statistics about the data processing happening.
@ CumulativeRate
Set the value to the specified value if it is positive.
@ Add
Update the rate of the metric given the amount since the last time.
void updateStats(CommandSpec cmd)
std::function< void(o2::framework::RuntimeErrorRef e, InputRecord &record)> errorHandling
AlgorithmSpec::InitErrorCallback initError
void preLoopCallbacks(ServiceRegistryRef)
Invoke callbacks before we enter the event loop.
void postStopCallbacks(ServiceRegistryRef)
Invoke callbacks on stop.
void preProcessingCallbacks(ProcessingContext &)
Invoke callbacks to be executed before every process method invokation.
ServiceRegistry * registry
bool canForwardEarly
Wether or not the associated DataProcessor can forward things early.
AlgorithmSpec::ErrorCallback error
void preStartCallbacks(ServiceRegistryRef)
Invoke callbacks to be executed in PreRun(), before the User Start callbacks.
AlgorithmSpec::ProcessCallback statefulProcess
static std::vector< size_t > createDistinctRouteIndex(std::vector< InputRoute > const &)
CompletionPolicy::CompletionOp op
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static header::DataOrigin asConcreteOrigin(InputSpec const &spec)
TimesliceIndex::OldestOutputInfo oldestTimeslice
static unsigned int pipelineLength()
get max number of timeslices in the queue
static bool onlineDeploymentMode()
@true if running online
static std::unique_ptr< ConfigParamStore > getConfiguration(ServiceRegistryRef registry, const char *name, std::vector< ConfigParamSpec > const &options)
uv_signal_t * sigusr1Handle
ProcessingPolicies & processingPolicies
int expectedRegionCallbacks
std::vector< InputRoute > inputs
std::vector< InputChannelSpec > inputChannels
Running state information of a given device.
uv_async_t * awakeMainThread
std::atomic< int64_t > cleanupCount
Forward channel information.
ChannelAccountingType channelType
Wether or not it's a DPL internal channel.
fair::mq::Channel & channel
std::string name
The name of the channel.
ForwardingPolicy const * policy
ForwardingCallback forward
InputChannelInfo * channelInfo
fair::mq::Socket * socket
DataProcessingDevice * device
enum TerminationPolicy termination
enum EarlyForwardPolicy earlyForward
Information about the running workflow.
static Salt streamSalt(short streamId, short dataProcessorId)
void lateBindStreamServices(DeviceState &state, fair::mq::ProgOptions &options, ServiceRegistry::Salt salt)
static Salt globalStreamSalt(short streamId)
static Salt globalDeviceSalt()
void * get(ServiceTypeHash typeHash, Salt salt, ServiceKind kind, char const *name=nullptr) const
void finaliseOutputsCallbacks(ProcessingContext &)
Invoke callbacks to be executed after every process method invokation.
void preProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed before every process method invokation.
void preEOSCallbacks(EndOfStreamContext &eosContext)
Invoke callbacks to be executed before every EOS user callback invokation.
void postProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed after every process method invokation.
static int64_t getRealtimeSinceEpochStandalone()
bool keepAtEndOfStream
Wether this kind of data should be flushed during end of stream.
static bool timesliceIsTimer(size_t timeslice)
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
void backpressure(InputChannelInfo const &)
locked_execution(ServiceRegistryRef &ref_)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg