Project
Loading...
Searching...
No Matches
DataProcessingDevice.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
13#include <atomic>
33#include "Framework/InputSpan.h"
34#if defined(__APPLE__) || defined(NDEBUG)
35#define O2_SIGNPOST_IMPLEMENTATION
36#endif
37#include "Framework/Signpost.h"
50
51#include "DecongestionService.h"
53#include "DataRelayerHelpers.h"
54#include "Headers/DataHeader.h"
56
57#include <Framework/Tracing.h>
58
59#include <fairmq/Parts.h>
60#include <fairmq/Socket.h>
61#include <fairmq/ProgOptions.h>
62#if __has_include(<fairmq/shmem/Message.h>)
63#include <fairmq/shmem/Message.h>
64#endif
65#include <Configuration/ConfigurationInterface.h>
66#include <Configuration/ConfigurationFactory.h>
67#include <Monitoring/Monitoring.h>
68#include <TMessage.h>
69#include <TClonesArray.h>
70
71#include <fmt/ostream.h>
72#include <algorithm>
73#include <vector>
74#include <numeric>
75#include <memory>
76#include <uv.h>
77#include <execinfo.h>
78#include <sstream>
79#include <boost/property_tree/json_parser.hpp>
80
81// Formatter to avoid having to rewrite the ostream operator for the enum
82namespace fmt
83{
84template <>
87} // namespace fmt
88
89// A log to use for general device logging
91// A log to use for general device logging
93// Special log to keep track of the lifetime of the parts
95// Stream which keeps track of the calibration lifetime logic
97// Special log to track the async queue behavior
99// Special log to track the forwarding requests
101// Special log to track CCDB related requests
103// Special log to track task scheduling
105
106using namespace o2::framework;
107using ConfigurationInterface = o2::configuration::ConfigurationInterface;
109
110constexpr int DEFAULT_MAX_CHANNEL_AHEAD = 128;
111
112namespace o2::framework
113{
114
115template <>
119
123{
124 auto* state = (DeviceState*)handle->data;
125 state->loopReason |= DeviceState::TIMER_EXPIRED;
126}
127
128bool hasOnlyTimers(DeviceSpec const& spec)
129{
130 return std::all_of(spec.inputs.cbegin(), spec.inputs.cend(), [](InputRoute const& route) -> bool { return route.matcher.lifetime == Lifetime::Timer; });
131}
132
134{
135 return (spec.inputChannels.size() == 1) && (spec.inputs[0].matcher.lifetime == Lifetime::Timer || spec.inputs[0].matcher.lifetime == Lifetime::Enumeration);
136}
137
139{
140 auto* ref = (ServiceRegistryRef*)handle->data;
141 auto& state = ref->get<DeviceState>();
142 state.loopReason |= DeviceState::TIMER_EXPIRED;
143 // Check if this is a source device
144 O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
145 auto& spec = ref->get<DeviceSpec const>();
146 std::string messageOnExpire = hasOnlyGenerated(spec) ? "DPL exit transition grace period for source expired. Exiting." : fmt::format("DPL exit transition grace period for {} expired. Exiting.", state.allowedProcessing == DeviceState::CalibrationOnly ? "calibration" : "data & calibration").c_str();
147 if (!ref->get<RawDeviceService>().device()->GetConfig()->GetValue<bool>("error-on-exit-transition-timeout")) {
148 O2_SIGNPOST_EVENT_EMIT_WARN(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
149 } else {
150 O2_SIGNPOST_EVENT_EMIT_ERROR(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
151 }
152 state.transitionHandling = TransitionHandlingState::Expired;
153}
154
156{
157 auto& state = ref.get<DeviceState>();
158 auto& context = ref.get<DataProcessorContext>();
159 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
160 O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
161 O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
162 state.streaming = newState;
163 ref.get<ControlService>().notifyStreamingState(state.streaming);
164};
165
167{
168 auto* ref = (ServiceRegistryRef*)handle->data;
169 auto& state = ref->get<DeviceState>();
170 auto& spec = ref->get<DeviceSpec const>();
171 state.loopReason |= DeviceState::TIMER_EXPIRED;
172
173 // Check if this is a source device
174 O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
175
176 if (hasOnlyGenerated(spec)) {
177 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Switching to EndOfStreaming.");
179 } else {
180 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
181 state.allowedProcessing = DeviceState::CalibrationOnly;
182 }
183}
184
186{
187 auto* state = (DeviceState*)s->data;
189}
190
191DeviceSpec const& getRunningDevice(RunningDeviceRef const& running, ServiceRegistryRef const& services)
192{
193 auto& devices = services.get<o2::framework::RunningWorkflowInfo const>().devices;
194 return devices[running.index];
195}
196
202
204 : mRunningDevice{running},
205 mConfigRegistry{nullptr},
206 mServiceRegistry{registry}
207{
208 GetConfig()->Subscribe<std::string>("dpl", [&registry = mServiceRegistry](const std::string& key, std::string value) {
209 if (key == "cleanup") {
211 auto& deviceState = ref.get<DeviceState>();
212 int64_t cleanupCount = deviceState.cleanupCount.load();
213 int64_t newCleanupCount = std::stoll(value);
214 if (newCleanupCount <= cleanupCount) {
215 return;
216 }
217 deviceState.cleanupCount.store(newCleanupCount);
218 for (auto& info : deviceState.inputChannelInfos) {
219 fair::mq::Parts parts;
220 while (info.channel->Receive(parts, 0)) {
221 LOGP(debug, "Dropping {} parts", parts.Size());
222 if (parts.Size() == 0) {
223 break;
224 }
225 }
226 }
227 }
228 });
229
230 std::function<void(const fair::mq::State)> stateWatcher = [this, &registry = mServiceRegistry](const fair::mq::State state) -> void {
232 auto& deviceState = ref.get<DeviceState>();
233 auto& control = ref.get<ControlService>();
234 auto& callbacks = ref.get<CallbackService>();
235 control.notifyDeviceState(fair::mq::GetStateName(state));
237
238 if (deviceState.nextFairMQState.empty() == false) {
239 auto state = deviceState.nextFairMQState.back();
240 (void)this->ChangeState(state);
241 deviceState.nextFairMQState.pop_back();
242 }
243 };
244
245 // 99 is to execute DPL callbacks last
246 this->SubscribeToStateChange("99-dpl", stateWatcher);
247
248 // One task for now.
249 mStreams.resize(1);
250 mHandles.resize(1);
251
252 ServiceRegistryRef ref{mServiceRegistry};
253
254 mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t));
255 auto& state = ref.get<DeviceState>();
256 assert(state.loop);
257 int res = uv_async_init(state.loop, mAwakeHandle, on_communication_requested);
258 mAwakeHandle->data = &state;
259 if (res < 0) {
260 LOG(error) << "Unable to initialise subscription";
261 }
262
264 SubscribeToNewTransition("dpl", [wakeHandle = mAwakeHandle](fair::mq::Transition t) {
265 int res = uv_async_send(wakeHandle);
266 if (res < 0) {
267 LOG(error) << "Unable to notify subscription";
268 }
269 LOG(debug) << "State transition requested";
270 });
271}
272
273// Callback to execute the processing. Notice how the data
274// is a vector of DataProcessorContext so that we can index the correct
275// one with the thread id. For the moment we simply use the first one.
276void run_callback(uv_work_t* handle)
277{
278 auto* task = (TaskStreamInfo*)handle->data;
279 auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
280 // We create a new signpost interval for this specific data processor. Same id, same data processor.
281 auto& dataProcessorContext = ref.get<DataProcessorContext>();
282 O2_SIGNPOST_ID_FROM_POINTER(sid, device, &dataProcessorContext);
283 O2_SIGNPOST_START(device, sid, "run_callback", "Starting run callback on stream %d", task->id.index);
286 O2_SIGNPOST_END(device, sid, "run_callback", "Done processing data for stream %d", task->id.index);
287}
288
289// Once the processing in a thread is done, this is executed on the main thread.
290void run_completion(uv_work_t* handle, int status)
291{
292 auto* task = (TaskStreamInfo*)handle->data;
293 // Notice that the completion, while running on the main thread, still
294 // has a salt which is associated to the actual stream which was doing the computation
295 auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
296 auto& state = ref.get<DeviceState>();
297 auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
298
299 using o2::monitoring::Metric;
300 using o2::monitoring::Monitoring;
301 using o2::monitoring::tags::Key;
302 using o2::monitoring::tags::Value;
303
304 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats&)> reportConsumedOffer = [ref](ComputingQuotaOffer const& accumulatedConsumed, ComputingQuotaStats& stats) {
305 auto& dpStats = ref.get<DataProcessingStats>();
306 stats.totalConsumedBytes += accumulatedConsumed.sharedMemory;
307 stats.totalConsumedTimeslices += accumulatedConsumed.timeslices;
308
309 dpStats.updateStats({static_cast<short>(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedBytes});
310 dpStats.updateStats({static_cast<short>(ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedBytes});
311 dpStats.processCommandQueue();
312 assert(stats.totalConsumedBytes == dpStats.metrics[(short)ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED]);
313 assert(stats.totalConsumedTimeslices == dpStats.metrics[(short)ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED]);
314 };
315
316 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const&)> reportExpiredOffer = [ref](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
317 auto& dpStats = ref.get<DataProcessingStats>();
318 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
319 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
320 dpStats.updateStats({static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredTimeslices});
321 dpStats.processCommandQueue();
322 };
323
324 for (auto& consumer : state.offerConsumers) {
325 quotaEvaluator.consume(task->id.index, consumer, reportConsumedOffer);
326 }
327 state.offerConsumers.clear();
328 quotaEvaluator.handleExpired(reportExpiredOffer);
329 quotaEvaluator.dispose(task->id.index);
330 task->running = false;
331}
332
333// Context for polling
335 enum struct PollerState : char { Stopped,
337 Connected,
338 Suspended };
339 char const* name = nullptr;
340 uv_loop_t* loop = nullptr;
342 DeviceState* state = nullptr;
343 fair::mq::Socket* socket = nullptr;
345 int fd = -1;
346 bool read = true;
348};
349
350void on_socket_polled(uv_poll_t* poller, int status, int events)
351{
352 auto* context = (PollerContext*)poller->data;
353 assert(context);
354 O2_SIGNPOST_ID_FROM_POINTER(sid, sockets, poller);
355 context->state->loopReason |= DeviceState::DATA_SOCKET_POLLED;
356 switch (events) {
357 case UV_READABLE: {
358 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
359 context->state->loopReason |= DeviceState::DATA_INCOMING;
360 } break;
361 case UV_WRITABLE: {
362 O2_SIGNPOST_END(sockets, sid, "socket_state", "Socket connected for channel %{public}s", context->name);
363 if (context->read) {
364 O2_SIGNPOST_START(sockets, sid, "socket_state", "Socket connected for read in context %{public}s", context->name);
365 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
366 context->state->loopReason |= DeviceState::DATA_CONNECTED;
367 } else {
368 O2_SIGNPOST_START(sockets, sid, "socket_state", "Socket connected for write for channel %{public}s", context->name);
369 context->state->loopReason |= DeviceState::DATA_OUTGOING;
370 // If the socket is writable, fairmq will handle the rest, so we can stop polling and
371 // just wait for the disconnect.
372 uv_poll_start(poller, UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
373 }
374 context->pollerState = PollerContext::PollerState::Connected;
375 } break;
376 case UV_DISCONNECT: {
377 O2_SIGNPOST_END(sockets, sid, "socket_state", "Socket disconnected in context %{public}s", context->name);
378 } break;
379 case UV_PRIORITIZED: {
380 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Socket prioritized for context %{public}s", context->name);
381 } break;
382 }
383 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
384}
385
386void on_out_of_band_polled(uv_poll_t* poller, int status, int events)
387{
388 O2_SIGNPOST_ID_FROM_POINTER(sid, sockets, poller);
389 auto* context = (PollerContext*)poller->data;
390 context->state->loopReason |= DeviceState::OOB_ACTIVITY;
391 if (status < 0) {
392 LOGP(fatal, "Error while polling {}: {}", context->name, status);
393 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
394 }
395 switch (events) {
396 case UV_READABLE: {
397 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
398 context->state->loopReason |= DeviceState::DATA_INCOMING;
399 assert(context->channelInfo);
400 context->channelInfo->readPolled = true;
401 } break;
402 case UV_WRITABLE: {
403 O2_SIGNPOST_END(sockets, sid, "socket_state", "OOB socket connected for channel %{public}s", context->name);
404 if (context->read) {
405 O2_SIGNPOST_START(sockets, sid, "socket_state", "OOB socket connected for read in context %{public}s", context->name);
406 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_out_of_band_polled);
407 } else {
408 O2_SIGNPOST_START(sockets, sid, "socket_state", "OOB socket connected for write for channel %{public}s", context->name);
409 context->state->loopReason |= DeviceState::DATA_OUTGOING;
410 }
411 } break;
412 case UV_DISCONNECT: {
413 O2_SIGNPOST_END(sockets, sid, "socket_state", "OOB socket disconnected in context %{public}s", context->name);
414 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
415 } break;
416 case UV_PRIORITIZED: {
417 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "OOB socket prioritized for context %{public}s", context->name);
418 } break;
419 }
420 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
421}
422
431{
432 auto ref = ServiceRegistryRef{mServiceRegistry};
433 auto& context = ref.get<DataProcessorContext>();
434 auto& spec = getRunningDevice(mRunningDevice, ref);
435
436 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
437 O2_SIGNPOST_START(device, cid, "Init", "Entering Init callback.");
438 context.statelessProcess = spec.algorithm.onProcess;
439 context.statefulProcess = nullptr;
440 context.error = spec.algorithm.onError;
441 context.initError = spec.algorithm.onInitError;
442
443 auto configStore = DeviceConfigurationHelpers::getConfiguration(mServiceRegistry, spec.name.c_str(), spec.options);
444 if (configStore == nullptr) {
445 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
446 retrievers.emplace_back(std::make_unique<FairOptionsRetriever>(GetConfig()));
447 configStore = std::make_unique<ConfigParamStore>(spec.options, std::move(retrievers));
448 configStore->preload();
449 configStore->activate();
450 }
451
452 using boost::property_tree::ptree;
453
455 for (auto& entry : configStore->store()) {
456 std::stringstream ss;
457 std::string str;
458 if (entry.second.empty() == false) {
459 boost::property_tree::json_parser::write_json(ss, entry.second, false);
460 str = ss.str();
461 str.pop_back(); // remove EoL
462 } else {
463 str = entry.second.get_value<std::string>();
464 }
465 std::string configString = fmt::format("[CONFIG] {}={} 1 {}", entry.first, str, configStore->provenance(entry.first.c_str())).c_str();
466 mServiceRegistry.get<DriverClient>(ServiceRegistry::globalDeviceSalt()).tell(configString.c_str());
467 }
468
469 mConfigRegistry = std::make_unique<ConfigParamRegistry>(std::move(configStore));
470
471 // Setup the error handlers for init
472 if (context.initError) {
473 context.initErrorHandling = [&errorCallback = context.initError,
474 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
478 auto& context = ref.get<DataProcessorContext>();
479 auto& err = error_from_ref(e);
480 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
481 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Invoking errorCallback.", err.what);
482 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
483 auto& stats = ref.get<DataProcessingStats>();
485 InitErrorContext errorContext{ref, e};
486 errorCallback(errorContext);
487 };
488 } else {
489 context.initErrorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
490 auto& err = error_from_ref(e);
494 auto& context = ref.get<DataProcessorContext>();
495 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
496 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Exiting with 1.", err.what);
497 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
498 auto& stats = ref.get<DataProcessingStats>();
500 exit(1);
501 };
502 }
503
504 context.expirationHandlers.clear();
505 context.init = spec.algorithm.onInit;
506 if (context.init) {
507 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
508 InitContext initContext{*mConfigRegistry, mServiceRegistry};
509
510 if (noCatch) {
511 try {
512 context.statefulProcess = context.init(initContext);
514 if (context.initErrorHandling) {
515 (context.initErrorHandling)(e);
516 }
517 }
518 } else {
519 try {
520 context.statefulProcess = context.init(initContext);
521 } catch (std::exception& ex) {
525 auto e = runtime_error(ex.what());
526 (context.initErrorHandling)(e);
528 (context.initErrorHandling)(e);
529 }
530 }
531 }
532 auto& state = ref.get<DeviceState>();
533 state.inputChannelInfos.resize(spec.inputChannels.size());
537 int validChannelId = 0;
538 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
539 auto& name = spec.inputChannels[ci].name;
540 if (name.find(spec.channelPrefix + "from_internal-dpl-clock") == 0) {
541 state.inputChannelInfos[ci].state = InputChannelState::Pull;
542 state.inputChannelInfos[ci].id = {ChannelIndex::INVALID};
543 validChannelId++;
544 } else {
545 state.inputChannelInfos[ci].id = {validChannelId++};
546 }
547 }
548
549 // Invoke the callback policy for this device.
550 if (spec.callbacksPolicy.policy != nullptr) {
551 InitContext initContext{*mConfigRegistry, mServiceRegistry};
552 spec.callbacksPolicy.policy(mServiceRegistry.get<CallbackService>(ServiceRegistry::globalDeviceSalt()), initContext);
553 }
554
555 // Services which are stream should be initialised now
556 auto* options = GetConfig();
557 for (size_t si = 0; si < mStreams.size(); ++si) {
559 mServiceRegistry.lateBindStreamServices(state, *options, streamSalt);
560 }
561 O2_SIGNPOST_END(device, cid, "Init", "Exiting Init callback.");
562}
563
564void on_signal_callback(uv_signal_t* handle, int signum)
565{
566 O2_SIGNPOST_ID_FROM_POINTER(sid, device, handle);
567 O2_SIGNPOST_START(device, sid, "signal_state", "Signal %d received.", signum);
568
569 auto* registry = (ServiceRegistry*)handle->data;
570 if (!registry) {
571 O2_SIGNPOST_END(device, sid, "signal_state", "No registry active. Ignoring signal.");
572 return;
573 }
574 ServiceRegistryRef ref{*registry};
575 auto& state = ref.get<DeviceState>();
576 auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
577 auto& stats = ref.get<DataProcessingStats>();
579 size_t ri = 0;
580 while (ri != quotaEvaluator.mOffers.size()) {
581 auto& offer = quotaEvaluator.mOffers[ri];
582 // We were already offered some sharedMemory, so we
583 // do not consider the offer.
584 // FIXME: in principle this should account for memory
585 // available and being offered, however we
586 // want to get out of the woods for now.
587 if (offer.valid && offer.sharedMemory != 0) {
588 O2_SIGNPOST_END(device, sid, "signal_state", "Memory already offered.");
589 return;
590 }
591 ri++;
592 }
593 // Find the first empty offer and have 1GB of shared memory there
594 for (auto& offer : quotaEvaluator.mOffers) {
595 if (offer.valid == false) {
596 offer.cpu = 0;
597 offer.memory = 0;
598 offer.sharedMemory = 1000000000;
599 offer.valid = true;
600 offer.user = -1;
601 break;
602 }
603 }
605 O2_SIGNPOST_END(device, sid, "signal_state", "Done processing signals.");
606}
607
608static auto toBeForwardedHeader = [](void* header) -> bool {
609 // If is now possible that the record is not complete when
610 // we forward it, because of a custom completion policy.
611 // this means that we need to skip the empty entries in the
612 // record for being forwarded.
613 if (header == nullptr) {
614 return false;
615 }
616 auto sih = o2::header::get<SourceInfoHeader*>(header);
617 if (sih) {
618 return false;
619 }
620
621 auto dih = o2::header::get<DomainInfoHeader*>(header);
622 if (dih) {
623 return false;
624 }
625
626 auto dh = o2::header::get<DataHeader*>(header);
627 if (!dh) {
628 return false;
629 }
630 auto dph = o2::header::get<DataProcessingHeader*>(header);
631 if (!dph) {
632 return false;
633 }
634 return true;
635};
636
637static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
638 FairMQDeviceProxy& proxy,
639 std::unique_ptr<fair::mq::Message>& header,
640 std::unique_ptr<fair::mq::Message>& payload,
641 size_t total,
642 bool consume) {
643 if (header.get() == nullptr) {
644 // Missing an header is not an error anymore.
645 // it simply means that we did not receive the
646 // given input, but we were asked to
647 // consume existing, so we skip it.
648 return false;
649 }
650 if (payload.get() == nullptr && consume == true) {
651 // If the payload is not there, it means we already
652 // processed it with ConsumeExisiting. Therefore we
653 // need to do something only if this is the last consume.
654 header.reset(nullptr);
655 return false;
656 }
657
658 auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
659 if (fdph == nullptr) {
660 LOG(error) << "Data is missing DataProcessingHeader";
661 return false;
662 }
663 auto fdh = o2::header::get<DataHeader*>(header->GetData());
664 if (fdh == nullptr) {
665 LOG(error) << "Data is missing DataHeader";
666 return false;
667 }
668
669 // We need to find the forward route only for the first
670 // part of a split payload. All the others will use the same.
671 // but always check if we have a sequence of multiple payloads
672 if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
673 proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
674 }
675 return cachedForwardingChoices.empty() == false;
676};
677
678struct DecongestionContext {
681};
682
683auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
684 auto& oldestTimeslice = task.user<DecongestionContext>().oldestTimeslice;
685 auto& ref = task.user<DecongestionContext>().ref;
686
687 auto& decongestion = ref.get<DecongestionService>();
688 auto& proxy = ref.get<FairMQDeviceProxy>();
689 if (oldestTimeslice.timeslice.value <= decongestion.lastTimeslice) {
690 LOG(debug) << "Not sending already sent oldest possible timeslice " << oldestTimeslice.timeslice.value;
691 return;
692 }
693 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
694 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
695 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
696 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
697 // TODO: this we could cache in the proxy at the bind moment.
698 if (info.channelType != ChannelAccountingType::DPL) {
699 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Skipping channel %{public}s because it's not a DPL channel",
700 info.name.c_str());
701
702 continue;
703 }
704 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestTimeslice.timeslice.value)) {
705 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Forwarding to channel %{public}s oldest possible timeslice %zu, prio 20",
706 info.name.c_str(), oldestTimeslice.timeslice.value);
707 }
708 }
709};
710
711// This is how we do the forwarding, i.e. we push
712// the inputs which are shared between this device and others
713// to the next one in the daisy chain.
714// FIXME: do it in a smarter way than O(N^2)
715static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
716 TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
717 auto& proxy = registry.get<FairMQDeviceProxy>();
718 // we collect all messages per forward in a map and send them together
719 std::vector<fair::mq::Parts> forwardedParts;
720 forwardedParts.resize(proxy.getNumForwards());
721 std::vector<ChannelIndex> cachedForwardingChoices{};
722 O2_SIGNPOST_ID_GENERATE(sid, forwarding);
723 O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
724 slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
725
726 for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
727 auto& messageSet = currentSetOfInputs[ii];
728 // In case the messageSet is empty, there is nothing to be done.
729 if (messageSet.size() == 0) {
730 continue;
731 }
732 if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
733 continue;
734 }
735 cachedForwardingChoices.clear();
736
737 for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
738 auto& messageSet = currentSetOfInputs[ii];
739 auto& header = messageSet.header(pi);
740 auto& payload = messageSet.payload(pi);
741 auto total = messageSet.getNumberOfPayloads(pi);
742
743 if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
744 continue;
745 }
746
747 // In case of more than one forward route, we need to copy the message.
748 // This will eventually use the same mamory if running with the same backend.
749 if (cachedForwardingChoices.size() > 1) {
750 copy = true;
751 }
752 auto* dh = o2::header::get<DataHeader*>(header->GetData());
753 auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
754
755 if (copy) {
756 for (auto& cachedForwardingChoice : cachedForwardingChoices) {
757 auto&& newHeader = header->GetTransport()->CreateMessage();
758 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
759 fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
760 newHeader->Copy(*header);
761 forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
762
763 for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
764 auto&& newPayload = header->GetTransport()->CreateMessage();
765 newPayload->Copy(*messageSet.payload(pi, payloadIndex));
766 forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
767 }
768 }
769 } else {
770 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
771 fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
772 forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
773 for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
774 forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
775 }
776 }
777 }
778 }
779 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %zu messages", forwardedParts.size());
780 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
781 if (forwardedParts[fi].Size() == 0) {
782 continue;
783 }
784 ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi});
785 auto& parts = forwardedParts[fi];
786 if (info.policy == nullptr) {
787 O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi);
788 continue;
789 }
790 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
791 info.policy->forward(parts, ChannelIndex{fi}, registry);
792 }
793
794 auto& asyncQueue = registry.get<AsyncQueue>();
795 auto& decongestion = registry.get<DecongestionService>();
796 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
797 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
798 AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate}
799 .user<DecongestionContext>({.ref = registry, .oldestTimeslice = oldestTimeslice}));
800 O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
801};
802
803extern volatile int region_read_global_dummy_variable;
805
807void handleRegionCallbacks(ServiceRegistryRef registry, std::vector<fair::mq::RegionInfo>& infos)
808{
809 if (infos.empty() == false) {
810 std::vector<fair::mq::RegionInfo> toBeNotified;
811 toBeNotified.swap(infos); // avoid any MT issue.
812 static bool dummyRead = getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS") && atoi(getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS"));
813 for (auto const& info : toBeNotified) {
814 if (dummyRead) {
815 for (size_t i = 0; i < info.size / sizeof(region_read_global_dummy_variable); i += 4096 / sizeof(region_read_global_dummy_variable)) {
816 region_read_global_dummy_variable = ((int*)info.ptr)[i];
817 }
818 }
819 registry.get<CallbackService>().call<CallbackService::Id::RegionInfoCallback>(info);
820 }
821 }
822}
823
824namespace
825{
827{
828 auto* state = (DeviceState*)handle->data;
830}
831} // namespace
832
833void DataProcessingDevice::initPollers()
834{
835 auto ref = ServiceRegistryRef{mServiceRegistry};
836 auto& deviceContext = ref.get<DeviceContext>();
837 auto& context = ref.get<DataProcessorContext>();
838 auto& spec = ref.get<DeviceSpec const>();
839 auto& state = ref.get<DeviceState>();
840 // We add a timer only in case a channel poller is not there.
841 if ((context.statefulProcess != nullptr) || (context.statelessProcess != nullptr)) {
842 for (auto& [channelName, channel] : GetChannels()) {
843 InputChannelInfo* channelInfo;
844 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
845 auto& channelSpec = spec.inputChannels[ci];
846 channelInfo = &state.inputChannelInfos[ci];
847 if (channelSpec.name != channelName) {
848 continue;
849 }
850 channelInfo->channel = &this->GetChannel(channelName, 0);
851 break;
852 }
853 if ((channelName.rfind("from_internal-dpl", 0) == 0) &&
854 (channelName.rfind("from_internal-dpl-aod", 0) != 0) &&
855 (channelName.rfind("from_internal-dpl-ccdb-backend", 0) != 0) &&
856 (channelName.rfind("from_internal-dpl-injected", 0)) != 0) {
857 LOGP(detail, "{} is an internal channel. Skipping as no input will come from there.", channelName);
858 continue;
859 }
860 // We only watch receiving sockets.
861 if (channelName.rfind("from_" + spec.name + "_", 0) == 0) {
862 LOGP(detail, "{} is to send data. Not polling.", channelName);
863 continue;
864 }
865
866 if (channelName.rfind("from_", 0) != 0) {
867 LOGP(detail, "{} is not a DPL socket. Not polling.", channelName);
868 continue;
869 }
870
871 // We assume there is always a ZeroMQ socket behind.
872 int zmq_fd = 0;
873 size_t zmq_fd_len = sizeof(zmq_fd);
874 // FIXME: I should probably save those somewhere... ;-)
875 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
876 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
877 if (zmq_fd == 0) {
878 LOG(error) << "Cannot get file descriptor for channel." << channelName;
879 continue;
880 }
881 LOGP(detail, "Polling socket for {}", channelName);
882 auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
883 pCtx->name = strdup(channelName.c_str());
884 pCtx->loop = state.loop;
885 pCtx->device = this;
886 pCtx->state = &state;
887 pCtx->fd = zmq_fd;
888 assert(channelInfo != nullptr);
889 pCtx->channelInfo = channelInfo;
890 pCtx->socket = &channel[0].GetSocket();
891 pCtx->read = true;
892 poller->data = pCtx;
893 uv_poll_init(state.loop, poller, zmq_fd);
894 if (channelName.rfind("from_", 0) != 0) {
895 LOGP(detail, "{} is an out of band channel.", channelName);
896 state.activeOutOfBandPollers.push_back(poller);
897 } else {
898 channelInfo->pollerIndex = state.activeInputPollers.size();
899 state.activeInputPollers.push_back(poller);
900 }
901 }
902 // In case we do not have any input channel and we do not have
903 // any timers or signal watchers we still wake up whenever we can send data to downstream
904 // devices to allow for enumerations.
905 if (state.activeInputPollers.empty() &&
906 state.activeOutOfBandPollers.empty() &&
907 state.activeTimers.empty() &&
908 state.activeSignals.empty()) {
909 // FIXME: this is to make sure we do not reset the output timer
910 // for readout proxies or similar. In principle this should go once
911 // we move to OutOfBand InputSpec.
912 if (state.inputChannelInfos.empty()) {
913 LOGP(detail, "No input channels. Setting exit transition timeout to 0.");
914 deviceContext.exitTransitionTimeout = 0;
915 }
916 for (auto& [channelName, channel] : GetChannels()) {
917 if (channelName.rfind(spec.channelPrefix + "from_internal-dpl", 0) == 0) {
918 LOGP(detail, "{} is an internal channel. Not polling.", channelName);
919 continue;
920 }
921 if (channelName.rfind(spec.channelPrefix + "from_" + spec.name + "_", 0) == 0) {
922 LOGP(detail, "{} is an out of band channel. Not polling for output.", channelName);
923 continue;
924 }
925 // We assume there is always a ZeroMQ socket behind.
926 int zmq_fd = 0;
927 size_t zmq_fd_len = sizeof(zmq_fd);
928 // FIXME: I should probably save those somewhere... ;-)
929 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
930 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
931 if (zmq_fd == 0) {
932 LOGP(error, "Cannot get file descriptor for channel {}", channelName);
933 continue;
934 }
935 LOG(detail) << "Polling socket for " << channel[0].GetName();
936 // FIXME: leak
937 auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
938 pCtx->name = strdup(channelName.c_str());
939 pCtx->loop = state.loop;
940 pCtx->device = this;
941 pCtx->state = &state;
942 pCtx->fd = zmq_fd;
943 pCtx->read = false;
944 poller->data = pCtx;
945 uv_poll_init(state.loop, poller, zmq_fd);
946 state.activeOutputPollers.push_back(poller);
947 }
948 }
949 } else {
950 LOGP(detail, "This is a fake device so we exit after the first iteration.");
951 deviceContext.exitTransitionTimeout = 0;
952 // This is a fake device, so we can request to exit immediately
953 ServiceRegistryRef ref{mServiceRegistry};
954 ref.get<ControlService>().readyToQuit(QuitRequest::Me);
955 // A two second timer to stop internal devices which do not want to
956 auto* timer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
957 uv_timer_init(state.loop, timer);
958 timer->data = &state;
959 uv_update_time(state.loop);
960 uv_timer_start(timer, on_idle_timer, 2000, 2000);
961 state.activeTimers.push_back(timer);
962 }
963}
964
965void DataProcessingDevice::startPollers()
966{
967 auto ref = ServiceRegistryRef{mServiceRegistry};
968 auto& deviceContext = ref.get<DeviceContext>();
969 auto& state = ref.get<DeviceState>();
970
971 for (auto* poller : state.activeInputPollers) {
972 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
973 O2_SIGNPOST_START(device, sid, "socket_state", "Input socket waiting for connection.");
974 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
975 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
976 }
977 for (auto& poller : state.activeOutOfBandPollers) {
978 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
979 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
980 }
981 for (auto* poller : state.activeOutputPollers) {
982 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
983 O2_SIGNPOST_START(device, sid, "socket_state", "Output socket waiting for connection.");
984 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
985 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
986 }
987
988 deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
989 deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
990 uv_timer_init(state.loop, deviceContext.gracePeriodTimer);
991
992 deviceContext.dataProcessingGracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
993 deviceContext.dataProcessingGracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
994 uv_timer_init(state.loop, deviceContext.dataProcessingGracePeriodTimer);
995}
996
997void DataProcessingDevice::stopPollers()
998{
999 auto ref = ServiceRegistryRef{mServiceRegistry};
1000 auto& deviceContext = ref.get<DeviceContext>();
1001 auto& state = ref.get<DeviceState>();
1002 LOGP(detail, "Stopping {} input pollers", state.activeInputPollers.size());
1003 for (auto* poller : state.activeInputPollers) {
1004 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
1005 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
1006 uv_poll_stop(poller);
1007 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
1008 }
1009 LOGP(detail, "Stopping {} out of band pollers", state.activeOutOfBandPollers.size());
1010 for (auto* poller : state.activeOutOfBandPollers) {
1011 uv_poll_stop(poller);
1012 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
1013 }
1014 LOGP(detail, "Stopping {} output pollers", state.activeOutOfBandPollers.size());
1015 for (auto* poller : state.activeOutputPollers) {
1016 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
1017 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
1018 uv_poll_stop(poller);
1019 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
1020 }
1021
1022 uv_timer_stop(deviceContext.gracePeriodTimer);
1023 delete (ServiceRegistryRef*)deviceContext.gracePeriodTimer->data;
1024 free(deviceContext.gracePeriodTimer);
1025 deviceContext.gracePeriodTimer = nullptr;
1026
1027 uv_timer_stop(deviceContext.dataProcessingGracePeriodTimer);
1028 delete (ServiceRegistryRef*)deviceContext.dataProcessingGracePeriodTimer->data;
1029 free(deviceContext.dataProcessingGracePeriodTimer);
1030 deviceContext.dataProcessingGracePeriodTimer = nullptr;
1031}
1032
// Body of the "InitTask" callback (name taken from the signposts below; the
// signature line is not visible here). In order, it:
//  - builds the expiration handlers for the distinct input routes,
//  - creates the async handle used to wake up the main loop,
//  - reads the timeouts / expected region callbacks from the configuration,
//  - subscribes to the region events of every channel,
//  - installs the SIGUSR1 handler and creates the pollers,
//  - fills the DataProcessorContext,
//  - runs the loop until all the expected region callbacks have been handled.
1034{
1035 auto ref = ServiceRegistryRef{mServiceRegistry};
1036 auto& deviceContext = ref.get<DeviceContext>();
1037 auto& context = ref.get<DataProcessorContext>();
1038
1039 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1040 O2_SIGNPOST_START(device, cid, "InitTask", "Entering InitTask callback.");
1041 auto& spec = getRunningDevice(mRunningDevice, mServiceRegistry);
1042 auto distinct = DataRelayerHelpers::createDistinctRouteIndex(spec.inputs);
1043 auto& state = ref.get<DeviceState>();
// i counts every distinct route, including the ones skipped because they have
// no configurator, so RouteIndex is the position within `distinct`.
1044 int i = 0;
1045 for (auto& di : distinct) {
1046 auto& route = spec.inputs[di];
1047 if (route.configurator.has_value() == false) {
1048 i++;
1049 continue;
1050 }
1051 ExpirationHandler handler{
1052 .name = route.configurator->name,
1053 .routeIndex = RouteIndex{i++},
1054 .lifetime = route.matcher.lifetime,
1055 .creator = route.configurator->creatorConfigurator(state, mServiceRegistry, *mConfigRegistry),
1056 .checker = route.configurator->danglingConfigurator(state, *mConfigRegistry),
1057 .handler = route.configurator->expirationConfigurator(state, *mConfigRegistry)};
1058 context.expirationHandlers.emplace_back(std::move(handler));
1059 }
1060
// The async handle is created only once and reused afterwards.
1061 if (state.awakeMainThread == nullptr) {
1062 state.awakeMainThread = (uv_async_t*)malloc(sizeof(uv_async_t));
1063 state.awakeMainThread->data = &state;
1064 uv_async_init(state.loop, state.awakeMainThread, on_awake_main_thread);
1065 }
1066
// The options are read as strings and converted with std::stoi.
1067 deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue<std::string>("expected-region-callbacks"));
1068 deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue<std::string>("exit-transition-timeout"));
1069 deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue<std::string>("data-processing-timeout"));
1070
// Subscribe through the transport of the first subchannel of every channel.
// The callback only queues the event (under the mutex), decrements the number
// of expected callbacks and wakes up the loop; the actual handling is done by
// handleRegionCallbacks().
1071 for (auto& channel : GetChannels()) {
1072 channel.second.at(0).Transport()->SubscribeToRegionEvents([&context = deviceContext,
1073 &registry = mServiceRegistry,
1074 &pendingRegionInfos = mPendingRegionInfos,
1075 &regionInfoMutex = mRegionInfoMutex](fair::mq::RegionInfo info) {
1076 std::lock_guard<std::mutex> lock(regionInfoMutex);
1077 LOG(detail) << ">>> Region info event" << info.event;
1078 LOG(detail) << "id: " << info.id;
1079 LOG(detail) << "ptr: " << info.ptr;
1080 LOG(detail) << "size: " << info.size;
1081 LOG(detail) << "flags: " << info.flags;
1082 // Now we check for pending events with the mutex,
1083 // so the lines below are atomic.
1084 pendingRegionInfos.push_back(info);
1085 context.expectedRegionCallbacks -= 1;
1086 // We always want to handle these on the main loop,
1087 // so we awake it.
1088 ServiceRegistryRef ref{registry};
1089 uv_async_send(ref.get<DeviceState>().awakeMainThread);
1090 });
1091 }
1092
1093 // Add a signal manager for SIGUSR1 so that we can force
1094 // an event from the outside, making sure that the event loop can
1095 // be unblocked (e.g. by a quitting DPL driver) even when there
1096 // is no data pending to be processed.
1097 if (deviceContext.sigusr1Handle == nullptr) {
1098 deviceContext.sigusr1Handle = (uv_signal_t*)malloc(sizeof(uv_signal_t));
1099 deviceContext.sigusr1Handle->data = &mServiceRegistry;
1100 uv_signal_init(state.loop, deviceContext.sigusr1Handle);
1101 uv_signal_start(deviceContext.sigusr1Handle, on_signal_callback, SIGUSR1);
1102 }
1103 // If there is any signal, we want to make sure they are active
1104 for (auto& handle : state.activeSignals) {
1105 handle->data = &state;
1106 }
1107 // When we start, we must make sure that we do listen to the signal
// (this refreshes the data pointer also when the handle already existed).
1108 deviceContext.sigusr1Handle->data = &mServiceRegistry;
1109
1111 DataProcessingDevice::initPollers();
1112
1113 // Whenever we InitTask, we consider as if the previous iteration
1114 // was successful, so that even if there is no timer or receiving
1115 // channel, we can still start an enumeration.
// The sentinel (DataProcessorContext*)-1 is stored only if the current value
// is nullptr.
// NOTE(review): on failure the stored pointer is dereferenced to print the
// name; if it could still hold the -1 sentinel that would be invalid - confirm.
1116 DataProcessorContext* initialContext = nullptr;
1117 bool idle = state.lastActiveDataProcessor.compare_exchange_strong(initialContext, (DataProcessorContext*)-1);
1118 if (!idle) {
1119 LOG(error) << "DataProcessor " << state.lastActiveDataProcessor.load()->spec->name << " was unexpectedly active";
1120 }
1121
1122 // We should be ready to run here. Therefore we copy all the
1123 // required parts in the DataProcessorContext. Eventually we should
1124 // do so on a per thread basis, with fine grained locks.
1125 // FIXME: this should not use ServiceRegistry::threadSalt, but
1126 // more a ServiceRegistry::globalDataProcessorSalt(N) where
1127 // N is the number of the multiplexed data processor.
1128 // We will get there.
1129 this->fillContext(mServiceRegistry.get<DataProcessorContext>(ServiceRegistry::globalDeviceSalt()), deviceContext);
1130
1131 O2_SIGNPOST_END(device, cid, "InitTask", "Exiting InitTask callback waiting for the remaining region callbacks.");
1132
// True while there are queued region events or callbacks still expected.
1133 auto hasPendingEvents = [&mutex = mRegionInfoMutex, &pendingRegionInfos = mPendingRegionInfos](DeviceContext& deviceContext) {
1134 std::lock_guard<std::mutex> lock(mutex);
1135 return (pendingRegionInfos.empty() == false) || deviceContext.expectedRegionCallbacks > 0;
1136 };
1137 O2_SIGNPOST_START(device, cid, "InitTask", "Waiting for registation events.");
1142 while (hasPendingEvents(deviceContext)) {
1143 // Wait for the callback to signal its done, so that we do not busy wait.
1144 uv_run(state.loop, UV_RUN_ONCE);
1145 // Handle callbacks if any
1146 {
// The signpost below is emitted after every loop iteration, whatever woke
// the loop up.
1147 O2_SIGNPOST_EVENT_EMIT(device, cid, "InitTask", "Memory registration event received.");
1148 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1149 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1150 }
1151 }
1152 O2_SIGNPOST_END(device, cid, "InitTask", "Done waiting for registration events.");
1153}
1154
// Fills the DataProcessorContext with the device-wide settings: sink /
// balancing flags, the registry pointer, the error handling callback and the
// early forwarding decision.
// NOTE(review): the signature line is not visible here; presumably this is the
// body of fillContext(context, deviceContext) called from InitTask - confirm.
1156{
1157 context.isSink = false;
1158 // If nothing is a sink, the rate limiting simply does not trigger.
// Any non-zero value of the option enables the rate limiting.
1159 bool enableRateLimiting = std::stoi(fConfig->GetValue<std::string>("timeframes-rate-limit"));
1160
1161 auto ref = ServiceRegistryRef{mServiceRegistry};
1162 auto& spec = ref.get<DeviceSpec const>();
1163
1164 // The policy is now allowed to state the default.
1165 context.balancingInputs = spec.completionPolicy.balanceChannels;
1166 // This is needed because the internal injected dummy sink should not
1167 // try to balance inputs unless the rate limiting is requested.
1168 if (enableRateLimiting == false && spec.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) {
1169 context.balancingInputs = false;
1170 }
// With rate limiting enabled, a device with an output bound to "dpl-summary"
// is flagged as a sink. Note that the loop variable shadows the outer `spec`.
1171 if (enableRateLimiting) {
1172 for (auto& spec : spec.outputs) {
1173 if (spec.matcher.binding.value == "dpl-summary") {
1174 context.isSink = true;
1175 break;
1176 }
1177 }
1178 }
1179
1180 context.registry = &mServiceRegistry;
// Two flavours of error handling: when an error callback is set it gets
// invoked with an ErrorContext, otherwise the error policy of the
// DeviceContext decides.
// NOTE(review): some lines of both lambdas (among which the definition of
// `ref`) are not visible here.
1183 if (context.error != nullptr) {
1184 context.errorHandling = [&errorCallback = context.error,
1185 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1189 auto& err = error_from_ref(e);
1190 auto& context = ref.get<DataProcessorContext>();
1191 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1192 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Invoking callback.", err.what);
1193 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1194 auto& stats = ref.get<DataProcessingStats>();
1196 ErrorContext errorContext{record, ref, e};
1197 errorCallback(errorContext);
1198 };
1199 } else {
1200 context.errorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1201 auto& err = error_from_ref(e);
1205 auto& context = ref.get<DataProcessorContext>();
1206 auto& deviceContext = ref.get<DeviceContext>();
1207 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1208 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1209 auto& stats = ref.get<DataProcessingStats>();
1211 switch (deviceContext.processingPolicies.error) {
1213 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Rethrowing.", err.what);
1214 throw e;
1215 default:
1216 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Skipping to next timeframe.", err.what);
1217 break;
1218 }
1219 };
1220 }
1221
// Decides whether the inputs can be forwarded before being processed.
// NOTE(review): the lambda returns the local canForwardEarly, and its result
// is assigned to context.canForwardEarly right after; the assignments to
// context.canForwardEarly done inside the lambda therefore look like they get
// overwritten - confirm whether that is intended.
1222 auto decideEarlyForward = [&context, &deviceContext, &spec, this]() -> bool {
1225 bool canForwardEarly = (spec.forwards.empty() == false) && deviceContext.processingPolicies.earlyForward != EarlyForwardPolicy::NEVER;
1226 bool onlyConditions = true;
1227 bool overriddenEarlyForward = false;
1228 for (auto& forwarded : spec.forwards) {
1229 if (forwarded.matcher.lifetime != Lifetime::Condition) {
1230 onlyConditions = false;
1231 }
// The AOD check is compiled only when <fairmq/shmem/Message.h> is not available.
1232#if !__has_include(<fairmq/shmem/Message.h>)
1233 if (strncmp(DataSpecUtils::asConcreteOrigin(forwarded.matcher).str, "AOD", 3) == 0) {
1234 context.canForwardEarly = false;
1235 overriddenEarlyForward = true;
1236 LOG(detail) << "Cannot forward early because of AOD input: " << DataSpecUtils::describe(forwarded.matcher);
1237 break;
1238 }
1239#endif
1241 context.canForwardEarly = false;
1242 overriddenEarlyForward = true;
1243 LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher);
1244 break;
1245 }
1246 if (forwarded.matcher.lifetime == Lifetime::Optional) {
1247 context.canForwardEarly = false;
1248 overriddenEarlyForward = true;
1249 LOG(detail) << "Cannot forward early because of Optional input: " << DataSpecUtils::describe(forwarded.matcher);
1250 break;
1251 }
1252 }
1253 if (!overriddenEarlyForward && onlyConditions) {
1254 context.canForwardEarly = true;
1255 LOG(detail) << "Enabling early forwarding because only conditions to be forwarded";
1256 }
1257 return canForwardEarly;
1258 };
1259 context.canForwardEarly = decideEarlyForward();
1260}
1261
// Body of the "PreRun" callback (name taken from the signposts below; the
// signature line is not visible here). It resets the per-run state, invokes
// the pre-start callbacks, starts the pollers and publishes device_state = 1.
1263{
1264 auto ref = ServiceRegistryRef{mServiceRegistry};
1265 auto& state = ref.get<DeviceState>();
1266
1267 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.loop);
1268 O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
1269 state.quitRequested = false;
1271 state.allowedProcessing = DeviceState::Any;
// Every input channel which is not in Pull state goes back to Running.
1272 for (auto& info : state.inputChannelInfos) {
1273 if (info.state != InputChannelState::Pull) {
1274 info.state = InputChannelState::Running;
1275 }
1276 }
1277
1278 // Catch callbacks which fail before we start.
1279 // Notice that when running multiple dataprocessors
1280 // we should probably allow expendable ones to fail.
// Every handler below closes the "PreRun" signpost interval and rethrows.
1281 try {
1282 auto& dpContext = ref.get<DataProcessorContext>();
1283 dpContext.preStartCallbacks(ref);
// The stream contexts are looked up with globalStreamSalt(i + 1).
1284 for (size_t i = 0; i < mStreams.size(); ++i) {
1285 auto streamRef = ServiceRegistryRef{mServiceRegistry, ServiceRegistry::globalStreamSalt(i + 1)};
1286 auto& context = streamRef.get<StreamContext>();
1287 context.preStartStreamCallbacks(streamRef);
1288 }
1289 } catch (std::exception& e) {
1290 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type std::exception caught in PreRun: %{public}s. Rethrowing.", e.what());
1291 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1292 throw;
1293 } catch (o2::framework::RuntimeErrorRef& e) {
1294 auto& err = error_from_ref(e);
1295 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type o2::framework::RuntimeErrorRef caught in PreRun: %{public}s. Rethrowing.", err.what);
1296 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1297 throw;
1298 } catch (...) {
1299 O2_SIGNPOST_END(device, cid, "PreRun", "Unknown exception being thrown. Rethrowing.");
1300 throw;
1301 }
1302
// The Start callback and the pollers are reached only if no pre-start
// callback threw.
1303 ref.get<CallbackService>().call<CallbackService::Id::Start>();
1304 startPollers();
1305
1306 // Raise to 1 when we are ready to start processing
1307 using o2::monitoring::Metric;
1308 using o2::monitoring::Monitoring;
1309 using o2::monitoring::tags::Key;
1310 using o2::monitoring::tags::Value;
1311
1312 auto& monitoring = ref.get<Monitoring>();
1313 monitoring.send(Metric{(uint64_t)1, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1314 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun callback.");
1315}
1316
// Publishes device_state = 0, stops the pollers and then invokes the Stop
// callback followed by the post-stop callbacks.
// NOTE(review): the signature line is not visible here; presumably this is the
// counterpart of PreRun (PostRun) - confirm.
1318{
1319 ServiceRegistryRef ref{mServiceRegistry};
1320 // Lower device_state to 0 now that processing is being stopped
1321 using o2::monitoring::Metric;
1322 using o2::monitoring::Monitoring;
1323 using o2::monitoring::tags::Key;
1324 using o2::monitoring::tags::Value;
1325
1326 auto& monitoring = ref.get<Monitoring>();
1327 monitoring.send(Metric{(uint64_t)0, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1328
// The pollers are stopped before the user-visible callbacks are invoked.
1329 stopPollers();
1330 ref.get<CallbackService>().call<CallbackService::Id::Stop>();
1331 auto& dpContext = ref.get<DataProcessorContext>();
1332 dpContext.postStopCallbacks(ref);
1333}
1334
// Forwards the reset to the Reset callback of the CallbackService.
// NOTE(review): the signature line is not visible here.
1336{
1337 ServiceRegistryRef ref{mServiceRegistry};
1338 ref.get<CallbackService>().call<CallbackService::Id::Reset>();
1339}
1340
// Reacts to a newly requested state: starts the grace period timers and
// reports how the transition is going to be handled.
// NOTE(review): the signature and several lines (among which the return
// statements after the first one) are not visible here; presumably this is the
// updateStateTransition(ref, policies) helper invoked from the run loop - confirm.
1342{
1343 auto& state = ref.get<DeviceState>();
1344 auto& deviceProxy = ref.get<FairMQDeviceProxy>();
// Nothing to do if a transition is already being handled or if no new state
// has been requested.
1345 if (state.transitionHandling != TransitionHandlingState::NoTransition || deviceProxy.newStateRequested() == false) {
1346 return state.transitionHandling;
1347 }
1348 O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
1349 auto& deviceContext = ref.get<DeviceContext>();
1350 // Check if we only have timers
1351 auto& spec = ref.get<DeviceSpec const>();
1352 if (hasOnlyTimers(spec)) {
1354 }
1355
1356 // We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
// The timeouts are multiplied by 1000 when handed to uv_timer_start; the last
// argument (repeat) is 0.
1357 if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
1358 uv_update_time(state.loop);
1359 O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
1360 uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
1361 }
// With a non-zero exit timeout and a non-idle stream, the ExitRequested
// callback is invoked and the grace period timer is started.
1362 if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
1363 ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
1364 uv_update_time(state.loop);
1365 O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
1366 deviceContext.exitTransitionTimeout);
1367 uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
// The timeout reported in the messages below is the data processing one when
// hasOnlyGenerated(spec) is true, the exit transition one otherwise.
1368 bool onlyGenerated = hasOnlyGenerated(spec);
1369 int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
1371 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
1372 } else {
1373 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
1374 "New state requested. Waiting for %d seconds before %{public}s",
1375 timeout,
1376 onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
1377 }
1379 } else {
// No waiting here: either no exit timeout is set or the device is already
// idle. Only the reported message differs between the four cases.
1380 if (deviceContext.exitTransitionTimeout == 0 && policies.termination == TerminationPolicy::QUIT) {
1381 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
1382 } else if (deviceContext.exitTransitionTimeout == 0 && policies.termination != TerminationPolicy::QUIT) {
1383 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
1384 } else if (policies.termination == TerminationPolicy::QUIT) {
1385 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
1386 } else {
1387 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
1388 }
1390 }
1391}
1392
1394{
1395 ServiceRegistryRef ref{mServiceRegistry};
1396 auto& state = ref.get<DeviceState>();
1398 bool firstLoop = true;
1399 O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
1400 O2_SIGNPOST_START(device, lid, "device_state", "First iteration of the device loop");
1401
1402 bool dplEnableMultithreding = getenv("DPL_THREADPOOL_SIZE") != nullptr;
1403 if (dplEnableMultithreding) {
1404 setenv("UV_THREADPOOL_SIZE", "1", 1);
1405 }
1406
1407 while (state.transitionHandling != TransitionHandlingState::Expired) {
1408 if (state.nextFairMQState.empty() == false) {
1409 (void)this->ChangeState(state.nextFairMQState.back());
1410 state.nextFairMQState.pop_back();
1411 }
1412 // Notify on the main thread the new region callbacks, making sure
1413 // no callback is issued if there is something still processing.
1414 {
1415 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1416 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1417 }
1418 // This will block for the correct delay (or until we get data
1419 // on a socket). We also do not block on the first iteration
1420 // so that devices which do not have a timer can still start an
1421 // enumeration.
1422 {
1423 ServiceRegistryRef ref{mServiceRegistry};
1424 ref.get<DriverClient>().flushPending(mServiceRegistry);
1425 DataProcessorContext* lastActive = state.lastActiveDataProcessor.load();
1426 // Reset to zero unless some other DataProcessorContext completed in the meantime.
1427 // In such case we will take care of it at next iteration.
1428 state.lastActiveDataProcessor.compare_exchange_strong(lastActive, nullptr);
1429
1430 auto shouldNotWait = (lastActive != nullptr &&
1431 (state.streaming != StreamingState::Idle) && (state.activeSignals.empty())) ||
1433 if (firstLoop) {
1434 shouldNotWait = true;
1435 firstLoop = false;
1436 }
1437 if (lastActive != nullptr) {
1439 }
1440 if (NewStatePending()) {
1441 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "New state pending. Waiting for it to be handled.");
1442 shouldNotWait = true;
1444 }
1445 state.transitionHandling = updateStateTransition(ref, ref.get<DeviceContext>().processingPolicies);
1446 // If we are Idle, we can then consider the transition to be expired.
1447 if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
1448 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
1449 state.transitionHandling = TransitionHandlingState::Expired;
1450 }
1451 if (state.severityStack.empty() == false) {
1452 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1453 state.severityStack.pop_back();
1454 }
1455 // for (auto &info : mDeviceContext.state->inputChannelInfos) {
1456 // shouldNotWait |= info.readPolled;
1457 // }
1458 state.loopReason = DeviceState::NO_REASON;
1459 state.firedTimers.clear();
1460 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_CALLBACKS) != 0) {
1461 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1462 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1463 }
1464 // Run the asynchronous queue just before sleeping again, so that:
1465 // - we can trigger further events from the queue
1466 // - we can guarantee this is the last thing we do in the loop (
1467 // assuming no one else is adding to the queue before this point).
1468 auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1469 O2_SIGNPOST_START(device, lid, "run_loop", "Dropping message from slot %" PRIu64 ". Forwarding as needed.", (uint64_t)slot.index);
1470 ServiceRegistryRef ref{registry};
1471 ref.get<AsyncQueue>();
1472 ref.get<DecongestionService>();
1473 ref.get<DataRelayer>();
1474 // Get the current timeslice for the slot.
1475 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
1477 forwardInputs(registry, slot, dropped, oldestOutputInfo, false, true);
1478 };
1479 auto& relayer = ref.get<DataRelayer>();
1480 relayer.prunePending(onDrop);
1481 auto& queue = ref.get<AsyncQueue>();
1482 auto oldestPossibleTimeslice = relayer.getOldestPossibleOutput();
1483 AsyncQueueHelpers::run(queue, {oldestPossibleTimeslice.timeslice.value});
1484 if (shouldNotWait == false) {
1485 auto& dpContext = ref.get<DataProcessorContext>();
1486 dpContext.preLoopCallbacks(ref);
1487 }
1488 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. %{}s", shouldNotWait ? "Will immediately schedule a new one" : "Waiting for next event.");
1489 uv_run(state.loop, shouldNotWait ? UV_RUN_NOWAIT : UV_RUN_ONCE);
1490 O2_SIGNPOST_START(device, lid, "run_loop", "Run loop started. Loop reason %d.", state.loopReason);
1491 if ((state.loopReason & state.tracingFlags) != 0) {
1492 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1493 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1494 } else if (state.severityStack.empty() == false) {
1495 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1496 state.severityStack.pop_back();
1497 }
1498 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Loop reason mask %x & %x = %x", state.loopReason, state.tracingFlags, state.loopReason & state.tracingFlags);
1499
1500 if ((state.loopReason & DeviceState::LoopReason::OOB_ACTIVITY) != 0) {
1501 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Out of band activity detected. Rescanning everything.");
1502 relayer.rescan();
1503 }
1504
1505 if (!state.pendingOffers.empty()) {
1506 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Pending %" PRIu64 " offers. updating the ComputingQuotaEvaluator.", (uint64_t)state.pendingOffers.size());
1507 ref.get<ComputingQuotaEvaluator>().updateOffers(state.pendingOffers, uv_now(state.loop));
1508 }
1509 }
1510
1511 // Notify on the main thread the new region callbacks, making sure
1512 // no callback is issued if there is something still processing.
1513 // Notice that we still need to perform callbacks also after
1514 // the socket epolled, because otherwise we would end up serving
1515 // the callback after the first data arrives if the system is too
1516 // fast to transition from Init to Run.
1517 {
1518 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1519 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1520 }
1521
1522 assert(mStreams.size() == mHandles.size());
1524 TaskStreamRef streamRef{-1};
1525 for (size_t ti = 0; ti < mStreams.size(); ti++) {
1526 auto& taskInfo = mStreams[ti];
1527 if (taskInfo.running) {
1528 continue;
1529 }
1530 // Stream 0 is for when we run in
1531 streamRef.index = ti;
1532 }
1533 using o2::monitoring::Metric;
1534 using o2::monitoring::Monitoring;
1535 using o2::monitoring::tags::Key;
1536 using o2::monitoring::tags::Value;
1537 // We have an empty stream, let's check if we have enough
1538 // resources for it to run something
1539 if (streamRef.index != -1) {
1540 // Synchronous execution of the callbacks. This will be moved to
1541 // on_socket_polled once we have threading in place.
1542 uv_work_t& handle = mHandles[streamRef.index];
1543 TaskStreamInfo& stream = mStreams[streamRef.index];
1544 handle.data = &mStreams[streamRef.index];
1545
1546 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const& stats)> reportExpiredOffer = [&registry = mServiceRegistry](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
1547 ServiceRegistryRef ref{registry};
1548 auto& dpStats = ref.get<DataProcessingStats>();
1549 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
1550 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
1551 dpStats.updateStats({static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredTimeslices});
1552 dpStats.processCommandQueue();
1553 };
1554 auto ref = ServiceRegistryRef{mServiceRegistry};
1555
1556 // Deciding whether to run or not can be done by passing a request to
1557 // the evaluator. In this case, the request is always satisfied and
1558 // we run on whatever resource is available.
1559 auto& spec = ref.get<DeviceSpec const>();
1560 bool enough = ref.get<ComputingQuotaEvaluator>().selectOffer(streamRef.index, spec.resourcePolicy.request, uv_now(state.loop));
1561
1562 struct SchedulingStats {
1563 std::atomic<size_t> lastScheduled = 0;
1564 std::atomic<size_t> numberOfUnscheduledSinceLastScheduled = 0;
1565 std::atomic<size_t> numberOfUnscheduled = 0;
1566 std::atomic<size_t> numberOfScheduled = 0;
1567 };
1568 static SchedulingStats schedulingStats;
1569 O2_SIGNPOST_ID_GENERATE(sid, scheduling);
1570 if (enough) {
1571 stream.id = streamRef;
1572 stream.running = true;
1573 stream.registry = &mServiceRegistry;
1574 schedulingStats.lastScheduled = uv_now(state.loop);
1575 schedulingStats.numberOfScheduled++;
1576 schedulingStats.numberOfUnscheduledSinceLastScheduled = 0;
1577 O2_SIGNPOST_EVENT_EMIT(scheduling, sid, "Run", "Enough resources to schedule computation on stream %d", streamRef.index);
1578 if (dplEnableMultithreding) [[unlikely]] {
1579 stream.task = &handle;
1580 uv_queue_work(state.loop, stream.task, run_callback, run_completion);
1581 } else {
1582 run_callback(&handle);
1583 run_completion(&handle, 0);
1584 }
1585 } else {
1586 if (schedulingStats.numberOfUnscheduledSinceLastScheduled > 100 ||
1587 (uv_now(state.loop) - schedulingStats.lastScheduled) > 30000) {
1588 O2_SIGNPOST_EVENT_EMIT_WARN(scheduling, sid, "Run",
1589 "Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu.",
1590 schedulingStats.numberOfUnscheduledSinceLastScheduled.load(),
1591 schedulingStats.lastScheduled.load());
1592 } else {
1593 O2_SIGNPOST_EVENT_EMIT(scheduling, sid, "Run",
1594 "Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu.",
1595 schedulingStats.numberOfUnscheduledSinceLastScheduled.load(),
1596 schedulingStats.lastScheduled.load());
1597 }
1598 schedulingStats.numberOfUnscheduled++;
1599 schedulingStats.numberOfUnscheduledSinceLastScheduled++;
1600 auto ref = ServiceRegistryRef{mServiceRegistry};
1601 ref.get<ComputingQuotaEvaluator>().handleExpired(reportExpiredOffer);
1602 }
1603 }
1604 }
1605
1606 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", (int)state.transitionHandling);
1607 auto& spec = ref.get<DeviceSpec const>();
1609 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
1610 auto& info = state.inputChannelInfos[ci];
1611 info.parts.fParts.clear();
1612 }
1613 state.transitionHandling = TransitionHandlingState::NoTransition;
1614}
1615
1619{
     // Body of the routine the signposts below call "DataProcessorContext::doPrepare"
     // (the signature line is not visible in this listing; `ref` is used as a
     // service registry reference throughout).
     //
     // In order, it:
     //  - fires the ClockTick callback;
     //  - computes an initial value for context.allDone from the input channels;
     //  - orders the input channels by their oldest possible timeslice;
     //  - if context.balancingInputs is set, suspends the pollers of channels
     //    which are too far ahead of the slowest one and drops them from this
     //    polling round;
     //  - for every remaining channel in Running state, receives pending parts
     //    and appends them to info.parts.
1620 auto& context = ref.get<DataProcessorContext>();
1621 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1622 O2_SIGNPOST_START(device, dpid, "do_prepare", "Starting DataProcessorContext::doPrepare.");
1623
1624 {
1625 ref.get<CallbackService>().call<CallbackService::Id::ClockTick>();
1626 }
1627 // Whether or not we had something to do.
1628
1629 // Initialise the value for context.allDone. It will possibly be updated
1630 // below if any of the channels is not done.
1631 //
1632 // Notice that fake input channels (InputChannelState::Pull) cannot possibly
1633 // expect to receive an EndOfStream signal. Thus we do not wait for these
1634 // to be completed. In the case of data source devices, as they do not have
1635 // real data input channels, they have to signal EndOfStream themselves.
1636 auto& state = ref.get<DeviceState>();
1637 auto& spec = ref.get<DeviceSpec const>();
1638 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.inputChannelInfos.data());
1639 O2_SIGNPOST_START(device, cid, "do_prepare", "Reported channel states.");
     // NOTE(review): std::any_of makes allDone true as soon as ONE channel has
     // no pending parts and is not in Pull state (and it stops reporting channel
     // states at that first match). The polling loop further below resets
     // allDone to false for every polled channel which is neither Completed
     // nor Pull.
1640 context.allDone = std::any_of(state.inputChannelInfos.begin(), state.inputChannelInfos.end(), [cid](const auto& info) {
1641 if (info.channel) {
1642 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "Input channel %{public}s%{public}s has %zu parts left and is in state %d.",
1643 info.channel->GetName().c_str(), (info.id.value == ChannelIndex::INVALID ? " (non DPL)" : ""), info.parts.fParts.size(), (int)info.state);
1644 } else {
1645 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "External channel %d is in state %d.", info.id.value, (int)info.state);
1646 }
1647 return (info.parts.fParts.empty() == true && info.state != InputChannelState::Pull);
1648 });
1649 O2_SIGNPOST_END(device, cid, "do_prepare", "End report.");
1650 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Processing %zu input channels.", spec.inputChannels.size());
     // pollOrder is a function-local static, so its storage is reused across
     // calls. It is refilled with 0..N-1 and sorted so that the channel with
     // the smallest oldestForChannel value comes first.
1653 static std::vector<int> pollOrder;
1654 pollOrder.resize(state.inputChannelInfos.size());
1655 std::iota(pollOrder.begin(), pollOrder.end(), 0);
1656 std::sort(pollOrder.begin(), pollOrder.end(), [&infos = state.inputChannelInfos](int a, int b) {
1657 return infos[a].oldestForChannel.value < infos[b].oldestForChannel.value;
1658 });
1659
1660 // Nothing to poll...
1661 if (pollOrder.empty()) {
1662 O2_SIGNPOST_END(device, dpid, "do_prepare", "Nothing to poll. Waiting for next iteration.");
1663 return;
1664 }
     // After the sort, front() is the channel lagging the most and back() the
     // one which is the furthest ahead; delta is the spread between the two.
1665 auto currentOldest = state.inputChannelInfos[pollOrder.front()].oldestForChannel;
1666 auto currentNewest = state.inputChannelInfos[pollOrder.back()].oldestForChannel;
1667 auto delta = currentNewest.value - currentOldest.value;
     // NOTE(review): the format string uses PRIu64 while the arguments are cast
     // to int64_t.
1668 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Oldest possible timeframe range %" PRIu64 " => %" PRIu64 " delta %" PRIu64,
1669 (int64_t)currentOldest.value, (int64_t)currentNewest.value, (int64_t)delta);
1670 auto& infos = state.inputChannelInfos;
1671
1672 if (context.balancingInputs) {
     // Both values are static, hence evaluated only on the first pass through
     // this branch. `ahead` is taken from the DPL_MAX_CHANNEL_AHEAD environment
     // variable when set, otherwise it is
     // max(8, min(pipelineLength - 48, pipelineLength / 2)).
1673 static int pipelineLength = DefaultsHelpers::pipelineLength();
1674 static uint64_t ahead = getenv("DPL_MAX_CHANNEL_AHEAD") ? std::atoll(getenv("DPL_MAX_CHANNEL_AHEAD")) : std::max(8, std::min(pipelineLength - 48, pipelineLength / 2));
     // Channels whose oldest possible timeslice is more than `ahead` beyond the
     // slowest channel are rejected for this round.
     // NOTE(review): the loop below dereferences entries past newEnd, which
     // std::remove_if leaves unspecified in general. This works because
     // pollOrder is sorted ascending on the very same key, so the rejected
     // channels form a suffix and nothing has to be moved.
1675 auto newEnd = std::remove_if(pollOrder.begin(), pollOrder.end(), [&infos, limitNew = currentOldest.value + ahead](int a) -> bool {
1676 return infos[a].oldestForChannel.value > limitNew;
1677 });
     // Bring each poller in line with the decision above: entries before newEnd
     // should be polling, entries from newEnd on should be suspended. Only
     // pollers currently Connected or Suspended are touched.
1678 for (auto it = pollOrder.begin(); it < pollOrder.end(); it++) {
1679 const auto& channelInfo = state.inputChannelInfos[*it];
1680 if (channelInfo.pollerIndex != -1) {
1681 auto& poller = state.activeInputPollers[channelInfo.pollerIndex];
1682 auto& pollerContext = *(PollerContext*)(poller->data);
1683 if (pollerContext.pollerState == PollerContext::PollerState::Connected || pollerContext.pollerState == PollerContext::PollerState::Suspended) {
1684 bool running = pollerContext.pollerState == PollerContext::PollerState::Connected;
1685 bool shouldBeRunning = it < newEnd;
1686 if (running != shouldBeRunning) {
     // Suspension is done by restarting the poller with an empty event mask (0).
1687 uv_poll_start(poller, shouldBeRunning ? UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED : 0, &on_socket_polled);
1688 pollerContext.pollerState = shouldBeRunning ? PollerContext::PollerState::Connected : PollerContext::PollerState::Suspended;
1689 }
1690 }
1691 }
1692 }
1693 pollOrder.erase(newEnd, pollOrder.end());
1694 }
     // NOTE(review): this ends the "do_prepare" interval opened on dpid at the
     // top, although the function keeps going with the per-channel loop.
1695 O2_SIGNPOST_END(device, dpid, "do_prepare", "%zu channels pass the channel inbalance balance check.", pollOrder.size());
1696
     // Per-channel receive loop. Each iteration opens a "channels" interval on
     // cid and every exit path (continue or fallthrough) closes it.
1697 for (auto sci : pollOrder) {
1698 auto& info = state.inputChannelInfos[sci];
1699 auto& channelSpec = spec.inputChannels[sci];
1700 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1701 O2_SIGNPOST_START(device, cid, "channels", "Processing channel %s", channelSpec.name.c_str());
1702
     // A channel which is neither Completed nor Pull means we are not done yet.
1703 if (info.state != InputChannelState::Completed && info.state != InputChannelState::Pull) {
1704 context.allDone = false;
1705 }
1706 if (info.state != InputChannelState::Running) {
1707 // Remember to flush data if we are not running
1708 // and there is some message pending.
     // NOTE(review): the statement inside this `if` is not visible in this
     // listing.
1709 if (info.parts.Size()) {
1711 }
1712 O2_SIGNPOST_END(device, cid, "channels", "Flushing channel %s which is in state %d and has %zu parts still pending.",
1713 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1714 continue;
1715 }
1716 if (info.channel == nullptr) {
1717 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is nullptr and has %zu parts still pending.",
1718 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1719 continue;
1720 }
1721 // Only poll DPL channels for now.
     // NOTE(review): the condition opening this branch is not visible in this
     // listing; judging from the message it skips non DPL channels.
1723 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is not a DPL channel and has %zu parts still pending.",
1724 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1725 continue;
1726 }
1727 auto& socket = info.channel->GetSocket();
1728 // If we have pending events from a previous iteration,
1729 // we do receive in any case.
1730 // Otherwise we check if there is any pending event and skip
1731 // this channel in case there is none.
1732 if (info.hasPendingEvents == 0) {
1733 socket.Events(&info.hasPendingEvents);
1734 // If we do not read, we can continue.
     // Bit 0 of the event mask is presumably the "readable" flag (ZMQ_POLLIN)
     // -- TODO confirm.
1735 if ((info.hasPendingEvents & 1) == 0 && (info.parts.Size() == 0)) {
1736 O2_SIGNPOST_END(device, cid, "channels", "No pending events and no remaining parts to process for channel %{public}s", channelSpec.name.c_str());
1737 continue;
1738 }
1739 }
1740 // We can reset this, because it means we have seen at least 1
1741 // message after the UV_READABLE was raised.
1742 info.readPolled = false;
1743 // Notice that there seems to be a difference between the documentation
1744 // of zeromq and the observed behavior. The fact that ZMQ_POLLIN
1745 // is raised does not mean that a message is immediately available to
1746 // read, just that it will be available soon, so the receive can
1747 // still return -2. To avoid this we keep receiving on the socket until
1748 // we get a message. In order not to overflow the DPL queue we process
1749 // one message at the time and we keep track of whether there were more
1750 // to process.
1751 bool newMessages = false;
1752 while (true) {
1753 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Receiving loop called for channel %{public}s (%d) with oldest possible timeslice %zu",
1754 channelSpec.name.c_str(), info.id.value, info.oldestForChannel.value);
     // Receive only while fewer than 64 parts are already queued on this
     // channel. The second argument of Receive is 0, presumably a zero timeout
     // (i.e. a non-blocking receive) -- TODO confirm against FairMQ.
1755 if (info.parts.Size() < 64) {
1756 fair::mq::Parts parts;
1757 info.channel->Receive(parts, 0);
1758 if (parts.Size()) {
1759 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Received %zu parts from channel %{public}s (%d).", parts.Size(), channelSpec.name.c_str(), info.id.value);
1760 }
1761 for (auto&& part : parts) {
1762 info.parts.fParts.emplace_back(std::move(part));
1763 }
     // Set whenever a receive was attempted, even if it returned no parts.
1764 newMessages |= true;
1765 }
1766
     // NOTE(review): this condition holds for any non-negative Size(), so the
     // enclosing while(true) loop runs exactly once. The first line of the
     // branch is not visible in this listing.
1767 if (info.parts.Size() >= 0) {
1769 // Receiving data counts as activity now, so that
1770 // We can make sure we process all the pending
1771 // messages without hanging on the uv_run.
1772 break;
1773 }
1774 }
1775 // We check once again for pending events, keeping track if this was the
1776 // case so that we can immediately repeat this loop and avoid remaining
1777 // stuck in uv_run. This is because we will not get notified on the socket
1778 // if more events are pending due to zeromq level triggered approach.
1779 socket.Events(&info.hasPendingEvents);
1780 if (info.hasPendingEvents) {
1781 info.readPolled = false;
1782 // In case there were messages, we consider it as activity
1783 if (newMessages) {
1784 state.lastActiveDataProcessor.store(&context);
1785 }
1786 }
1787 O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).",
1788 channelSpec.name.c_str(), info.id.value);
1789 }
1790}
1791
1793{
     // Body of the per-iteration processing step (presumably
     // DataProcessingDevice::doRun -- the signature line is not visible in this
     // listing; `ref` is used as a service registry reference throughout).
     //
     // It returns immediately when the streaming state is Idle. Otherwise it
     // dispatches whatever computation is ready, handles dangling inputs
     // (surrounded by the pre/post dangling callbacks), dispatches again and,
     // when the state is EndOfStreaming, drains the queues, invokes the
     // end-of-stream callbacks, clears the relayer and stops the output pollers.
     // state.lastActiveDataProcessor is set to this context whenever one of
     // those steps counts as activity.
1794 auto& context = ref.get<DataProcessorContext>();
1795 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1796 auto& state = ref.get<DeviceState>();
1797 auto& spec = ref.get<DeviceSpec const>();
1798
1799 if (state.streaming == StreamingState::Idle) {
1800 return;
1801 }
1802
     // First dispatch attempt; context.completed is emptied and handed over as
     // the working buffer.
1803 context.completed.clear();
1804 context.completed.reserve(16);
1805 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1806 state.lastActiveDataProcessor.store(&context);
1807 }
1808 DanglingContext danglingContext{*context.registry};
1809
1810 context.preDanglingCallbacks(danglingContext);
     // The Idle callback is invoked only when no data processor has been
     // recorded as active.
1811 if (state.lastActiveDataProcessor.load() == nullptr) {
1812 ref.get<CallbackService>().call<CallbackService::Id::Idle>();
1813 }
1814 auto activity = ref.get<DataRelayer>().processDanglingInputs(context.expirationHandlers, *context.registry, true);
1815 if (activity.expiredSlots > 0) {
1816 state.lastActiveDataProcessor = &context;
1817 }
1818
     // Second dispatch attempt, after the dangling inputs have been processed.
1819 context.completed.clear();
1820 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1821 state.lastActiveDataProcessor = &context;
1822 }
1823
1824 context.postDanglingCallbacks(danglingContext);
1825
1826 // If we got notified that all the sources are done, we call the EndOfStream
1827 // callback and return false. Notice that what happens next is actually
1828 // dependent on the callback, not something which is controlled by the
1829 // framework itself.
     // NOTE(review): the first statement of this branch is not visible in this
     // listing; presumably it is what moves state.streaming to EndOfStreaming
     // -- TODO confirm.
1830 if (context.allDone == true && state.streaming == StreamingState::Streaming) {
1832 state.lastActiveDataProcessor = &context;
1833 }
1834
1835 if (state.streaming == StreamingState::EndOfStreaming) {
1836 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "We are in EndOfStreaming. Flushing queues.");
1837 // We keep processing data until we are Idle.
1838 // FIXME: not sure this is the correct way to drain the queues, but
1839 // I guess we will see.
1842 auto& relayer = ref.get<DataRelayer>();
1843
1844 bool shouldProcess = hasOnlyGenerated(spec) == false;
1845
     // tryDispatchComputation is the left operand of &&, so it is invoked once
     // even when shouldProcess is false; the loop body only runs when both hold.
1846 while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) {
1847 relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false);
1848 }
1849
1850 auto& timingInfo = ref.get<TimingInfo>();
1851 // We should keep the data generated at end of stream only for those
1852 // which are not sources.
1853 timingInfo.keepAtEndOfStream = shouldProcess;
1854 // Fill timinginfo with some reasonable values for data sent with endOfStream
1855 timingInfo.timeslice = relayer.getOldestPossibleOutput().timeslice.value;
1856 timingInfo.tfCounter = -1;
1857 timingInfo.firstTForbit = -1;
1858 // timingInfo.runNumber = ; // Not sure where to get this if not already set
     // Creation time is the current system clock, in milliseconds since epoch.
1859 timingInfo.creation = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
1860 O2_SIGNPOST_EVENT_EMIT(calibration, dpid, "calibration", "TimingInfo.keepAtEndOfStream %d", timingInfo.keepAtEndOfStream);
1861
1862 EndOfStreamContext eosContext{*context.registry, ref.get<DataAllocator>()};
1863
     // The end-of-stream callbacks are nested: data processor context pre,
     // stream context pre, the CallbackService EndOfStream callback, then the
     // post callbacks in the reverse order.
1864 context.preEOSCallbacks(eosContext);
1865 auto& streamContext = ref.get<StreamContext>();
1866 streamContext.preEOSCallbacks(eosContext);
1867 ref.get<CallbackService>().call<CallbackService::Id::EndOfStream>(eosContext);
1868 streamContext.postEOSCallbacks(eosContext);
1869 context.postEOSCallbacks(eosContext);
1870
     // NOTE(review): the statement which performs the actual sending is not
     // visible in this listing; only the signpost is.
1871 for (auto& channel : spec.outputChannels) {
1872 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Sending end of stream to %{public}s.", channel.name.c_str());
1874 }
1875 // This is needed because the transport is deleted before the device.
1876 relayer.clear();
1878 // In case we should process, note the data processor responsible for it
1879 if (shouldProcess) {
1880 state.lastActiveDataProcessor = &context;
1881 }
1882 // On end of stream we shut down all output pollers.
1883 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1884 for (auto& poller : state.activeOutputPollers) {
1885 uv_poll_stop(poller);
1886 }
1887 return;
1888 }
1889
     // NOTE(review): the function already returned at the top when the state
     // was Idle, so this branch can only be taken if something invoked in
     // between (or a line not visible in this listing) switched
     // state.streaming to Idle.
1890 if (state.streaming == StreamingState::Idle) {
1891 // On end of stream we shut down all output pollers.
1892 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1893 for (auto& poller : state.activeOutputPollers) {
1894 uv_poll_stop(poller);
1895 }
1896 }
1897
1898 return;
1899}
1900
1902{
     // Body of a member function performing shutdown cleanup (presumably
     // DataProcessingDevice::ResetTask -- the signature line is not visible in
     // this listing). It clears the DataRelayer and detaches the user data
     // pointer from the SIGUSR1 handle and from every active signal handle.
1903 ServiceRegistryRef ref{mServiceRegistry};
1904 ref.get<DataRelayer>().clear();
1905 auto& deviceContext = ref.get<DeviceContext>();
1906 // If the signal handler is there, we should
1907 // hide the registry from it, so that we do not
1908 // end up calling the signal handler on something
1909 // which might not be there anymore.
1910 if (deviceContext.sigusr1Handle) {
1911 deviceContext.sigusr1Handle->data = nullptr;
1912 }
1913 // Makes sure we do not have a working context on
1914 // shutdown.
1915 for (auto& handle : ref.get<DeviceState>().activeSignals) {
1916 handle->data = nullptr;
1917 }
1918}
1919
1922 {
1923 }
1924};
1925
1931{
     // Body of the routine which classifies and relays the parts pending on one
     // input channel (presumably DataProcessingDevice::handleData, going by the
     // "handle_data" signpost category -- the signature line is not visible in
     // this listing; `ref` and `info` are used as its parameters).
     //
     // It works in two steps, each implemented as a lambda:
     //  - getInputTypes walks info.parts and returns one InputInfo per logical
     //    input, or std::nullopt when the bookkeeping does not add up;
     //  - handleValidMessages consumes that list, relaying data to the
     //    DataRelayer and handling SourceInfo / DomainInfo messages.
1932 using InputInfo = DataRelayer::InputInfo;
1934
1935 auto& context = ref.get<DataProcessorContext>();
1936 // This is the same id as the upper level function, so we get the events
1937 // associated with the same interval. We will simply use "handle_data" as
1938 // the category.
1939 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1940
1941 // This is how we validate inputs. I.e. we try to enforce the O2 Data model
1942 // and we do a few stats. We bind parts as a lambda captured variable, rather
1943 // than an input, because we do not want the outer loop actually be exposed
1944 // to the implementation details of the messaging layer.
1945 auto getInputTypes = [&info, &context]() -> std::optional<std::vector<InputInfo>> {
1946 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1947 auto ref = ServiceRegistryRef{*context.registry};
1948 auto& stats = ref.get<DataProcessingStats>();
1949 auto& state = ref.get<DeviceState>();
1950 auto& parts = info.parts;
1951 stats.updateStats({(int)ProcessingStatsId::TOTAL_INPUTS, DataProcessingStats::Op::Set, (int64_t)parts.Size()});
1952
1953 std::vector<InputInfo> results;
1954 // we can reserve the upper limit
1955 results.reserve(parts.Size() / 2);
1956 size_t nTotalPayloads = 0;
1957
     // Records one input and keeps nTotalPayloads in sync: every valid entry
     // contributes (length - 1) messages on top of its header, so that
     // results.size() + nTotalPayloads can be compared to parts.Size() below.
1958 auto insertInputInfo = [&results, &nTotalPayloads](size_t position, size_t length, InputType type, ChannelIndex index) {
1959 results.emplace_back(position, length, type, index);
1960 if (type != InputType::Invalid && length > 1) {
1961 nTotalPayloads += length - 1;
1962 }
1963 };
1964
     // Parts are visited two at a time, the part at index pi being treated as
     // a header. The header kinds are probed in this order: SourceInfoHeader,
     // DomainInfoHeader, DataHeader.
     // NOTE(review): pi + 1 is accessed further down without being checked
     // against parts.Size(); this relies on parts arriving in pairs.
1965 for (size_t pi = 0; pi < parts.Size(); pi += 2) {
1966 auto* headerData = parts.At(pi)->GetData();
1967 auto sih = o2::header::get<SourceInfoHeader*>(headerData);
1968 if (sih) {
1969 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got SourceInfoHeader with state %d", (int)sih->state);
     // The channel state is updated right away from the header.
1970 info.state = sih->state;
1971 insertInputInfo(pi, 2, InputType::SourceInfo, info.id);
1972 state.lastActiveDataProcessor = &context;
1973 continue;
1974 }
1975 auto dih = o2::header::get<DomainInfoHeader*>(headerData);
1976 if (dih) {
1977 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got DomainInfoHeader with oldestPossibleTimeslice %d", (int)dih->oldestPossibleTimeslice);
1978 insertInputInfo(pi, 2, InputType::DomainInfo, info.id);
1979 state.lastActiveDataProcessor = &context;
1980 continue;
1981 }
1982 auto dh = o2::header::get<DataHeader*>(headerData);
1983 if (!dh) {
1984 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1985 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header is not a DataHeader?");
1986 continue;
1987 }
     // The payload announced by the header must fit in the following part.
1988 if (dh->payloadSize > parts.At(pi + 1)->GetSize()) {
1989 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1990 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader payloadSize mismatch");
1991 continue;
1992 }
1993 auto dph = o2::header::get<DataProcessingHeader*>(headerData);
1994 // We only deal with the tracking of parts if the log is enabled.
1995 // This is because in principle we should track the size of each of
1996 // the parts and sum it up. Not for now.
1997 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, headerData);
1998 O2_SIGNPOST_START(parts, pid, "parts", "Processing DataHeader %{public}-4s/%{public}-16s/%d with splitPayloadParts %d and splitPayloadIndex %d",
1999 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->splitPayloadParts, dh->splitPayloadIndex);
2000 if (!dph) {
2001 insertInputInfo(pi, 2, InputType::Invalid, info.id);
2002 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header stack does not contain DataProcessingHeader");
2003 continue;
2004 }
2005 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
2006 // this is indicating a sequence of payloads following the header
2007 // FIXME: we will probably also set the DataHeader version
     // One entry of length splitPayloadParts + 1 (header plus payloads). Adding
     // splitPayloadParts - 1 here, plus the loop's own += 2, moves pi past the
     // whole sequence.
2008 insertInputInfo(pi, dh->splitPayloadParts + 1, InputType::Data, info.id);
2009 pi += dh->splitPayloadParts - 1;
2010 } else {
2011 // We can set the type for the next splitPayloadParts
2012 // because we are guaranteed they are all the same.
2013 // If splitPayloadParts = 0, we assume that means there is only one (header, payload)
2014 // pair.
2015 size_t finalSplitPayloadIndex = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
2016 if (finalSplitPayloadIndex > parts.Size()) {
2017 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader::splitPayloadParts invalid");
2018 insertInputInfo(pi, 0, InputType::Invalid, info.id);
2019 continue;
2020 }
     // One entry of length 2 per (header, payload) pair. The inner loop leaves
     // pi on the last pair so that the outer += 2 moves to the next input.
2021 insertInputInfo(pi, 2, InputType::Data, info.id);
2022 for (; pi + 2 < finalSplitPayloadIndex; pi += 2) {
2023 insertInputInfo(pi + 2, 2, InputType::Data, info.id);
2024 }
2025 }
2026 }
     // Consistency check: headers plus payloads accounted for must match the
     // number of parts. Invalid entries contribute no payloads, so a single one
     // is normally enough to fail the check.
2027 if (results.size() + nTotalPayloads != parts.Size()) {
2028 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "inconsistent number of inputs extracted. %zu vs parts (%zu)", results.size() + nTotalPayloads, parts.Size());
2029 return std::nullopt;
2030 }
2031 return results;
2032 };
2033
     // NOTE(review): the statement using `stats` and `message` is not visible
     // in this listing; as shown, the lambda only fetches DataProcessingStats.
2034 auto reportError = [ref](const char* message) {
2035 auto& stats = ref.get<DataProcessingStats>();
2037 };
2038
2039 auto handleValidMessages = [&info, ref, &reportError](std::vector<InputInfo> const& inputInfos) {
2040 auto& relayer = ref.get<DataRelayer>();
2041 auto& state = ref.get<DeviceState>();
     // `policy` and `ordering` are function-local statics, hence shared by all
     // invocations of this lambda.
2042 static WaitBackpressurePolicy policy;
2043 auto& parts = info.parts;
2044 // We relay execution to make sure we have a complete set of parts
2045 // available.
2046 bool hasBackpressure = false;
     // (size_t)-1 is used as the "not set" value for both timeslice trackers.
2047 size_t minBackpressureTimeslice = -1;
2048 bool hasData = false;
2049 size_t oldestPossibleTimeslice = -1;
2050 static std::vector<int> ordering;
2051 // Same as inputInfos but with iota.
2052 ordering.resize(inputInfos.size());
2053 std::iota(ordering.begin(), ordering.end(), 0);
2054 // stable sort orderings by type and position
2055 std::stable_sort(ordering.begin(), ordering.end(), [&inputInfos](int const& a, int const& b) {
2056 auto const& ai = inputInfos[a];
2057 auto const& bi = inputInfos[b];
2058 if (ai.type != bi.type) {
2059 return ai.type < bi.type;
2060 }
2061 return ai.position < bi.position;
2062 });
     // Inputs are visited in the sorted order, i.e. grouped by type following
     // the numeric order of the InputType enumerators.
2063 for (size_t ii = 0; ii < inputInfos.size(); ++ii) {
2064 auto const& input = inputInfos[ordering[ii]];
2065 switch (input.type) {
2066 case InputType::Data: {
2067 hasData = true;
2068 auto headerIndex = input.position;
2069 auto nMessages = 0;
2070 auto nPayloadsPerHeader = 0;
2071 if (input.size > 2) {
2072 // header and multiple payload sequence
2073 nMessages = input.size;
2074 nPayloadsPerHeader = nMessages - 1;
2075 } else {
2076 // multiple header-payload pairs
2077 auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
2078 nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2;
2079 nPayloadsPerHeader = 1;
     // All the pairs of this split payload are relayed in one go, so the
     // entries of the remaining pairs are skipped. This relies on them being
     // adjacent in `ordering`, which the sort by type and position provides.
2080 ii += (nMessages / 2) - 1;
2081 }
     // Callback handed to relayer.relay(); going by its name and log message it
     // is invoked for slots being dropped, whose messages are then passed to
     // forwardInputs.
     // NOTE(review): the three ref.get<>() results below are discarded, and a
     // line after `variables` is not visible in this listing.
2082 auto onDrop = [ref](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
2083 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
2084 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "onDrop", "Dropping message from slot %zu. Forwarding as needed. Timeslice %zu",
2085 slot.index, oldestOutputInfo.timeslice.value);
2086 ref.get<AsyncQueue>();
2087 ref.get<DecongestionService>();
2088 ref.get<DataRelayer>();
2089 // Get the current timeslice for the slot.
2090 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
2092 forwardInputs(ref, slot, dropped, oldestOutputInfo, false, true);
2093 };
2094 auto relayed = relayer.relay(parts.At(headerIndex)->GetData(),
2095 &parts.At(headerIndex),
2096 input,
2097 nMessages,
2098 nPayloadsPerHeader,
2099 onDrop);
     // NOTE(review): the case labels of this switch are not visible in this
     // listing. Judging from the bodies, the first one handles backpressure and
     // the second one the return to normal operations.
2100 switch (relayed.type) {
     // The alarm and the "backpressure_<channel>" metric (value 1) are emitted
     // only on the transition from normal operations to backpressure.
2102 if (info.normalOpsNotified == true && info.backpressureNotified == false) {
2103 LOGP(alarm, "Backpressure on channel {}. Waiting.", info.channel->GetName());
2104 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
2105 monitoring.send(o2::monitoring::Metric{1, fmt::format("backpressure_{}", info.channel->GetName())});
2106 info.backpressureNotified = true;
2107 info.normalOpsNotified = false;
2108 }
2109 policy.backpressure(info);
2110 hasBackpressure = true;
     // Track the smallest timeslice which hit backpressure; it is used by the
     // DomainInfo handling below.
2111 minBackpressureTimeslice = std::min<size_t>(minBackpressureTimeslice, relayed.timeslice.value);
2112 break;
     // Symmetric transition: the metric is reset to 0 only once, when coming
     // back from backpressure.
2116 if (info.normalOpsNotified == false && info.backpressureNotified == true) {
2117 LOGP(info, "Back to normal on channel {}.", info.channel->GetName());
2118 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
2119 monitoring.send(o2::monitoring::Metric{0, fmt::format("backpressure_{}", info.channel->GetName())});
2120 info.normalOpsNotified = true;
2121 info.backpressureNotified = false;
2122 }
2123 break;
2124 }
2125 } break;
2126 case InputType::SourceInfo: {
2127 LOGP(detail, "Received SourceInfo");
2128 auto& context = ref.get<DataProcessorContext>();
2129 state.lastActiveDataProcessor = &context;
2130 auto headerIndex = input.position;
2131 auto payloadIndex = input.position + 1;
2132 assert(payloadIndex < parts.Size());
2133 // FIXME: the message with the end of stream cannot contain
2134 // split parts.
     // Resetting both messages marks them for removal by the cleanup at the end
     // of this lambda.
2135 parts.At(headerIndex).reset(nullptr);
2136 parts.At(payloadIndex).reset(nullptr);
2137 // for (size_t i = 0; i < dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 - 1 : 1; ++i) {
2138 // parts.At(headerIndex + 1 + i).reset(nullptr);
2139 // }
2140 // pi += dh->splitPayloadParts > 0 ? dh->splitPayloadParts - 1 : 0;
2141
2142 } break;
2143 case InputType::DomainInfo: {
2146 auto& context = ref.get<DataProcessorContext>();
2147 state.lastActiveDataProcessor = &context;
2148 auto headerIndex = input.position;
2149 auto payloadIndex = input.position + 1;
2150 assert(payloadIndex < parts.Size());
2151 // FIXME: the message with the end of stream cannot contain
2152 // split parts.
2153
2154 auto dih = o2::header::get<DomainInfoHeader*>(parts.At(headerIndex)->GetData());
     // When data was backpressured, a DomainInfo which would move the oldest
     // possible timeslice to (or beyond) the backpressured one is left in
     // place: its messages are not reset, so they stay in info.parts.
2155 if (hasBackpressure && dih->oldestPossibleTimeslice >= minBackpressureTimeslice) {
2156 break;
2157 }
2158 oldestPossibleTimeslice = std::min(oldestPossibleTimeslice, dih->oldestPossibleTimeslice);
2159 LOGP(debug, "Got DomainInfoHeader, new oldestPossibleTimeslice {} on channel {}", oldestPossibleTimeslice, info.id.value);
2160 parts.At(headerIndex).reset(nullptr);
2161 parts.At(payloadIndex).reset(nullptr);
2162 } break;
2163 case InputType::Invalid: {
2164 reportError("Invalid part found.");
2165 } break;
2166 }
2167 }
     // If at least one DomainInfo was accepted, publish the new oldest possible
     // timeslice for this channel and notify the interested callbacks.
2170 if (oldestPossibleTimeslice != (size_t)-1) {
2171 info.oldestForChannel = {oldestPossibleTimeslice};
2172 auto& context = ref.get<DataProcessorContext>();
2173 context.domainInfoUpdatedCallback(*context.registry, oldestPossibleTimeslice, info.id);
2174 ref.get<CallbackService>().call<CallbackService::Id::DomainInfoUpdated>((ServiceRegistryRef)*context.registry, (size_t)oldestPossibleTimeslice, (ChannelIndex)info.id);
2175 state.lastActiveDataProcessor = &context;
2176 }
     // Compact info.parts by erasing every message which is now null; whatever
     // is left is reported as backpressured.
2177 auto it = std::remove_if(parts.fParts.begin(), parts.fParts.end(), [](auto& msg) -> bool { return msg.get() == nullptr; });
2178 parts.fParts.erase(it, parts.end());
2179 if (parts.fParts.size()) {
2180 LOG(debug) << parts.fParts.size() << " messages backpressured";
2181 }
2182 };
2183
2184 // Second part. This is the actual outer loop we want to obtain, with
2185 // implementation details which can be read. Notice how most of the state
2186 // is actually hidden. For example we do not expose what "input" is. This
2187 // will allow us to keep the same toplevel logic even if the actual meaning
2188 // of input is changed (for example we might move away from multipart
2189 // messages). Notice also that we need to act differently depending on the
2190 // actual CompletionOp we want to perform. In particular forwarding inputs
2191 // also gets rid of them from the cache.
2192 auto inputTypes = getInputTypes();
     // NOTE(review): on failure the function only reports and returns; nothing
     // visible here removes the offending parts from info.parts.
2193 if (bool(inputTypes) == false) {
2194 reportError("Parts should come in couples. Dropping it.");
2195 return;
2196 }
2197 handleValidMessages(*inputTypes);
2198 return;
2199}
2200
2201namespace
2202{
// Smallest and largest latency seen while scanning the parts of an input
// record. The defaults are the neutral elements for std::min / std::max, so
// a scan which finds nothing leaves minLatency at the largest uint64_t and
// maxLatency at 0.
struct InputLatency {
  uint64_t minLatency{std::numeric_limits<uint64_t>::max()};
  uint64_t maxLatency{std::numeric_limits<uint64_t>::min()};
};
2207
2208auto calculateInputRecordLatency(InputRecord const& record, uint64_t currentTime) -> InputLatency
2209{
2210 InputLatency result;
2211
2212 for (auto& item : record) {
2213 auto* header = o2::header::get<DataProcessingHeader*>(item.header);
2214 if (header == nullptr) {
2215 continue;
2216 }
2217 int64_t partLatency = (0x7fffffffffffffff & currentTime) - (0x7fffffffffffffff & header->creation);
2218 if (partLatency < 0) {
2219 partLatency = 0;
2220 }
2221 result.minLatency = std::min(result.minLatency, (uint64_t)partLatency);
2222 result.maxLatency = std::max(result.maxLatency, (uint64_t)partLatency);
2223 }
2224 return result;
2225};
2226
2227auto calculateTotalInputRecordSize(InputRecord const& record) -> int
2228{
2229 size_t totalInputSize = 0;
2230 for (auto& item : record) {
2231 auto* header = o2::header::get<DataHeader*>(item.header);
2232 if (header == nullptr) {
2233 continue;
2234 }
2235 totalInputSize += header->payloadSize;
2236 }
2237 return totalInputSize;
2238};
2239
// Raises maximum_value to value if value is larger, leaving it untouched
// otherwise. The update is a compare-and-swap retry loop: whenever the
// exchange fails, the freshly observed content is compared against value
// again, so a larger value stored concurrently is never overwritten.
template <typename T>
void update_maximum(std::atomic<T>& maximum_value, T const& value) noexcept
{
  T observed = maximum_value.load();
  for (;;) {
    if (!(observed < value)) {
      // Already at least as large: nothing to do.
      return;
    }
    if (maximum_value.compare_exchange_weak(observed, value)) {
      return;
    }
    // On failure compare_exchange_weak refreshed `observed`; try again.
  }
}
2248} // namespace
2249
2250bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::vector<DataRelayer::RecordAction>& completed)
2251{
2252 auto& context = ref.get<DataProcessorContext>();
2253 LOGP(debug, "DataProcessingDevice::tryDispatchComputation");
2254 // This is the actual hidden state for the outer loop. In case we decide we
2255 // want to support multithreaded dispatching of operations, I can simply
2256 // move these to some thread local store and the rest of the lambdas
2257 // should work just fine.
2258 std::vector<MessageSet> currentSetOfInputs;
2259
2260 //
2261 auto getInputSpan = [ref, &currentSetOfInputs](TimesliceSlot slot, bool consume = true) {
2262 auto& relayer = ref.get<DataRelayer>();
2263 if (consume) {
2264 currentSetOfInputs = relayer.consumeAllInputsForTimeslice(slot);
2265 } else {
2266 currentSetOfInputs = relayer.consumeExistingInputsForTimeslice(slot);
2267 }
2268 auto getter = [&currentSetOfInputs](size_t i, size_t partindex) -> DataRef {
2269 if (currentSetOfInputs[i].getNumberOfPairs() > partindex) {
2270 const char* headerptr = nullptr;
2271 const char* payloadptr = nullptr;
2272 size_t payloadSize = 0;
2273 // - each input can have multiple parts
2274 // - "part" denotes a sequence of messages belonging together, the first message of the
2275 // sequence is the header message
2276 // - each part has one or more payload messages
2277 // - InputRecord provides all payloads as header-payload pair
2278 auto const& headerMsg = currentSetOfInputs[i].associatedHeader(partindex);
2279 auto const& payloadMsg = currentSetOfInputs[i].associatedPayload(partindex);
2280 headerptr = static_cast<char const*>(headerMsg->GetData());
2281 payloadptr = payloadMsg ? static_cast<char const*>(payloadMsg->GetData()) : nullptr;
2282 payloadSize = payloadMsg ? payloadMsg->GetSize() : 0;
2283 return DataRef{nullptr, headerptr, payloadptr, payloadSize};
2284 }
2285 return DataRef{};
2286 };
2287 auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
2288 return currentSetOfInputs[i].getNumberOfPairs();
2289 };
2290#if __has_include(<fairmq/shmem/Message.h>)
2291 auto refCountGetter = [&currentSetOfInputs](size_t idx) -> int {
2292 auto& header = static_cast<const fair::mq::shmem::Message&>(*currentSetOfInputs[idx].header(0));
2293 return header.GetRefCount();
2294 };
2295#else
2296 std::function<int(size_t)> refCountGetter = nullptr;
2297#endif
2298 return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.size()};
2299 };
2300
2301 auto markInputsAsDone = [ref](TimesliceSlot slot) -> void {
2302 auto& relayer = ref.get<DataRelayer>();
2304 };
2305
2306 // I need a preparation step which gets the current timeslice id and
2307 // propagates it to the various contextes (i.e. the actual entities which
2308 // create messages) because the messages need to have the timeslice id into
2309 // it.
2310 auto prepareAllocatorForCurrentTimeSlice = [ref](TimesliceSlot i) -> void {
2311 auto& relayer = ref.get<DataRelayer>();
2312 auto& timingInfo = ref.get<TimingInfo>();
2313 auto timeslice = relayer.getTimesliceForSlot(i);
2314
2315 timingInfo.timeslice = timeslice.value;
2316 timingInfo.tfCounter = relayer.getFirstTFCounterForSlot(i);
2317 timingInfo.firstTForbit = relayer.getFirstTFOrbitForSlot(i);
2318 timingInfo.runNumber = relayer.getRunNumberForSlot(i);
2319 timingInfo.creation = relayer.getCreationTimeForSlot(i);
2320 };
2321 auto updateRunInformation = [ref](TimesliceSlot i) -> void {
2322 auto& dataProcessorContext = ref.get<DataProcessorContext>();
2323 auto& relayer = ref.get<DataRelayer>();
2324 auto& timingInfo = ref.get<TimingInfo>();
2325 auto timeslice = relayer.getTimesliceForSlot(i);
2326 // We report wether or not this timing info refers to a new Run.
2327 timingInfo.globalRunNumberChanged = !TimingInfo::timesliceIsTimer(timeslice.value) && dataProcessorContext.lastRunNumberProcessed != timingInfo.runNumber;
2328 // A switch to runNumber=0 should not appear and thus does not set globalRunNumberChanged, unless it is seen in the first processed timeslice
2329 timingInfo.globalRunNumberChanged &= (dataProcessorContext.lastRunNumberProcessed == -1 || timingInfo.runNumber != 0);
2330 // FIXME: for now there is only one stream, however we
2331 // should calculate this correctly once we finally get the
2332 // the StreamContext in.
2333 timingInfo.streamRunNumberChanged = timingInfo.globalRunNumberChanged;
2334 };
2335
2336 // When processing them, timers will have to be cleaned up
2337 // to avoid double counting them.
2338 // This was actually the easiest solution we could find for
2339 // O2-646.
2340 auto cleanTimers = [&currentSetOfInputs](TimesliceSlot slot, InputRecord& record) {
2341 assert(record.size() == currentSetOfInputs.size());
2342 for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) {
2343 // assuming that for timer inputs we do have exactly one PartRef object
2344 // in the MessageSet, multiple PartRef Objects are only possible for either
2345 // split payload messages of wildcard matchers, both for data inputs
2346 DataRef input = record.getByPos(ii);
2347 if (input.spec->lifetime != Lifetime::Timer) {
2348 continue;
2349 }
2350 if (input.header == nullptr) {
2351 continue;
2352 }
2353 // This will hopefully delete the message.
2354 currentSetOfInputs[ii].clear();
2355 }
2356 };
2357
2358 // Function to cleanup record. For the moment we
2359 // simply use it to keep track of input messages
2360 // which are not needed, to display them in the GUI.
2361 auto cleanupRecord = [](InputRecord& record) {
2362 if (O2_LOG_ENABLED(parts) == false) {
2363 return;
2364 }
2365 for (size_t pi = 0, pe = record.size(); pi < pe; ++pi) {
2366 DataRef input = record.getByPos(pi);
2367 if (input.header == nullptr) {
2368 continue;
2369 }
2370 auto sih = o2::header::get<SourceInfoHeader*>(input.header);
2371 if (sih) {
2372 continue;
2373 }
2374
2375 auto dh = o2::header::get<DataHeader*>(input.header);
2376 if (!dh) {
2377 continue;
2378 }
2379 // We use the address of the first header of a split payload
2380 // to identify the interval.
2381 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, dh);
2382 O2_SIGNPOST_END(parts, pid, "parts", "Cleaning up parts associated to %p", dh);
2383
2384 // No split parts, we simply skip the payload
2385 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
2386 // this is indicating a sequence of payloads following the header
2387 // FIXME: we will probably also set the DataHeader version
2388 pi += dh->splitPayloadParts - 1;
2389 } else {
2390 size_t pi = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
2391 }
2392 }
2393 };
2394
2395 ref.get<DataRelayer>().getReadyToProcess(completed);
2396 if (completed.empty() == true) {
2397 LOGP(debug, "No computations available for dispatching.");
2398 return false;
2399 }
2400
2401 auto postUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t tStart, uint64_t tStartMilli) {
2402 auto& stats = ref.get<DataProcessingStats>();
2403 auto& states = ref.get<DataProcessingStates>();
2404 std::atomic_thread_fence(std::memory_order_release);
2405 char relayerSlotState[1024];
2406 int written = snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2407 char* buffer = relayerSlotState + written;
2408 for (size_t ai = 0; ai != record.size(); ai++) {
2409 buffer[ai] = record.isValid(ai) ? '3' : '0';
2410 }
2411 buffer[record.size()] = 0;
2412 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index),
2413 .size = (int)(record.size() + buffer - relayerSlotState),
2414 .data = relayerSlotState});
2415 uint64_t tEnd = uv_hrtime();
2416 // tEnd and tStart are in nanoseconds according to https://docs.libuv.org/en/v1.x/misc.html#c.uv_hrtime
2417 int64_t wallTimeMs = (tEnd - tStart) / 1000000;
2419 // Sum up the total wall time, in milliseconds.
2421 // The time interval is in seconds while tEnd - tStart is in nanoseconds, so we divide by 1000000 to get the fraction in ms/s.
2423 stats.updateStats({(int)ProcessingStatsId::LAST_PROCESSED_SIZE, DataProcessingStats::Op::Set, calculateTotalInputRecordSize(record)});
2424 stats.updateStats({(int)ProcessingStatsId::TOTAL_PROCESSED_SIZE, DataProcessingStats::Op::Add, calculateTotalInputRecordSize(record)});
2425 auto latency = calculateInputRecordLatency(record, tStartMilli);
2426 stats.updateStats({(int)ProcessingStatsId::LAST_MIN_LATENCY, DataProcessingStats::Op::Set, (int)latency.minLatency});
2427 stats.updateStats({(int)ProcessingStatsId::LAST_MAX_LATENCY, DataProcessingStats::Op::Set, (int)latency.maxLatency});
2428 static int count = 0;
2430 count++;
2431 };
2432
2433 auto preUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t) {
2434 auto& states = ref.get<DataProcessingStates>();
2435 std::atomic_thread_fence(std::memory_order_release);
2436 char relayerSlotState[1024];
2437 snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2438 char* buffer = strchr(relayerSlotState, ' ') + 1;
2439 for (size_t ai = 0; ai != record.size(); ai++) {
2440 buffer[ai] = record.isValid(ai) ? '2' : '0';
2441 }
2442 buffer[record.size()] = 0;
2443 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index), .size = (int)(record.size() + buffer - relayerSlotState), .data = relayerSlotState});
2444 };
2445
2446 // This is the main dispatching loop
2447 auto& state = ref.get<DeviceState>();
2448 auto& spec = ref.get<DeviceSpec const>();
2449
2450 auto& dpContext = ref.get<DataProcessorContext>();
2451 auto& streamContext = ref.get<StreamContext>();
2452 O2_SIGNPOST_ID_GENERATE(sid, device);
2453 O2_SIGNPOST_START(device, sid, "device", "Start processing ready actions");
2454
2455 auto& stats = ref.get<DataProcessingStats>();
2456 auto& relayer = ref.get<DataRelayer>();
2457 using namespace o2::framework;
2458 stats.updateStats({(int)ProcessingStatsId::PENDING_INPUTS, DataProcessingStats::Op::Set, static_cast<int64_t>(relayer.getParallelTimeslices() - completed.size())});
2459 stats.updateStats({(int)ProcessingStatsId::INCOMPLETE_INPUTS, DataProcessingStats::Op::Set, completed.empty() ? 1 : 0});
2460 switch (spec.completionPolicy.order) {
2462 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.timeslice.value < b.timeslice.value; });
2463 break;
2465 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.slot.index < b.slot.index; });
2466 break;
2468 default:
2469 break;
2470 }
2471
2472 for (auto action : completed) {
2473 O2_SIGNPOST_ID_GENERATE(aid, device);
2474 O2_SIGNPOST_START(device, aid, "device", "Processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2476 O2_SIGNPOST_END(device, aid, "device", "Waiting for more data.");
2477 continue;
2478 }
2479
2480 bool shouldConsume = action.op == CompletionPolicy::CompletionOp::Consume ||
2482 prepareAllocatorForCurrentTimeSlice(TimesliceSlot{action.slot});
2486 updateRunInformation(TimesliceSlot{action.slot});
2487 }
2488 InputSpan span = getInputSpan(action.slot, shouldConsume);
2489 auto& spec = ref.get<DeviceSpec const>();
2490 InputRecord record{spec.inputs,
2491 span,
2492 *context.registry};
2493 ProcessingContext processContext{record, ref, ref.get<DataAllocator>()};
2494 {
2495 // Notice this should be thread safe and reentrant
2496 // as it is called from many threads.
2497 streamContext.preProcessingCallbacks(processContext);
2498 dpContext.preProcessingCallbacks(processContext);
2499 }
2501 context.postDispatchingCallbacks(processContext);
2502 if (spec.forwards.empty() == false) {
2503 auto& timesliceIndex = ref.get<TimesliceIndex>();
2504 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false);
2505 O2_SIGNPOST_END(device, aid, "device", "Forwarding inputs consume: %d.", false);
2506 continue;
2507 }
2508 }
2509 // If there is no optional inputs we canForwardEarly
2510 // the messages to that parallel processing can happen.
2511 // In this case we pass true to indicate that we want to
2512 // copy the messages to the subsequent data processor.
2513 bool hasForwards = spec.forwards.empty() == false;
2515
2516 if (context.canForwardEarly && hasForwards && consumeSomething) {
2517 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwainding: %{public}s.", fmt::format("{}", action.op).c_str());
2518 auto& timesliceIndex = ref.get<TimesliceIndex>();
2519 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
2520 }
2521 markInputsAsDone(action.slot);
2522
2523 uint64_t tStart = uv_hrtime();
2524 uint64_t tStartMilli = TimingHelpers::getRealtimeSinceEpochStandalone();
2525 preUpdateStats(action, record, tStart);
2526
2527 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
2528
2529 auto runNoCatch = [&context, ref, &processContext](DataRelayer::RecordAction& action) mutable {
2530 auto& state = ref.get<DeviceState>();
2531 auto& spec = ref.get<DeviceSpec const>();
2532 auto& streamContext = ref.get<StreamContext>();
2533 auto& dpContext = ref.get<DataProcessorContext>();
2534 auto shouldProcess = [](DataRelayer::RecordAction& action) -> bool {
2535 switch (action.op) {
2540 return true;
2541 break;
2542 default:
2543 return false;
2544 }
2545 };
2546 if (state.quitRequested == false) {
2547 {
2548 // Callbacks from services
2549 dpContext.preProcessingCallbacks(processContext);
2550 streamContext.preProcessingCallbacks(processContext);
2551 dpContext.preProcessingCallbacks(processContext);
2552 // Callbacks from users
2553 ref.get<CallbackService>().call<CallbackService::Id::PreProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2554 }
2555 O2_SIGNPOST_ID_FROM_POINTER(pcid, device, &processContext);
2556 if (context.statefulProcess && shouldProcess(action)) {
2557 // This way, usercode can use the the same processing context to identify
2558 // its signposts and we can map user code to device iterations.
2559 O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
2560 (context.statefulProcess)(processContext);
2561 O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
2562 } else if (context.statelessProcess && shouldProcess(action)) {
2563 O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
2564 (context.statelessProcess)(processContext);
2565 O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
2566 } else if (context.statelessProcess || context.statefulProcess) {
2567 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding.");
2568 } else {
2569 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle");
2571 }
2572 if (shouldProcess(action)) {
2573 auto& timingInfo = ref.get<TimingInfo>();
2574 if (timingInfo.globalRunNumberChanged) {
2575 context.lastRunNumberProcessed = timingInfo.runNumber;
2576 }
2577 }
2578
2579 // Notify the sink we just consumed some timeframe data
2580 if (context.isSink && action.op == CompletionPolicy::CompletionOp::Consume) {
2581 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Sending dpl-summary");
2582 auto& allocator = ref.get<DataAllocator>();
2583 allocator.make<int>(OutputRef{"dpl-summary", runtime_hash(spec.name.c_str())}, 1);
2584 }
2585
2586 // Extra callback which allows a service to add extra outputs.
2587 // This is needed e.g. to ensure that injected CCDB outputs are added
2588 // before an end of stream.
2589 {
2590 ref.get<CallbackService>().call<CallbackService::Id::FinaliseOutputs>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2591 dpContext.finaliseOutputsCallbacks(processContext);
2592 streamContext.finaliseOutputsCallbacks(processContext);
2593 }
2594
2595 {
2596 ref.get<CallbackService>().call<CallbackService::Id::PostProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2597 dpContext.postProcessingCallbacks(processContext);
2598 streamContext.postProcessingCallbacks(processContext);
2599 }
2600 }
2601 };
2602
2603 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_USERCODE) != 0) {
2604 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
2605 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
2606 }
2607 if (noCatch) {
2608 try {
2609 runNoCatch(action);
2610 } catch (o2::framework::RuntimeErrorRef e) {
2611 (context.errorHandling)(e, record);
2612 }
2613 } else {
2614 try {
2615 runNoCatch(action);
2616 } catch (std::exception& ex) {
2620 auto e = runtime_error(ex.what());
2621 (context.errorHandling)(e, record);
2622 } catch (o2::framework::RuntimeErrorRef e) {
2623 (context.errorHandling)(e, record);
2624 }
2625 }
2626 if (state.severityStack.empty() == false) {
2627 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
2628 state.severityStack.pop_back();
2629 }
2630
2631 postUpdateStats(action, record, tStart, tStartMilli);
2632 // We forward inputs only when we consume them. If we simply Process them,
2633 // we keep them for next message arriving.
2635 cleanupRecord(record);
2636 context.postDispatchingCallbacks(processContext);
2637 ref.get<CallbackService>().call<CallbackService::Id::DataConsumed>(o2::framework::ServiceRegistryRef{ref});
2638 }
2639 if ((context.canForwardEarly == false) && hasForwards && consumeSomething) {
2640 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Late forwarding");
2641 auto& timesliceIndex = ref.get<TimesliceIndex>();
2642 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false, action.op == CompletionPolicy::CompletionOp::Consume);
2643 }
2644 context.postForwardingCallbacks(processContext);
2646 cleanTimers(action.slot, record);
2647 }
2648 O2_SIGNPOST_END(device, aid, "device", "Done processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2649 }
2650 O2_SIGNPOST_END(device, sid, "device", "Start processing ready actions");
2651
2652 // We now broadcast the end of stream if it was requested
2653 if (state.streaming == StreamingState::EndOfStreaming) {
2654 LOGP(detail, "Broadcasting end of stream");
2655 for (auto& channel : spec.outputChannels) {
2657 }
2659 }
2660
2661 return true;
2662}
2663
2665{
2666 LOG(error) << msg;
2667 ServiceRegistryRef ref{mServiceRegistry};
2668 auto& stats = ref.get<DataProcessingStats>();
2670}
2671
2672std::unique_ptr<ConfigParamStore> DeviceConfigurationHelpers::getConfiguration(ServiceRegistryRef registry, const char* name, std::vector<ConfigParamSpec> const& options)
2673{
2674
2675 if (registry.active<ConfigurationInterface>()) {
2676 auto& cfg = registry.get<ConfigurationInterface>();
2677 try {
2678 cfg.getRecursive(name);
2679 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
2680 retrievers.emplace_back(std::make_unique<ConfigurationOptionsRetriever>(&cfg, name));
2681 auto configStore = std::make_unique<ConfigParamStore>(options, std::move(retrievers));
2682 configStore->preload();
2683 configStore->activate();
2684 return configStore;
2685 } catch (...) {
2686 // No overrides...
2687 }
2688 }
2689 return {nullptr};
2690}
2691
2692} // namespace o2::framework
benchmark::State & state
struct uv_timer_s uv_timer_t
struct uv_signal_s uv_signal_t
struct uv_async_s uv_async_t
struct uv_poll_s uv_poll_t
struct uv_loop_s uv_loop_t
o2::monitoring::Metric Metric
uint64_t maxLatency
o2::configuration::ConfigurationInterface ConfigurationInterface
constexpr int DEFAULT_MAX_CHANNEL_AHEAD
uint64_t minLatency
int32_t i
std::enable_if_t< std::is_signed< T >::value, bool > hasData(const CalArray< T > &cal)
Definition Painter.cxx:599
uint16_t pid
Definition RawData.h:2
uint32_t res
Definition RawData.h:0
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
Definition Signpost.h:553
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_LOG_ENABLED(log)
Definition Signpost.h:110
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
Definition Signpost.h:563
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
constexpr uint32_t runtime_hash(char const *str)
std::ostringstream debug
o2::monitoring::Monitoring Monitoring
StringRef key
@ DeviceStateChanged
Invoked when the device undergoes a state change.
decltype(auto) make(const Output &spec, Args... args)
static void doRun(ServiceRegistryRef)
void fillContext(DataProcessorContext &context, DeviceContext &deviceContext)
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry &)
static void doPrepare(ServiceRegistryRef)
static bool tryDispatchComputation(ServiceRegistryRef ref, std::vector< DataRelayer::RecordAction > &completed)
static void handleData(ServiceRegistryRef, InputChannelInfo &)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associate to a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associate to a given slot.
A service API to communicate with the driver.
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
size_t size() const
Number of elements in the InputSpan.
Definition InputSpan.h:82
virtual fair::mq::Device * device()=0
bool active() const
Check if service of type T is currently active.
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLuint entry
Definition glcorearb.h:5735
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLuint GLuint stream
Definition glcorearb.h:1806
GLbitfield GLuint64 timeout
Definition glcorearb.h:1573
GLint ref
Definition glcorearb.h:291
GLuint * states
Definition glcorearb.h:4932
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
ServiceKind
The kind of service we are asking for.
void on_idle_timer(uv_timer_t *handle)
@ DPL
The channel is a normal input channel.
void run_completion(uv_work_t *handle, int status)
bool hasOnlyGenerated(DeviceSpec const &spec)
void on_socket_polled(uv_poll_t *poller, int status, int events)
void on_transition_requested_expired(uv_timer_t *handle)
void run_callback(uv_work_t *handle)
volatile int region_read_global_dummy_variable
void handleRegionCallbacks(ServiceRegistryRef registry, std::vector< fair::mq::RegionInfo > &infos)
Invoke the callbacks for the mPendingRegionInfos.
void on_out_of_band_polled(uv_poll_t *poller, int status, int events)
DeviceSpec const & getRunningDevice(RunningDeviceRef const &running, ServiceRegistryRef const &services)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Streaming
Data is being processed.
@ Idle
End of streaming notified.
void on_communication_requested(uv_async_t *s)
@ Expired
A transition needs to be fulfilled ASAP.
@ NoTransition
No pending transitions.
@ Requested
A transition was notified to be requested.
RuntimeError & error_from_ref(RuntimeErrorRef)
auto switchState(ServiceRegistryRef &ref, StreamingState newState) -> void
void on_awake_main_thread(uv_async_t *handle)
@ Completed
The channel was signaled it will not receive any data.
@ Running
The channel is actively receiving data.
void on_signal_callback(uv_signal_t *handle, int signum)
@ Me
Only quit this data processor.
TransitionHandlingState updateStateTransition(ServiceRegistryRef &ref, ProcessingPolicies const &policies)
void on_data_processing_expired(uv_timer_t *handle)
bool hasOnlyTimers(DeviceSpec const &spec)
constexpr const char * channelName(int channel)
Definition Constants.h:318
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
Defining DataPointCompositeObject explicitly as copiable.
static void run(AsyncQueue &queue, TimesliceId oldestPossibleTimeslice)
static void post(AsyncQueue &queue, AsyncTask const &task)
An actual task to be executed.
Definition AsyncQueue.h:32
static void demangled_backtrace_symbols(void **backtrace, unsigned int total, int fd)
static constexpr int INVALID
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
int64_t timeslices
How many timeslices it can process without giving back control.
int64_t sharedMemory
How much shared memory it can allocate.
Statistics on the offers consumed, expired.
static void sendEndOfStream(ServiceRegistryRef const &ref, OutputChannelSpec const &channel)
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
Helper struct to hold statistics about the data processing happening.
@ CumulativeRate
Set the value to the specified value if it is positive.
@ Add
Update the rate of the metric given the amount since the last time.
std::function< void(o2::framework::RuntimeErrorRef e, InputRecord &record)> errorHandling
AlgorithmSpec::InitErrorCallback initError
void preLoopCallbacks(ServiceRegistryRef)
Invoke callbacks before we enter the event loop.
void postStopCallbacks(ServiceRegistryRef)
Invoke callbacks on stop.
void preProcessingCallbacks(ProcessingContext &)
Invoke callbacks to be executed before every process method invocation.
bool canForwardEarly
Whether or not the associated DataProcessor can forward things early.
void preStartCallbacks(ServiceRegistryRef)
Invoke callbacks to be executed in PreRun(), before the User Start callbacks.
AlgorithmSpec::ProcessCallback statefulProcess
const char * header
Definition DataRef.h:27
const InputSpec * spec
Definition DataRef.h:26
static std::vector< size_t > createDistinctRouteIndex(std::vector< InputRoute > const &)
CompletionPolicy::CompletionOp op
Definition DataRelayer.h:81
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static header::DataOrigin asConcreteOrigin(InputSpec const &spec)
TimesliceIndex::OldestOutputInfo oldestTimeslice
static unsigned int pipelineLength()
get max number of timeslices in the queue
static bool onlineDeploymentMode()
@true if running online
static std::unique_ptr< ConfigParamStore > getConfiguration(ServiceRegistryRef registry, const char *name, std::vector< ConfigParamSpec > const &options)
ProcessingPolicies & processingPolicies
std::vector< InputRoute > inputs
Definition DeviceSpec.h:62
std::vector< InputChannelSpec > inputChannels
Definition DeviceSpec.h:54
Running state information of a given device.
Definition DeviceState.h:34
std::atomic< int64_t > cleanupCount
Definition DeviceState.h:82
Forward channel information.
Definition ChannelInfo.h:88
ChannelAccountingType channelType
Whether or not it's a DPL internal channel.
Definition ChannelInfo.h:92
std::string name
The name of the channel.
Definition ChannelInfo.h:90
ForwardingPolicy const * policy
Definition ChannelInfo.h:94
fair::mq::Channel * channel
Definition ChannelInfo.h:51
TimesliceId oldestForChannel
Oldest possible timeslice for the given channel.
Definition ChannelInfo.h:65
enum Lifetime lifetime
Definition InputSpec.h:73
enum EarlyForwardPolicy earlyForward
Information about the running workflow.
static constexpr ServiceKind kind
static Salt streamSalt(short streamId, short dataProcessorId)
void lateBindStreamServices(DeviceState &state, fair::mq::ProgOptions &options, ServiceRegistry::Salt salt)
static Salt globalStreamSalt(short streamId)
void * get(ServiceTypeHash typeHash, Salt salt, ServiceKind kind, char const *name=nullptr) const
void finaliseOutputsCallbacks(ProcessingContext &)
Invoke callbacks to be executed after every process method invocation.
void preProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed before every process method invocation.
void preEOSCallbacks(EndOfStreamContext &eosContext)
Invoke callbacks to be executed before every EOS user callback invocation.
void postProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed after every process method invocation.
static int64_t getRealtimeSinceEpochStandalone()
bool keepAtEndOfStream
Whether this kind of data should be flushed during end of stream.
Definition TimingInfo.h:44
static bool timesliceIsTimer(size_t timeslice)
Definition TimingInfo.h:46
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
void backpressure(InputChannelInfo const &)
locked_execution(ServiceRegistryRef &ref_)
the main header struct
Definition DataHeader.h:619
constexpr size_t min
constexpr size_t max
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
vec clear()
const std::string str
uint64_t const void const *restrict const msg
Definition x9.h:153