Project
Loading...
Searching...
No Matches
DataProcessingDevice.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
31#include "Framework/InputSpan.h"
32#if defined(__APPLE__) || defined(NDEBUG)
33#define O2_SIGNPOST_IMPLEMENTATION
34#endif
35#include "Framework/Signpost.h"
48
49#include "DecongestionService.h"
51#include "DataRelayerHelpers.h"
52#include "Headers/DataHeader.h"
54
55#include <Framework/Tracing.h>
56
57#include <fairmq/Parts.h>
58#include <fairmq/Socket.h>
59#include <fairmq/ProgOptions.h>
60#if __has_include(<fairmq/shmem/Message.h>)
61#include <fairmq/shmem/Message.h>
62#endif
63#include <Configuration/ConfigurationInterface.h>
64#include <Configuration/ConfigurationFactory.h>
65#include <Monitoring/Monitoring.h>
66#include <TMessage.h>
67#include <TClonesArray.h>
68
69#include <fmt/ostream.h>
70#include <algorithm>
71#include <vector>
72#include <numeric>
73#include <memory>
74#include <uv.h>
75#include <execinfo.h>
76#include <sstream>
77#include <boost/property_tree/json_parser.hpp>
78
79// Formatter to avoid having to rewrite the ostream operator for the enum
80namespace fmt
81{
82template <>
85} // namespace fmt
86
87// A log to use for general device logging
89// Special log to keep track of the lifetime of the parts
91// Stream which keeps track of the calibration lifetime logic
93// Special log to track the async queue behavior
95// Special log to track the forwarding requests
97
98using namespace o2::framework;
99using ConfigurationInterface = o2::configuration::ConfigurationInterface;
101
102constexpr int DEFAULT_MAX_CHANNEL_AHEAD = 128;
103
104namespace o2::framework
105{
106
107template <>
111
115{
116 auto* state = (DeviceState*)handle->data;
117 state->loopReason |= DeviceState::TIMER_EXPIRED;
118}
119
120bool hasOnlyTimers(DeviceSpec const& spec)
121{
122 return std::all_of(spec.inputs.cbegin(), spec.inputs.cend(), [](InputRoute const& route) -> bool { return route.matcher.lifetime == Lifetime::Timer; });
123}
124
126{
127 return (spec.inputChannels.size() == 1) && (spec.inputs[0].matcher.lifetime == Lifetime::Timer || spec.inputs[0].matcher.lifetime == Lifetime::Enumeration);
128}
129
131{
132 auto* ref = (ServiceRegistryRef*)handle->data;
133 auto& state = ref->get<DeviceState>();
134 state.loopReason |= DeviceState::TIMER_EXPIRED;
135 // Check if this is a source device
136 O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
137 auto& spec = ref->get<DeviceSpec const>();
138 if (hasOnlyGenerated(spec)) {
139 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for source expired. Exiting.");
140 } else {
141 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for %{public}s expired. Exiting.",
142 state.allowedProcessing == DeviceState::CalibrationOnly ? "calibration" : "data & calibration");
143 }
144 state.transitionHandling = TransitionHandlingState::Expired;
145}
146
148{
149 auto& state = ref.get<DeviceState>();
150 auto& context = ref.get<DataProcessorContext>();
151 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
152 O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
153 O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
154 state.streaming = newState;
155 ref.get<ControlService>().notifyStreamingState(state.streaming);
156};
157
159{
160 auto* ref = (ServiceRegistryRef*)handle->data;
161 auto& state = ref->get<DeviceState>();
162 auto& spec = ref->get<DeviceSpec const>();
163 state.loopReason |= DeviceState::TIMER_EXPIRED;
164
165 // Check if this is a source device
166 O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
167
168 if (hasOnlyGenerated(spec)) {
169 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Switching to EndOfStreaming.");
171 } else {
172 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
173 state.allowedProcessing = DeviceState::CalibrationOnly;
174 }
175}
176
178{
179 auto* state = (DeviceState*)s->data;
181}
182
183DeviceSpec const& getRunningDevice(RunningDeviceRef const& running, ServiceRegistryRef const& services)
184{
185 auto& devices = services.get<o2::framework::RunningWorkflowInfo const>().devices;
186 return devices[running.index];
187}
188
194
196 : mRunningDevice{running},
197 mConfigRegistry{nullptr},
198 mServiceRegistry{registry},
199 mProcessingPolicies{policies}
200{
201 GetConfig()->Subscribe<std::string>("dpl", [&registry = mServiceRegistry](const std::string& key, std::string value) {
202 if (key == "cleanup") {
204 auto& deviceState = ref.get<DeviceState>();
205 int64_t cleanupCount = deviceState.cleanupCount.load();
206 int64_t newCleanupCount = std::stoll(value);
207 if (newCleanupCount <= cleanupCount) {
208 return;
209 }
210 deviceState.cleanupCount.store(newCleanupCount);
211 for (auto& info : deviceState.inputChannelInfos) {
212 fair::mq::Parts parts;
213 while (info.channel->Receive(parts, 0)) {
214 LOGP(debug, "Dropping {} parts", parts.Size());
215 if (parts.Size() == 0) {
216 break;
217 }
218 }
219 }
220 }
221 });
222
223 std::function<void(const fair::mq::State)> stateWatcher = [this, &registry = mServiceRegistry](const fair::mq::State state) -> void {
225 auto& deviceState = ref.get<DeviceState>();
226 auto& control = ref.get<ControlService>();
227 auto& callbacks = ref.get<CallbackService>();
228 control.notifyDeviceState(fair::mq::GetStateName(state));
230
231 if (deviceState.nextFairMQState.empty() == false) {
232 auto state = deviceState.nextFairMQState.back();
233 (void)this->ChangeState(state);
234 deviceState.nextFairMQState.pop_back();
235 }
236 };
237
238 // 99 is to execute DPL callbacks last
239 this->SubscribeToStateChange("99-dpl", stateWatcher);
240
241 // One task for now.
242 mStreams.resize(1);
243 mHandles.resize(1);
244
245 ServiceRegistryRef ref{mServiceRegistry};
246 mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t));
247 auto& state = ref.get<DeviceState>();
248 assert(state.loop);
249 int res = uv_async_init(state.loop, mAwakeHandle, on_communication_requested);
250 mAwakeHandle->data = &state;
251 if (res < 0) {
252 LOG(error) << "Unable to initialise subscription";
253 }
254
256 SubscribeToNewTransition("dpl", [wakeHandle = mAwakeHandle](fair::mq::Transition t) {
257 int res = uv_async_send(wakeHandle);
258 if (res < 0) {
259 LOG(error) << "Unable to notify subscription";
260 }
261 LOG(debug) << "State transition requested";
262 });
263}
264
265// Callback to execute the processing. Notice how the data is
266// is a vector of DataProcessorContext so that we can index the correct
267// one with the thread id. For the moment we simply use the first one.
268void run_callback(uv_work_t* handle)
269{
270 auto* task = (TaskStreamInfo*)handle->data;
271 auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
272 // We create a new signpost interval for this specific data processor. Same id, same data processor.
273 auto& dataProcessorContext = ref.get<DataProcessorContext>();
274 O2_SIGNPOST_ID_FROM_POINTER(sid, device, &dataProcessorContext);
275 O2_SIGNPOST_START(device, sid, "run_callback", "Starting run callback on stream %d", task->id.index);
278 O2_SIGNPOST_END(device, sid, "run_callback", "Done processing data for stream %d", task->id.index);
279}
280
281// Once the processing in a thread is done, this is executed on the main thread.
282void run_completion(uv_work_t* handle, int status)
283{
284 auto* task = (TaskStreamInfo*)handle->data;
285 // Notice that the completion, while running on the main thread, still
286 // has a salt which is associated to the actual stream which was doing the computation
287 auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
288 auto& state = ref.get<DeviceState>();
289 auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
290
291 using o2::monitoring::Metric;
292 using o2::monitoring::Monitoring;
293 using o2::monitoring::tags::Key;
294 using o2::monitoring::tags::Value;
295
296 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats&)> reportConsumedOffer = [ref](ComputingQuotaOffer const& accumulatedConsumed, ComputingQuotaStats& stats) {
297 auto& dpStats = ref.get<DataProcessingStats>();
298 stats.totalConsumedBytes += accumulatedConsumed.sharedMemory;
299
300 dpStats.updateStats({static_cast<short>(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedBytes});
301 dpStats.processCommandQueue();
302 assert(stats.totalConsumedBytes == dpStats.metrics[(short)ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED]);
303 };
304
305 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const&)> reportExpiredOffer = [ref](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
306 auto& dpStats = ref.get<DataProcessingStats>();
307 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
308 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
309 dpStats.processCommandQueue();
310 };
311
312 for (auto& consumer : state.offerConsumers) {
313 quotaEvaluator.consume(task->id.index, consumer, reportConsumedOffer);
314 }
315 state.offerConsumers.clear();
316 quotaEvaluator.handleExpired(reportExpiredOffer);
317 quotaEvaluator.dispose(task->id.index);
318 task->running = false;
319}
320
321// Context for polling
323 enum struct PollerState : char { Stopped,
325 Connected,
326 Suspended };
327 char const* name = nullptr;
328 uv_loop_t* loop = nullptr;
330 DeviceState* state = nullptr;
331 fair::mq::Socket* socket = nullptr;
333 int fd = -1;
334 bool read = true;
336};
337
338void on_socket_polled(uv_poll_t* poller, int status, int events)
339{
340 auto* context = (PollerContext*)poller->data;
341 assert(context);
342 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
343 context->state->loopReason |= DeviceState::DATA_SOCKET_POLLED;
344 switch (events) {
345 case UV_READABLE: {
346 O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
347 context->state->loopReason |= DeviceState::DATA_INCOMING;
348 } break;
349 case UV_WRITABLE: {
350 O2_SIGNPOST_END(device, sid, "socket_state", "Socket connected for channel %{public}s", context->name);
351 if (context->read) {
352 O2_SIGNPOST_START(device, sid, "socket_state", "Socket connected for read in context %{public}s", context->name);
353 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
354 context->state->loopReason |= DeviceState::DATA_CONNECTED;
355 } else {
356 O2_SIGNPOST_START(device, sid, "socket_state", "Socket connected for write for channel %{public}s", context->name);
357 context->state->loopReason |= DeviceState::DATA_OUTGOING;
358 // If the socket is writable, fairmq will handle the rest, so we can stop polling and
359 // just wait for the disconnect.
360 uv_poll_start(poller, UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
361 }
362 context->pollerState = PollerContext::PollerState::Connected;
363 } break;
364 case UV_DISCONNECT: {
365 O2_SIGNPOST_END(device, sid, "socket_state", "Socket disconnected in context %{public}s", context->name);
366 } break;
367 case UV_PRIORITIZED: {
368 O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Socket prioritized for context %{public}s", context->name);
369 } break;
370 }
371 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
372}
373
374void on_out_of_band_polled(uv_poll_t* poller, int status, int events)
375{
376 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
377 auto* context = (PollerContext*)poller->data;
378 context->state->loopReason |= DeviceState::OOB_ACTIVITY;
379 if (status < 0) {
380 LOGP(fatal, "Error while polling {}: {}", context->name, status);
381 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
382 }
383 switch (events) {
384 case UV_READABLE: {
385 O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
386 context->state->loopReason |= DeviceState::DATA_INCOMING;
387 assert(context->channelInfo);
388 context->channelInfo->readPolled = true;
389 } break;
390 case UV_WRITABLE: {
391 O2_SIGNPOST_END(device, sid, "socket_state", "OOB socket connected for channel %{public}s", context->name);
392 if (context->read) {
393 O2_SIGNPOST_START(device, sid, "socket_state", "OOB socket connected for read in context %{public}s", context->name);
394 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_out_of_band_polled);
395 } else {
396 O2_SIGNPOST_START(device, sid, "socket_state", "OOB socket connected for write for channel %{public}s", context->name);
397 context->state->loopReason |= DeviceState::DATA_OUTGOING;
398 }
399 } break;
400 case UV_DISCONNECT: {
401 O2_SIGNPOST_END(device, sid, "socket_state", "OOB socket disconnected in context %{public}s", context->name);
402 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
403 } break;
404 case UV_PRIORITIZED: {
405 O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "OOB socket prioritized for context %{public}s", context->name);
406 } break;
407 }
408 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
409}
410
419{
420 auto ref = ServiceRegistryRef{mServiceRegistry};
421 auto& context = ref.get<DataProcessorContext>();
422 auto& spec = getRunningDevice(mRunningDevice, ref);
423
424 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
425 O2_SIGNPOST_START(device, cid, "Init", "Entering Init callback.");
426 context.statelessProcess = spec.algorithm.onProcess;
427 context.statefulProcess = nullptr;
428 context.error = spec.algorithm.onError;
429 context.initError = spec.algorithm.onInitError;
430
431 auto configStore = DeviceConfigurationHelpers::getConfiguration(mServiceRegistry, spec.name.c_str(), spec.options);
432 if (configStore == nullptr) {
433 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
434 retrievers.emplace_back(std::make_unique<FairOptionsRetriever>(GetConfig()));
435 configStore = std::make_unique<ConfigParamStore>(spec.options, std::move(retrievers));
436 configStore->preload();
437 configStore->activate();
438 }
439
440 using boost::property_tree::ptree;
441
443 for (auto& entry : configStore->store()) {
444 std::stringstream ss;
445 std::string str;
446 if (entry.second.empty() == false) {
447 boost::property_tree::json_parser::write_json(ss, entry.second, false);
448 str = ss.str();
449 str.pop_back(); // remove EoL
450 } else {
451 str = entry.second.get_value<std::string>();
452 }
453 std::string configString = fmt::format("[CONFIG] {}={} 1 {}", entry.first, str, configStore->provenance(entry.first.c_str())).c_str();
454 mServiceRegistry.get<DriverClient>(ServiceRegistry::globalDeviceSalt()).tell(configString.c_str());
455 }
456
457 mConfigRegistry = std::make_unique<ConfigParamRegistry>(std::move(configStore));
458
459 // Setup the error handlers for init
460 if (context.initError) {
461 context.initErrorHandling = [&errorCallback = context.initError,
462 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
466 auto& context = ref.get<DataProcessorContext>();
467 auto& err = error_from_ref(e);
468 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
469 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Invoking errorCallback.", err.what);
470 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
471 auto& stats = ref.get<DataProcessingStats>();
473 InitErrorContext errorContext{ref, e};
474 errorCallback(errorContext);
475 };
476 } else {
477 context.initErrorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
478 auto& err = error_from_ref(e);
482 auto& context = ref.get<DataProcessorContext>();
483 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
484 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Exiting with 1.", err.what);
485 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
486 auto& stats = ref.get<DataProcessingStats>();
488 exit(1);
489 };
490 }
491
492 context.expirationHandlers.clear();
493 context.init = spec.algorithm.onInit;
494 if (context.init) {
495 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
496 InitContext initContext{*mConfigRegistry, mServiceRegistry};
497
498 if (noCatch) {
499 try {
500 context.statefulProcess = context.init(initContext);
502 if (context.initErrorHandling) {
503 (context.initErrorHandling)(e);
504 }
505 }
506 } else {
507 try {
508 context.statefulProcess = context.init(initContext);
509 } catch (std::exception& ex) {
513 auto e = runtime_error(ex.what());
514 (context.initErrorHandling)(e);
516 (context.initErrorHandling)(e);
517 }
518 }
519 }
520 auto& state = ref.get<DeviceState>();
521 state.inputChannelInfos.resize(spec.inputChannels.size());
525 int validChannelId = 0;
526 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
527 auto& name = spec.inputChannels[ci].name;
528 if (name.find(spec.channelPrefix + "from_internal-dpl-clock") == 0) {
529 state.inputChannelInfos[ci].state = InputChannelState::Pull;
530 state.inputChannelInfos[ci].id = {ChannelIndex::INVALID};
531 validChannelId++;
532 } else {
533 state.inputChannelInfos[ci].id = {validChannelId++};
534 }
535 }
536
537 // Invoke the callback policy for this device.
538 if (spec.callbacksPolicy.policy != nullptr) {
539 InitContext initContext{*mConfigRegistry, mServiceRegistry};
540 spec.callbacksPolicy.policy(mServiceRegistry.get<CallbackService>(ServiceRegistry::globalDeviceSalt()), initContext);
541 }
542
543 // Services which are stream should be initialised now
544 auto* options = GetConfig();
545 for (size_t si = 0; si < mStreams.size(); ++si) {
547 mServiceRegistry.lateBindStreamServices(state, *options, streamSalt);
548 }
549 O2_SIGNPOST_END(device, cid, "Init", "Exiting Init callback.");
550}
551
552void on_signal_callback(uv_signal_t* handle, int signum)
553{
554 O2_SIGNPOST_ID_FROM_POINTER(sid, device, handle);
555 O2_SIGNPOST_START(device, sid, "signal_state", "Signal %d received.", signum);
556
557 auto* registry = (ServiceRegistry*)handle->data;
558 if (!registry) {
559 O2_SIGNPOST_END(device, sid, "signal_state", "No registry active. Ignoring signal.");
560 return;
561 }
562 ServiceRegistryRef ref{*registry};
563 auto& state = ref.get<DeviceState>();
564 auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
565 auto& stats = ref.get<DataProcessingStats>();
567 size_t ri = 0;
568 while (ri != quotaEvaluator.mOffers.size()) {
569 auto& offer = quotaEvaluator.mOffers[ri];
570 // We were already offered some sharedMemory, so we
571 // do not consider the offer.
572 // FIXME: in principle this should account for memory
573 // available and being offered, however we
574 // want to get out of the woods for now.
575 if (offer.valid && offer.sharedMemory != 0) {
576 O2_SIGNPOST_END(device, sid, "signal_state", "Memory already offered.");
577 return;
578 }
579 ri++;
580 }
581 // Find the first empty offer and have 1GB of shared memory there
582 for (auto& offer : quotaEvaluator.mOffers) {
583 if (offer.valid == false) {
584 offer.cpu = 0;
585 offer.memory = 0;
586 offer.sharedMemory = 1000000000;
587 offer.valid = true;
588 offer.user = -1;
589 break;
590 }
591 }
593 O2_SIGNPOST_END(device, sid, "signal_state", "Done processing signals.");
594}
595
596static auto toBeForwardedHeader = [](void* header) -> bool {
597 // If is now possible that the record is not complete when
598 // we forward it, because of a custom completion policy.
599 // this means that we need to skip the empty entries in the
600 // record for being forwarded.
601 if (header == nullptr) {
602 return false;
603 }
604 auto sih = o2::header::get<SourceInfoHeader*>(header);
605 if (sih) {
606 return false;
607 }
608
609 auto dih = o2::header::get<DomainInfoHeader*>(header);
610 if (dih) {
611 return false;
612 }
613
614 auto dh = o2::header::get<DataHeader*>(header);
615 if (!dh) {
616 return false;
617 }
618 auto dph = o2::header::get<DataProcessingHeader*>(header);
619 if (!dph) {
620 return false;
621 }
622 return true;
623};
624
625static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
626 FairMQDeviceProxy& proxy,
627 std::unique_ptr<fair::mq::Message>& header,
628 std::unique_ptr<fair::mq::Message>& payload,
629 size_t total,
630 bool consume) {
631 if (header.get() == nullptr) {
632 // Missing an header is not an error anymore.
633 // it simply means that we did not receive the
634 // given input, but we were asked to
635 // consume existing, so we skip it.
636 return false;
637 }
638 if (payload.get() == nullptr && consume == true) {
639 // If the payload is not there, it means we already
640 // processed it with ConsumeExisiting. Therefore we
641 // need to do something only if this is the last consume.
642 header.reset(nullptr);
643 return false;
644 }
645
646 auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
647 if (fdph == nullptr) {
648 LOG(error) << "Data is missing DataProcessingHeader";
649 return false;
650 }
651 auto fdh = o2::header::get<DataHeader*>(header->GetData());
652 if (fdh == nullptr) {
653 LOG(error) << "Data is missing DataHeader";
654 return false;
655 }
656
657 // We need to find the forward route only for the first
658 // part of a split payload. All the others will use the same.
659 // but always check if we have a sequence of multiple payloads
660 if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
661 proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
662 }
663 return cachedForwardingChoices.empty() == false;
664};
665
666struct DecongestionContext {
669};
670
671auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
672 auto& oldestTimeslice = task.user<DecongestionContext>().oldestTimeslice;
673 auto& ref = task.user<DecongestionContext>().ref;
674
675 auto& decongestion = ref.get<DecongestionService>();
676 auto& proxy = ref.get<FairMQDeviceProxy>();
677 if (oldestTimeslice.timeslice.value <= decongestion.lastTimeslice) {
678 LOG(debug) << "Not sending already sent oldest possible timeslice " << oldestTimeslice.timeslice.value;
679 return;
680 }
681 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
682 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
683 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
684 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
685 // TODO: this we could cache in the proxy at the bind moment.
686 if (info.channelType != ChannelAccountingType::DPL) {
687 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Skipping channel %{public}s because it's not a DPL channel",
688 info.name.c_str());
689
690 continue;
691 }
692 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestTimeslice.timeslice.value)) {
693 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Forwarding to channel %{public}s oldest possible timeslice %zu, prio 20",
694 info.name.c_str(), oldestTimeslice.timeslice.value);
695 }
696 }
697};
698
699// This is how we do the forwarding, i.e. we push
700// the inputs which are shared between this device and others
701// to the next one in the daisy chain.
702// FIXME: do it in a smarter way than O(N^2)
703static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
704 TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
705 auto& proxy = registry.get<FairMQDeviceProxy>();
706 // we collect all messages per forward in a map and send them together
707 std::vector<fair::mq::Parts> forwardedParts;
708 forwardedParts.resize(proxy.getNumForwards());
709 std::vector<ChannelIndex> cachedForwardingChoices{};
710 O2_SIGNPOST_ID_GENERATE(sid, forwarding);
711 O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
712 slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
713
714 for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
715 auto& messageSet = currentSetOfInputs[ii];
716 // In case the messageSet is empty, there is nothing to be done.
717 if (messageSet.size() == 0) {
718 continue;
719 }
720 if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
721 continue;
722 }
723 cachedForwardingChoices.clear();
724
725 for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
726 auto& messageSet = currentSetOfInputs[ii];
727 auto& header = messageSet.header(pi);
728 auto& payload = messageSet.payload(pi);
729 auto total = messageSet.getNumberOfPayloads(pi);
730
731 if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
732 continue;
733 }
734
735 // In case of more than one forward route, we need to copy the message.
736 // This will eventually use the same mamory if running with the same backend.
737 if (cachedForwardingChoices.size() > 1) {
738 copy = true;
739 }
740 auto* dh = o2::header::get<DataHeader*>(header->GetData());
741 auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
742
743 if (copy) {
744 for (auto& cachedForwardingChoice : cachedForwardingChoices) {
745 auto&& newHeader = header->GetTransport()->CreateMessage();
746 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
747 fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
748 newHeader->Copy(*header);
749 forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
750
751 for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
752 auto&& newPayload = header->GetTransport()->CreateMessage();
753 newPayload->Copy(*messageSet.payload(pi, payloadIndex));
754 forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
755 }
756 }
757 } else {
758 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
759 fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
760 forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
761 for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
762 forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
763 }
764 }
765 }
766 }
767 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %zu messages", forwardedParts.size());
768 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
769 if (forwardedParts[fi].Size() == 0) {
770 continue;
771 }
772 ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi});
773 auto& parts = forwardedParts[fi];
774 if (info.policy == nullptr) {
775 O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi);
776 continue;
777 }
778 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
779 info.policy->forward(parts, ChannelIndex{fi}, registry);
780 }
781
782 auto& asyncQueue = registry.get<AsyncQueue>();
783 auto& decongestion = registry.get<DecongestionService>();
784 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
785 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
786 AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate}
787 .user<DecongestionContext>({.ref = registry, .oldestTimeslice = oldestTimeslice}));
788 O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
789};
790
791extern volatile int region_read_global_dummy_variable;
793
795void handleRegionCallbacks(ServiceRegistryRef registry, std::vector<fair::mq::RegionInfo>& infos)
796{
797 if (infos.empty() == false) {
798 std::vector<fair::mq::RegionInfo> toBeNotified;
799 toBeNotified.swap(infos); // avoid any MT issue.
800 static bool dummyRead = getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS") && atoi(getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS"));
801 for (auto const& info : toBeNotified) {
802 if (dummyRead) {
803 for (size_t i = 0; i < info.size / sizeof(region_read_global_dummy_variable); i += 4096 / sizeof(region_read_global_dummy_variable)) {
804 region_read_global_dummy_variable = ((int*)info.ptr)[i];
805 }
806 }
807 registry.get<CallbackService>().call<CallbackService::Id::RegionInfoCallback>(info);
808 }
809 }
810}
811
812namespace
813{
815{
816 auto* state = (DeviceState*)handle->data;
818}
819} // namespace
820
// Set up the libuv pollers for this device's FairMQ channels.
//
// For a real device (one with a stateful or stateless process callback) a
// uv_poll_t is attached to the ZeroMQ file descriptor of every receiving DPL
// socket. If nothing at all would wake the loop up (no input pollers, timers
// or signal watchers), the output sockets are polled instead so that
// enumeration-only devices can still make progress. A device without any
// process callback is treated as "fake": it requests to quit immediately and
// arms a periodic idle timer as a safety net.
821void DataProcessingDevice::initPollers()
822{
823 auto ref = ServiceRegistryRef{mServiceRegistry};
824 auto& deviceContext = ref.get<DeviceContext>();
825 auto& context = ref.get<DataProcessorContext>();
826 auto& spec = ref.get<DeviceSpec const>();
827 auto& state = ref.get<DeviceState>();
828 // We add a timer only in case a channel poller is not there.
829 if ((context.statefulProcess != nullptr) || (context.statelessProcess != nullptr)) {
830 for (auto& [channelName, channel] : GetChannels()) {
 // Locate the InputChannelInfo whose spec matches this channel name.
 // NOTE(review): if no name matches, channelInfo is left pointing at the
 // last inspected entry (or uninitialized when inputChannels is empty);
 // only the assert further down guards this, and only in debug builds.
831 InputChannelInfo* channelInfo;
832 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
833 auto& channelSpec = spec.inputChannels[ci];
834 channelInfo = &state.inputChannelInfos[ci];
835 if (channelSpec.name != channelName) {
836 continue;
837 }
838 channelInfo->channel = &this->GetChannel(channelName, 0);
839 break;
840 }
 // Skip DPL-internal channels, except the AOD, CCDB and injected ones
 // which do carry real input. NOTE(review): on the "injected" test the
 // closing parenthesis sits before "!= 0"; the comparison still applies
 // to rfind's result, so behavior is unchanged — just oddly written.
841 if ((channelName.rfind("from_internal-dpl", 0) == 0) &&
842 (channelName.rfind("from_internal-dpl-aod", 0) != 0) &&
843 (channelName.rfind("from_internal-dpl-ccdb-backend", 0) != 0) &&
844 (channelName.rfind("from_internal-dpl-injected", 0)) != 0) {
845 LOGP(detail, "{} is an internal channel. Skipping as no input will come from there.", channelName);
846 continue;
847 }
848 // We only watch receiving sockets.
849 if (channelName.rfind("from_" + spec.name + "_", 0) == 0) {
850 LOGP(detail, "{} is to send data. Not polling.", channelName);
851 continue;
852 }
853
854 if (channelName.rfind("from_", 0) != 0) {
855 LOGP(detail, "{} is not a DPL socket. Not polling.", channelName);
856 continue;
857 }
858
859 // We assume there is always a ZeroMQ socket behind.
860 int zmq_fd = 0;
861 size_t zmq_fd_len = sizeof(zmq_fd);
862 // FIXME: I should probably save those somewhere... ;-)
 // NOTE(review): both the uv_poll_t and the PollerContext below are
 // malloc'ed and never freed — acknowledged leak per the FIXMEs.
863 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
864 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
865 if (zmq_fd == 0) {
866 LOG(error) << "Cannot get file descriptor for channel." << channelName;
867 continue;
868 }
869 LOGP(detail, "Polling socket for {}", channelName);
 // Build the per-poller context handed to the uv callbacks via ->data.
870 auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
871 pCtx->name = strdup(channelName.c_str());
872 pCtx->loop = state.loop;
873 pCtx->device = this;
874 pCtx->state = &state;
875 pCtx->fd = zmq_fd;
876 assert(channelInfo != nullptr);
877 pCtx->channelInfo = channelInfo;
878 pCtx->socket = &channel[0].GetSocket();
879 pCtx->read = true;
880 poller->data = pCtx;
881 uv_poll_init(state.loop, poller, zmq_fd);
 // NOTE(review): this branch looks unreachable — names not starting with
 // "from_" were already skipped by the continue at line 854 above.
 // Confirm whether out-of-band channels should be detected differently.
882 if (channelName.rfind("from_", 0) != 0) {
883 LOGP(detail, "{} is an out of band channel.", channelName);
884 state.activeOutOfBandPollers.push_back(poller);
885 } else {
886 channelInfo->pollerIndex = state.activeInputPollers.size();
887 state.activeInputPollers.push_back(poller);
888 }
889 }
890 // In case we do not have any input channel and we do not have
891 // any timers or signal watchers we still wake up whenever we can send data to downstream
892 // devices to allow for enumerations.
893 if (state.activeInputPollers.empty() &&
894 state.activeOutOfBandPollers.empty() &&
895 state.activeTimers.empty() &&
896 state.activeSignals.empty()) {
897 // FIXME: this is to make sure we do not reset the output timer
898 // for readout proxies or similar. In principle this should go once
899 // we move to OutOfBand InputSpec.
900 if (state.inputChannelInfos.empty()) {
901 LOGP(detail, "No input channels. Setting exit transition timeout to 0.");
902 deviceContext.exitTransitionTimeout = 0;
903 }
 // Nothing would wake the loop: poll the output sockets for writability
 // instead, so enumeration-only devices still get scheduled.
904 for (auto& [channelName, channel] : GetChannels()) {
905 if (channelName.rfind(spec.channelPrefix + "from_internal-dpl", 0) == 0) {
906 LOGP(detail, "{} is an internal channel. Not polling.", channelName);
907 continue;
908 }
909 if (channelName.rfind(spec.channelPrefix + "from_" + spec.name + "_", 0) == 0) {
910 LOGP(detail, "{} is an out of band channel. Not polling for output.", channelName);
911 continue;
912 }
913 // We assume there is always a ZeroMQ socket behind.
914 int zmq_fd = 0;
915 size_t zmq_fd_len = sizeof(zmq_fd);
916 // FIXME: I should probably save those somewhere... ;-)
917 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
918 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
919 if (zmq_fd == 0) {
920 LOGP(error, "Cannot get file descriptor for channel {}", channelName);
921 continue;
922 }
923 LOG(detail) << "Polling socket for " << channel[0].GetName();
924 // FIXME: leak
 // Output pollers get read = false: the callback only cares about
 // writability of the socket.
925 auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
926 pCtx->name = strdup(channelName.c_str());
927 pCtx->loop = state.loop;
928 pCtx->device = this;
929 pCtx->state = &state;
930 pCtx->fd = zmq_fd;
931 pCtx->read = false;
932 poller->data = pCtx;
933 uv_poll_init(state.loop, poller, zmq_fd);
934 state.activeOutputPollers.push_back(poller);
935 }
936 }
937 } else {
 // No process callback at all: this device only exists to be enumerated.
938 LOGP(detail, "This is a fake device so we exit after the first iteration.");
939 deviceContext.exitTransitionTimeout = 0;
940 // This is a fake device, so we can request to exit immediately
941 ServiceRegistryRef ref{mServiceRegistry};
942 ref.get<ControlService>().readyToQuit(QuitRequest::Me);
943 // A two second timer to stop internal devices which do not want to
944 auto* timer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
945 uv_timer_init(state.loop, timer);
946 timer->data = &state;
947 uv_update_time(state.loop);
948 uv_timer_start(timer, on_idle_timer, 2000, 2000);
949 state.activeTimers.push_back(timer);
950 }
951}
952
953void DataProcessingDevice::startPollers()
954{
955 auto ref = ServiceRegistryRef{mServiceRegistry};
956 auto& deviceContext = ref.get<DeviceContext>();
957 auto& state = ref.get<DeviceState>();
958
959 for (auto* poller : state.activeInputPollers) {
960 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
961 O2_SIGNPOST_START(device, sid, "socket_state", "Input socket waiting for connection.");
962 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
963 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
964 }
965 for (auto& poller : state.activeOutOfBandPollers) {
966 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
967 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
968 }
969 for (auto* poller : state.activeOutputPollers) {
970 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
971 O2_SIGNPOST_START(device, sid, "socket_state", "Output socket waiting for connection.");
972 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
973 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
974 }
975
976 deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
977 deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
978 uv_timer_init(state.loop, deviceContext.gracePeriodTimer);
979
980 deviceContext.dataProcessingGracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
981 deviceContext.dataProcessingGracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
982 uv_timer_init(state.loop, deviceContext.dataProcessingGracePeriodTimer);
983}
984
985void DataProcessingDevice::stopPollers()
986{
987 auto ref = ServiceRegistryRef{mServiceRegistry};
988 auto& deviceContext = ref.get<DeviceContext>();
989 auto& state = ref.get<DeviceState>();
990 LOGP(detail, "Stopping {} input pollers", state.activeInputPollers.size());
991 for (auto* poller : state.activeInputPollers) {
992 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
993 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
994 uv_poll_stop(poller);
995 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
996 }
997 LOGP(detail, "Stopping {} out of band pollers", state.activeOutOfBandPollers.size());
998 for (auto* poller : state.activeOutOfBandPollers) {
999 uv_poll_stop(poller);
1000 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
1001 }
1002 LOGP(detail, "Stopping {} output pollers", state.activeOutOfBandPollers.size());
1003 for (auto* poller : state.activeOutputPollers) {
1004 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
1005 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
1006 uv_poll_stop(poller);
1007 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
1008 }
1009
1010 uv_timer_stop(deviceContext.gracePeriodTimer);
1011 delete (ServiceRegistryRef*)deviceContext.gracePeriodTimer->data;
1012 free(deviceContext.gracePeriodTimer);
1013 deviceContext.gracePeriodTimer = nullptr;
1014
1015 uv_timer_stop(deviceContext.dataProcessingGracePeriodTimer);
1016 delete (ServiceRegistryRef*)deviceContext.dataProcessingGracePeriodTimer->data;
1017 free(deviceContext.dataProcessingGracePeriodTimer);
1018 deviceContext.dataProcessingGracePeriodTimer = nullptr;
1019}
1020
// InitTask: one-time device initialization run by FairMQ before the device
// enters its state machine proper. Builds the expiration handlers for the
// configured input routes, wires up the main-thread wakeup async handle,
// subscribes to shared-memory region events, installs the SIGUSR1 watcher,
// creates the channel pollers and finally blocks until all expected memory
// region registration callbacks have arrived.
// NOTE(review): the scraped listing is missing the signature line (1021) and
// lines 1098 and 1126-1129; confirm against the full source.
1022{
1023 auto ref = ServiceRegistryRef{mServiceRegistry};
1024 auto& deviceContext = ref.get<DeviceContext>();
1025 auto& context = ref.get<DataProcessorContext>();
1026
1027 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1028 O2_SIGNPOST_START(device, cid, "InitTask", "Entering InitTask callback.");
1029 auto& spec = getRunningDevice(mRunningDevice, mServiceRegistry);
1030 auto distinct = DataRelayerHelpers::createDistinctRouteIndex(spec.inputs);
1031 auto& state = ref.get<DeviceState>();
 // Build one ExpirationHandler per distinct input route that carries a
 // configurator; `i` tracks the route index even for skipped routes.
1032 int i = 0;
1033 for (auto& di : distinct) {
1034 auto& route = spec.inputs[di];
1035 if (route.configurator.has_value() == false) {
1036 i++;
1037 continue;
1038 }
1039 ExpirationHandler handler{
1040 .name = route.configurator->name,
1041 .routeIndex = RouteIndex{i++},
1042 .lifetime = route.matcher.lifetime,
1043 .creator = route.configurator->creatorConfigurator(state, mServiceRegistry, *mConfigRegistry),
1044 .checker = route.configurator->danglingConfigurator(state, *mConfigRegistry),
1045 .handler = route.configurator->expirationConfigurator(state, *mConfigRegistry)};
1046 context.expirationHandlers.emplace_back(std::move(handler));
1047 }
1048
 // Lazily create the async handle other threads use to wake the main loop.
1049 if (state.awakeMainThread == nullptr) {
1050 state.awakeMainThread = (uv_async_t*)malloc(sizeof(uv_async_t));
1051 state.awakeMainThread->data = &state;
1052 uv_async_init(state.loop, state.awakeMainThread, on_awake_main_thread);
1053 }
1054
 // Timeouts / expected callback counts come in as string options.
1055 deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue<std::string>("expected-region-callbacks"));
1056 deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue<std::string>("exit-transition-timeout"));
1057 deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue<std::string>("data-processing-timeout"));
1058
 // Subscribe to shared-memory region events on every channel's transport.
 // The callback runs on a transport thread: it queues the info under the
 // mutex and wakes the main loop, which does the actual handling.
1059 for (auto& channel : GetChannels()) {
1060 channel.second.at(0).Transport()->SubscribeToRegionEvents([&context = deviceContext,
1061 &registry = mServiceRegistry,
1062 &pendingRegionInfos = mPendingRegionInfos,
1063 &regionInfoMutex = mRegionInfoMutex](fair::mq::RegionInfo info) {
1064 std::lock_guard<std::mutex> lock(regionInfoMutex);
1065 LOG(detail) << ">>> Region info event" << info.event;
1066 LOG(detail) << "id: " << info.id;
1067 LOG(detail) << "ptr: " << info.ptr;
1068 LOG(detail) << "size: " << info.size;
1069 LOG(detail) << "flags: " << info.flags;
1070 // Now we check for pending events with the mutex,
1071 // so the lines below are atomic.
1072 pendingRegionInfos.push_back(info);
1073 context.expectedRegionCallbacks -= 1;
1074 // We always want to handle these on the main loop,
1075 // so we awake it.
1076 ServiceRegistryRef ref{registry};
1077 uv_async_send(ref.get<DeviceState>().awakeMainThread);
1078 });
1079 }
1080
1081 // Add a signal manager for SIGUSR1 so that we can force
1082 // an event from the outside, making sure that the event loop can
1083 // be unblocked (e.g. by a quitting DPL driver) even when there
1084 // is no data pending to be processed.
1085 if (deviceContext.sigusr1Handle == nullptr) {
1086 deviceContext.sigusr1Handle = (uv_signal_t*)malloc(sizeof(uv_signal_t));
1087 deviceContext.sigusr1Handle->data = &mServiceRegistry;
1088 uv_signal_init(state.loop, deviceContext.sigusr1Handle);
1089 uv_signal_start(deviceContext.sigusr1Handle, on_signal_callback, SIGUSR1);
1090 }
1091 // If there is any signal, we want to make sure they are active
1092 for (auto& handle : state.activeSignals) {
1093 handle->data = &state;
1094 }
1095 // When we start, we must make sure that we do listen to the signal
1096 deviceContext.sigusr1Handle->data = &mServiceRegistry;
1097
1099 DataProcessingDevice::initPollers();
1100
1101 // Whenever we InitTask, we consider as if the previous iteration
1102 // was successful, so that even if there is no timer or receiving
1103 // channel, we can still start an enumeration.
 // (DataProcessorContext*)-1 is used as a sentinel for "was successful".
1104 DataProcessorContext* initialContext = nullptr;
1105 bool idle = state.lastActiveDataProcessor.compare_exchange_strong(initialContext, (DataProcessorContext*)-1);
1106 if (!idle) {
1107 LOG(error) << "DataProcessor " << state.lastActiveDataProcessor.load()->spec->name << " was unexpectedly active";
1108 }
1109
1110 // We should be ready to run here. Therefore we copy all the
1111 // required parts in the DataProcessorContext. Eventually we should
1112 // do so on a per thread basis, with fine grained locks.
1113 // FIXME: this should not use ServiceRegistry::threadSalt, but
1114 // more a ServiceRegistry::globalDataProcessorSalt(N) where
1115 // N is the number of the multiplexed data processor.
1116 // We will get there.
1117 this->fillContext(mServiceRegistry.get<DataProcessorContext>(ServiceRegistry::globalDeviceSalt()), deviceContext);
1118
1119 O2_SIGNPOST_END(device, cid, "InitTask", "Exiting InitTask callback waiting for the remaining region callbacks.");
1120
 // True while region-info callbacks are either queued or still expected.
1121 auto hasPendingEvents = [&mutex = mRegionInfoMutex, &pendingRegionInfos = mPendingRegionInfos](DeviceContext& deviceContext) {
1122 std::lock_guard<std::mutex> lock(mutex);
1123 return (pendingRegionInfos.empty() == false) || deviceContext.expectedRegionCallbacks > 0;
1124 };
 // NOTE(review): "registation" typo in the signpost message below — it is a
 // runtime string, so it is deliberately left untouched here.
1125 O2_SIGNPOST_START(device, cid, "InitTask", "Waiting for registation events.");
1130 while (hasPendingEvents(deviceContext)) {
1131 // Wait for the callback to signal its done, so that we do not busy wait.
1132 uv_run(state.loop, UV_RUN_ONCE);
1133 // Handle callbacks if any
1134 {
1135 O2_SIGNPOST_EVENT_EMIT(device, cid, "InitTask", "Memory registration event received.");
1136 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1137 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1138 }
1139 }
1140 O2_SIGNPOST_END(device, cid, "InitTask", "Done waiting for registration events.");
1141}
1142
// fillContext: populate the (per device, eventually per thread) context with
// everything needed at Run time — sink/rate-limiting flags, the error
// handling closure (user callback vs. policy driven) and the early-forward
// decision.
// NOTE(review): the scraped listing is missing the signature line (1143) and
// several interior lines (e.g. 1169-1170, 1174-1176, 1183, 1191-1193, 1198,
// 1200, 1211-1212), so parts of the lambdas below are visibly truncated;
// confirm against the full source.
1144{
1145 context.isSink = false;
1146 // If nothing is a sink, the rate limiting simply does not trigger.
1147 bool enableRateLimiting = std::stoi(fConfig->GetValue<std::string>("timeframes-rate-limit"));
1148
1149 auto ref = ServiceRegistryRef{mServiceRegistry};
1150 auto& spec = ref.get<DeviceSpec const>();
1151
1152 // The policy is now allowed to state the default.
1153 context.balancingInputs = spec.completionPolicy.balanceChannels;
1154 // This is needed because the internal injected dummy sink should not
1155 // try to balance inputs unless the rate limiting is requested.
1156 if (enableRateLimiting == false && spec.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) {
1157 context.balancingInputs = false;
1158 }
 // A device producing the "dpl-summary" output is the rate-limiting sink.
 // NOTE: the loop variable below shadows the outer `spec`.
1159 if (enableRateLimiting) {
1160 for (auto& spec : spec.outputs) {
1161 if (spec.matcher.binding.value == "dpl-summary") {
1162 context.isSink = true;
1163 break;
1164 }
1165 }
1166 }
1167
1168 context.registry = &mServiceRegistry;
 // With a user-provided error callback: log, dump the backtrace and invoke
 // the callback with an ErrorContext.
1171 if (context.error != nullptr) {
1172 context.errorHandling = [&errorCallback = context.error,
1173 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1177 auto& err = error_from_ref(e);
1178 auto& context = ref.get<DataProcessorContext>();
1179 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1180 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Invoking callback.", err.what);
1181 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1182 auto& stats = ref.get<DataProcessingStats>();
1184 ErrorContext errorContext{record, ref, e};
1185 errorCallback(errorContext);
1186 };
1187 } else {
 // Without a callback: apply the configured error policy — rethrow or
 // skip to the next timeframe (one case label is lost to a scrape gap).
1188 context.errorHandling = [&errorPolicy = mProcessingPolicies.error,
1189 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1190 auto& err = error_from_ref(e);
1194 auto& context = ref.get<DataProcessorContext>();
1195 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1196 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1197 auto& stats = ref.get<DataProcessingStats>();
1199 switch (errorPolicy) {
1201 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Rethrowing.", err.what);
1202 throw e;
1203 default:
1204 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Skipping to next timeframe.", err.what);
1205 break;
1206 }
1207 };
1208 }
1209
 // Decide whether inputs may be forwarded downstream before processing
 // finishes. Early forwarding is vetoed by AOD inputs (when FairMQ shmem
 // is unavailable), RAWDATA under the NORAW policy, and Optional inputs;
 // it is forced on when only Condition-lifetime objects are forwarded.
1210 auto decideEarlyForward = [&context, &spec, this]() -> bool {
1213 bool canForwardEarly = (spec.forwards.empty() == false) && mProcessingPolicies.earlyForward != EarlyForwardPolicy::NEVER;
1214 bool onlyConditions = true;
1215 bool overriddenEarlyForward = false;
1216 for (auto& forwarded : spec.forwards) {
1217 if (forwarded.matcher.lifetime != Lifetime::Condition) {
1218 onlyConditions = false;
1219 }
1220#if !__has_include(<fairmq/shmem/Message.h>)
1221 if (strncmp(DataSpecUtils::asConcreteOrigin(forwarded.matcher).str, "AOD", 3) == 0) {
1222 context.canForwardEarly = false;
1223 overriddenEarlyForward = true;
1224 LOG(detail) << "Cannot forward early because of AOD input: " << DataSpecUtils::describe(forwarded.matcher);
1225 break;
1226 }
1227#endif
1228 if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && mProcessingPolicies.earlyForward == EarlyForwardPolicy::NORAW) {
1229 context.canForwardEarly = false;
1230 overriddenEarlyForward = true;
1231 LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher);
1232 break;
1233 }
1234 if (forwarded.matcher.lifetime == Lifetime::Optional) {
1235 context.canForwardEarly = false;
1236 overriddenEarlyForward = true;
1237 LOG(detail) << "Cannot forward early because of Optional input: " << DataSpecUtils::describe(forwarded.matcher);
1238 break;
1239 }
1240 }
1241 if (!overriddenEarlyForward && onlyConditions) {
1242 context.canForwardEarly = true;
1243 LOG(detail) << "Enabling early forwarding because only conditions to be forwarded";
1244 }
1245 return canForwardEarly;
1246 };
1247 context.canForwardEarly = decideEarlyForward();
1248}
1249
// PreRun: FairMQ hook run when the device transitions into Running. Resets
// the per-run state, re-arms the input channels, invokes the registered
// pre-start callbacks (rethrowing on failure), starts the pollers and raises
// the device_state monitoring gauge.
// NOTE(review): the signature line (1250) and line 1258 are missing from the
// scraped listing; confirm against the full source.
1251{
1252 auto ref = ServiceRegistryRef{mServiceRegistry};
1253 auto& state = ref.get<DeviceState>();
1254
1255 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.loop);
1256 O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
1257 state.quitRequested = false;
1259 state.allowedProcessing = DeviceState::Any;
 // Re-arm every input channel, except fake Pull channels which never
 // receive an EndOfStream and keep their state.
1260 for (auto& info : state.inputChannelInfos) {
1261 if (info.state != InputChannelState::Pull) {
1262 info.state = InputChannelState::Running;
1263 }
1264 }
1265
1266 // Catch callbacks which fail before we start.
1267 // Notice that when running multiple dataprocessors
1268 // we should probably allow expendable ones to fail.
1269 try {
1270 auto& dpContext = ref.get<DataProcessorContext>();
1271 dpContext.preStartCallbacks(ref);
 // Per-stream pre-start callbacks; stream salts are 1-based.
1272 for (size_t i = 0; i < mStreams.size(); ++i) {
1273 auto streamRef = ServiceRegistryRef{mServiceRegistry, ServiceRegistry::globalStreamSalt(i + 1)};
1274 auto& context = streamRef.get<StreamContext>();
1275 context.preStartStreamCallbacks(streamRef);
1276 }
1277 } catch (std::exception& e) {
1278 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type std::exception caught in PreRun: %{public}s. Rethrowing.", e.what());
1279 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1280 throw;
1281 } catch (o2::framework::RuntimeErrorRef& e) {
1282 auto& err = error_from_ref(e);
1283 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type o2::framework::RuntimeErrorRef caught in PreRun: %{public}s. Rethrowing.", err.what);
1284 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1285 throw;
1286 } catch (...) {
1287 O2_SIGNPOST_END(device, cid, "PreRun", "Unknown exception being thrown. Rethrowing.");
1288 throw;
1289 }
1290
1291 ref.get<CallbackService>().call<CallbackService::Id::Start>();
1292 startPollers();
1293
1294 // Raise to 1 when we are ready to start processing
1295 using o2::monitoring::Metric;
1296 using o2::monitoring::Monitoring;
1297 using o2::monitoring::tags::Key;
1298 using o2::monitoring::tags::Value;
1299
1300 auto& monitoring = ref.get<Monitoring>();
1301 monitoring.send(Metric{(uint64_t)1, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1302 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun callback.");
1303}
1304
// PostRun: FairMQ hook run when the device leaves Running. Mirrors PreRun:
// drops the device_state monitoring gauge, stops the pollers and invokes the
// registered Stop / post-stop callbacks.
// NOTE(review): the signature line (1305) is missing from the scraped
// listing; confirm against the full source.
1306{
1307 ServiceRegistryRef ref{mServiceRegistry};
1308 // Lower the device_state gauge to 0: this device is no longer processing.
1309 using o2::monitoring::Metric;
1310 using o2::monitoring::Monitoring;
1311 using o2::monitoring::tags::Key;
1312 using o2::monitoring::tags::Value;
1313
1314 auto& monitoring = ref.get<Monitoring>();
1315 monitoring.send(Metric{(uint64_t)0, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1316
1317 stopPollers();
1318 ref.get<CallbackService>().call<CallbackService::Id::Stop>();
1319 auto& dpContext = ref.get<DataProcessorContext>();
1320 dpContext.postStopCallbacks(ref);
1321}
1322
// Reset hook: forwards the FairMQ reset transition to the registered
// CallbackService::Id::Reset callbacks.
// NOTE(review): the signature line (1323) is missing from the scraped
// listing; confirm against the full source.
1324{
1325 ServiceRegistryRef ref{mServiceRegistry};
1326 ref.get<CallbackService>().call<CallbackService::Id::Reset>();
1327}
1328
// Run: the device main loop. Each iteration drains pending FairMQ state
// changes, handles queued shared-memory region callbacks, decides whether to
// block on the uv loop, manages the exit/data-processing grace period
// timers, runs the async queue, and finally dispatches work to an idle task
// stream if the ComputingQuotaEvaluator grants resources.
// NOTE(review): the scraped listing is missing the signature line (1329) and
// numerous interior lines (1333, 1368, 1374, 1379, 1387, 1497-1498, 1556,
// ...), leaving some expressions visibly truncated (e.g. the shouldNotWait
// initializer ends in a dangling `||`); confirm against the full source.
1330{
1331 ServiceRegistryRef ref{mServiceRegistry};
1332 auto& state = ref.get<DeviceState>();
1334 bool firstLoop = true;
1335 O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
1336 O2_SIGNPOST_START(device, lid, "device_state", "First iteration of the device loop");
1337
 // Opting into the experimental DPL thread pool forces libuv's own pool
 // down to a single thread.
1338 bool dplEnableMultithreding = getenv("DPL_THREADPOOL_SIZE") != nullptr;
1339 if (dplEnableMultithreding) {
1340 setenv("UV_THREADPOOL_SIZE", "1", 1);
1341 }
1342
1343 while (state.transitionHandling != TransitionHandlingState::Expired) {
 // Apply at most one queued FairMQ state change per iteration.
1344 if (state.nextFairMQState.empty() == false) {
1345 (void)this->ChangeState(state.nextFairMQState.back());
1346 state.nextFairMQState.pop_back();
1347 }
1348 // Notify on the main thread the new region callbacks, making sure
1349 // no callback is issued if there is something still processing.
1350 {
1351 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1352 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1353 }
1354 // This will block for the correct delay (or until we get data
1355 // on a socket). We also do not block on the first iteration
1356 // so that devices which do not have a timer can still start an
1357 // enumeration.
1358 {
1359 ServiceRegistryRef ref{mServiceRegistry};
1360 ref.get<DriverClient>().flushPending(mServiceRegistry);
1361 DataProcessorContext* lastActive = state.lastActiveDataProcessor.load();
1362 // Reset to zero unless some other DataProcessorContext completed in the meanwhile.
1363 // In such case we will take care of it at next iteration.
1364 state.lastActiveDataProcessor.compare_exchange_strong(lastActive, nullptr);
1365
1366 auto shouldNotWait = (lastActive != nullptr &&
1367 (state.streaming != StreamingState::Idle) && (state.activeSignals.empty())) ||
1369 if (firstLoop) {
1370 shouldNotWait = true;
1371 firstLoop = false;
1372 }
1373 if (lastActive != nullptr) {
1375 }
1376 if (NewStatePending()) {
1377 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "New state pending. Waiting for it to be handled.");
1378 shouldNotWait = true;
1380 }
 // First time we see a pending state change: arm the grace period
 // timers according to the configured timeouts.
1381 if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) {
1382 state.transitionHandling = TransitionHandlingState::Requested;
1383 auto& deviceContext = ref.get<DeviceContext>();
1384 // Check if we only have timers
1385 auto& spec = ref.get<DeviceSpec const>();
1386 if (hasOnlyTimers(spec)) {
1388 }
1389
1390 // We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
1391 if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
1392 uv_update_time(state.loop);
1393 O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
1394 uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
1395 }
1396 if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
1397 state.transitionHandling = TransitionHandlingState::Requested;
1398 ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
1399 uv_update_time(state.loop);
1400 O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
1401 deviceContext.exitTransitionTimeout);
1402 uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
1403 if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
1404 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", (int)deviceContext.exitTransitionTimeout);
1405 } else {
1406 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before switching to READY state.", (int)deviceContext.exitTransitionTimeout);
1407 }
1408 } else {
 // No grace period applicable: the transition expires right away.
1409 state.transitionHandling = TransitionHandlingState::Expired;
1410 if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination == TerminationPolicy::QUIT) {
1411 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
1412 } else if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination != TerminationPolicy::QUIT) {
1413 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
1414 } else if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
1415 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
1416 } else {
1417 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
1418 }
1419 }
1420 }
1421 // If we are Idle, we can then consider the transition to be expired.
1422 if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
1423 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
1424 state.transitionHandling = TransitionHandlingState::Expired;
1425 }
 // Restore the log severity pushed by a previous tracing request.
1426 if (state.severityStack.empty() == false) {
1427 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1428 state.severityStack.pop_back();
1429 }
1430 // for (auto &info : mDeviceContext.state->inputChannelInfos) {
1431 // shouldNotWait |= info.readPolled;
1432 // }
1433 state.loopReason = DeviceState::NO_REASON;
1434 state.firedTimers.clear();
1435 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_CALLBACKS) != 0) {
1436 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1437 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1438 }
1439 // Run the asynchronous queue just before sleeping again, so that:
1440 // - we can trigger further events from the queue
1441 // - we can guarantee this is the last thing we do in the loop (
1442 // assuming no one else is adding to the queue before this point).
 // onDrop: forwards messages dropped from a pruned timeslice slot so
 // that downstream consumers are not starved.
1443 auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1444 O2_SIGNPOST_START(device, lid, "run_loop", "Dropping message from slot %" PRIu64 ". Forwarding as needed.", (uint64_t)slot.index);
1445 ServiceRegistryRef ref{registry};
1446 ref.get<AsyncQueue>();
1447 ref.get<DecongestionService>();
1448 ref.get<DataRelayer>();
1449 // Get the current timeslice for the slot.
1450 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
1452 forwardInputs(registry, slot, dropped, oldestOutputInfo, false, true);
1453 };
1454 auto& relayer = ref.get<DataRelayer>();
1455 relayer.prunePending(onDrop);
1456 auto& queue = ref.get<AsyncQueue>();
1457 auto oldestPossibleTimeslice = relayer.getOldestPossibleOutput();
1458 AsyncQueueHelpers::run(queue, {oldestPossibleTimeslice.timeslice.value});
1459 if (shouldNotWait == false) {
1460 auto& dpContext = ref.get<DataProcessorContext>();
1461 dpContext.preLoopCallbacks(ref);
1462 }
 // Block (UV_RUN_ONCE) unless something is known to need service now.
1463 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. %{}s", shouldNotWait ? "Will immediately schedule a new one" : "Waiting for next event.");
1464 uv_run(state.loop, shouldNotWait ? UV_RUN_NOWAIT : UV_RUN_ONCE);
1465 O2_SIGNPOST_START(device, lid, "run_loop", "Run loop started. Loop reason %d.", state.loopReason);
1466 if ((state.loopReason & state.tracingFlags) != 0) {
1467 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1468 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1469 } else if (state.severityStack.empty() == false) {
1470 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1471 state.severityStack.pop_back();
1472 }
1473 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Loop reason mask %x & %x = %x", state.loopReason, state.tracingFlags, state.loopReason & state.tracingFlags);
1474
1475 if ((state.loopReason & DeviceState::LoopReason::OOB_ACTIVITY) != 0) {
1476 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Out of band activity detected. Rescanning everything.");
1477 relayer.rescan();
1478 }
1479
1480 if (!state.pendingOffers.empty()) {
1481 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Pending %" PRIu64 " offers. updating the ComputingQuotaEvaluator.", (uint64_t)state.pendingOffers.size());
1482 ref.get<ComputingQuotaEvaluator>().updateOffers(state.pendingOffers, uv_now(state.loop));
1483 }
1484 }
1485
1486 // Notify on the main thread the new region callbacks, making sure
1487 // no callback is issued if there is something still processing.
1488 // Notice that we still need to perform callbacks also after
1489 // the socket epolled, because otherwise we would end up serving
1490 // the callback after the first data arrives is the system is too
1491 // fast to transition from Init to Run.
1492 {
1493 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1494 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1495 }
1496
 // Pick the last non-running task stream as the candidate to dispatch on.
1497 assert(mStreams.size() == mHandles.size());
1499 TaskStreamRef streamRef{-1};
1500 for (size_t ti = 0; ti < mStreams.size(); ti++) {
1501 auto& taskInfo = mStreams[ti];
1502 if (taskInfo.running) {
1503 continue;
1504 }
1505 // Stream 0 is for when we run in
1506 streamRef.index = ti;
1507 }
1508 using o2::monitoring::Metric;
1509 using o2::monitoring::Monitoring;
1510 using o2::monitoring::tags::Key;
1511 using o2::monitoring::tags::Value;
1512 // We have an empty stream, let's check if we have enough
1513 // resources for it to run something
1514 if (streamRef.index != -1) {
1515 // Synchronous execution of the callbacks. This will be moved in the
1516 // moved in the on_socket_polled once we have threading in place.
1517 uv_work_t& handle = mHandles[streamRef.index];
1518 TaskStreamInfo& stream = mStreams[streamRef.index];
1519 handle.data = &mStreams[streamRef.index];
1520
 // Publishes expired-offer statistics when an offer could not be used.
1521 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const& stats)> reportExpiredOffer = [&registry = mServiceRegistry](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
1522 ServiceRegistryRef ref{registry};
1523 auto& dpStats = ref.get<DataProcessingStats>();
1524 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
1525 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
1526 dpStats.processCommandQueue();
1527 };
1528 auto ref = ServiceRegistryRef{mServiceRegistry};
1529
1530 // Deciding wether to run or not can be done by passing a request to
1531 // the evaluator. In this case, the request is always satisfied and
1532 // we run on whatever resource is available.
1533 auto& spec = ref.get<DeviceSpec const>();
1534 bool enough = ref.get<ComputingQuotaEvaluator>().selectOffer(streamRef.index, spec.resourcePolicy.request, uv_now(state.loop));
1535
1536 if (enough) {
1537 stream.id = streamRef;
1538 stream.running = true;
1539 stream.registry = &mServiceRegistry;
 // With the experimental thread pool the work is queued on the uv
 // pool; otherwise it runs synchronously on this thread.
1540 if (dplEnableMultithreding) [[unlikely]] {
1541 stream.task = &handle;
1542 uv_queue_work(state.loop, stream.task, run_callback, run_completion);
1543 } else {
1544 run_callback(&handle);
1545 run_completion(&handle, 0);
1546 }
1547 } else {
1548 auto ref = ServiceRegistryRef{mServiceRegistry};
1549 ref.get<ComputingQuotaEvaluator>().handleExpired(reportExpiredOffer);
1550 }
1551 }
1552 }
1553
1554 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", state.transitionHandling);
 // Drop any parts still queued on the input channels before leaving Run().
1555 auto& spec = ref.get<DeviceSpec const>();
1557 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
1558 auto& info = state.inputChannelInfos[ci];
1559 info.parts.fParts.clear();
1560 }
1561 state.transitionHandling = TransitionHandlingState::NoTransition;
1562}
1563
1567{
1568 auto& context = ref.get<DataProcessorContext>();
1569 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1570 O2_SIGNPOST_START(device, dpid, "do_prepare", "Starting DataProcessorContext::doPrepare.");
1571
1572 {
1573 ref.get<CallbackService>().call<CallbackService::Id::ClockTick>();
1574 }
1575 // Whether or not we had something to do.
1576
1577 // Initialise the value for context.allDone. It will possibly be updated
1578 // below if any of the channels is not done.
1579 //
1580 // Notice that fake input channels (InputChannelState::Pull) cannot possibly
1581 // expect to receive an EndOfStream signal. Thus we do not wait for these
1582 // to be completed. In the case of data source devices, as they do not have
1583 // real data input channels, they have to signal EndOfStream themselves.
1584 auto& state = ref.get<DeviceState>();
1585 auto& spec = ref.get<DeviceSpec const>();
1586 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.inputChannelInfos.data());
1587 O2_SIGNPOST_START(device, cid, "do_prepare", "Reported channel states.");
1588 context.allDone = std::any_of(state.inputChannelInfos.begin(), state.inputChannelInfos.end(), [cid](const auto& info) {
1589 if (info.channel) {
1590 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "Input channel %{public}s%{public}s has %zu parts left and is in state %d.",
1591 info.channel->GetName().c_str(), (info.id.value == ChannelIndex::INVALID ? " (non DPL)" : ""), info.parts.fParts.size(), (int)info.state);
1592 } else {
1593 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "External channel %d is in state %d.", info.id.value, (int)info.state);
1594 }
1595 return (info.parts.fParts.empty() == true && info.state != InputChannelState::Pull);
1596 });
1597 O2_SIGNPOST_END(device, cid, "do_prepare", "End report.");
1598 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Processing %zu input channels.", spec.inputChannels.size());
1601 static std::vector<int> pollOrder;
1602 pollOrder.resize(state.inputChannelInfos.size());
1603 std::iota(pollOrder.begin(), pollOrder.end(), 0);
1604 std::sort(pollOrder.begin(), pollOrder.end(), [&infos = state.inputChannelInfos](int a, int b) {
1605 return infos[a].oldestForChannel.value < infos[b].oldestForChannel.value;
1606 });
1607
1608 // Nothing to poll...
1609 if (pollOrder.empty()) {
1610 O2_SIGNPOST_END(device, dpid, "do_prepare", "Nothing to poll. Waiting for next iteration.");
1611 return;
1612 }
1613 auto currentOldest = state.inputChannelInfos[pollOrder.front()].oldestForChannel;
1614 auto currentNewest = state.inputChannelInfos[pollOrder.back()].oldestForChannel;
1615 auto delta = currentNewest.value - currentOldest.value;
1616 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Oldest possible timeframe range %" PRIu64 " => %" PRIu64 " delta %" PRIu64,
1617 (int64_t)currentOldest.value, (int64_t)currentNewest.value, (int64_t)delta);
1618 auto& infos = state.inputChannelInfos;
1619
1620 if (context.balancingInputs) {
1621 static int pipelineLength = DefaultsHelpers::pipelineLength();
1622 static uint64_t ahead = getenv("DPL_MAX_CHANNEL_AHEAD") ? std::atoll(getenv("DPL_MAX_CHANNEL_AHEAD")) : std::max(8, std::min(pipelineLength - 48, pipelineLength / 2));
1623 auto newEnd = std::remove_if(pollOrder.begin(), pollOrder.end(), [&infos, limitNew = currentOldest.value + ahead](int a) -> bool {
1624 return infos[a].oldestForChannel.value > limitNew;
1625 });
1626 for (auto it = pollOrder.begin(); it < pollOrder.end(); it++) {
1627 const auto& channelInfo = state.inputChannelInfos[*it];
1628 if (channelInfo.pollerIndex != -1) {
1629 auto& poller = state.activeInputPollers[channelInfo.pollerIndex];
1630 auto& pollerContext = *(PollerContext*)(poller->data);
1631 if (pollerContext.pollerState == PollerContext::PollerState::Connected || pollerContext.pollerState == PollerContext::PollerState::Suspended) {
1632 bool running = pollerContext.pollerState == PollerContext::PollerState::Connected;
1633 bool shouldBeRunning = it < newEnd;
1634 if (running != shouldBeRunning) {
1635 uv_poll_start(poller, shouldBeRunning ? UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED : 0, &on_socket_polled);
1636 pollerContext.pollerState = shouldBeRunning ? PollerContext::PollerState::Connected : PollerContext::PollerState::Suspended;
1637 }
1638 }
1639 }
1640 }
1641 pollOrder.erase(newEnd, pollOrder.end());
1642 }
1643 O2_SIGNPOST_END(device, dpid, "do_prepare", "%zu channels pass the channel inbalance balance check.", pollOrder.size());
1644
1645 for (auto sci : pollOrder) {
1646 auto& info = state.inputChannelInfos[sci];
1647 auto& channelSpec = spec.inputChannels[sci];
1648 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1649 O2_SIGNPOST_START(device, cid, "channels", "Processing channel %s", channelSpec.name.c_str());
1650
1651 if (info.state != InputChannelState::Completed && info.state != InputChannelState::Pull) {
1652 context.allDone = false;
1653 }
1654 if (info.state != InputChannelState::Running) {
1655 // Remember to flush data if we are not running
1656 // and there is some message pending.
1657 if (info.parts.Size()) {
1659 }
1660 O2_SIGNPOST_END(device, cid, "channels", "Flushing channel %s which is in state %d and has %zu parts still pending.",
1661 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1662 continue;
1663 }
1664 if (info.channel == nullptr) {
1665 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is nullptr and has %zu parts still pending.",
1666 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1667 continue;
1668 }
1669 // Only poll DPL channels for now.
1671 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is not a DPL channel and has %zu parts still pending.",
1672 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1673 continue;
1674 }
1675 auto& socket = info.channel->GetSocket();
1676 // If we have pending events from a previous iteration,
1677 // we do receive in any case.
1678 // Otherwise we check if there is any pending event and skip
1679 // this channel in case there is none.
1680 if (info.hasPendingEvents == 0) {
1681 socket.Events(&info.hasPendingEvents);
1682 // If we do not read, we can continue.
1683 if ((info.hasPendingEvents & 1) == 0 && (info.parts.Size() == 0)) {
1684 O2_SIGNPOST_END(device, cid, "channels", "No pending events and no remaining parts to process for channel %{public}s", channelSpec.name.c_str());
1685 continue;
1686 }
1687 }
1688 // We can reset this, because it means we have seen at least 1
1689 // message after the UV_READABLE was raised.
1690 info.readPolled = false;
1691 // Notice that there seems to be a difference between the documentation
1692 // of zeromq and the observed behavior. The fact that ZMQ_POLLIN
1693 // is raised does not mean that a message is immediately available to
1694 // read, just that it will be available soon, so the receive can
1695 // still return -2. To avoid this we keep receiving on the socket until
1696 // we get a message. In order not to overflow the DPL queue we process
1697 // one message at the time and we keep track of wether there were more
1698 // to process.
1699 bool newMessages = false;
1700 while (true) {
1701 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Receiving loop called for channel %{public}s (%d) with oldest possible timeslice %zu",
1702 channelSpec.name.c_str(), info.id.value, info.oldestForChannel.value);
1703 if (info.parts.Size() < 64) {
1704 fair::mq::Parts parts;
1705 info.channel->Receive(parts, 0);
1706 if (parts.Size()) {
1707 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Received %zu parts from channel %{public}s (%d).", parts.Size(), channelSpec.name.c_str(), info.id.value);
1708 }
1709 for (auto&& part : parts) {
1710 info.parts.fParts.emplace_back(std::move(part));
1711 }
1712 newMessages |= true;
1713 }
1714
1715 if (info.parts.Size() >= 0) {
1717 // Receiving data counts as activity now, so that
1718 // We can make sure we process all the pending
1719 // messages without hanging on the uv_run.
1720 break;
1721 }
1722 }
1723 // We check once again for pending events, keeping track if this was the
1724 // case so that we can immediately repeat this loop and avoid remaining
1725 // stuck in uv_run. This is because we will not get notified on the socket
1726 // if more events are pending due to zeromq level triggered approach.
1727 socket.Events(&info.hasPendingEvents);
1728 if (info.hasPendingEvents) {
1729 info.readPolled = false;
1730 // In case there were messages, we consider it as activity
1731 if (newMessages) {
1732 state.lastActiveDataProcessor.store(&context);
1733 }
1734 }
1735 O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).",
1736 channelSpec.name.c_str(), info.id.value);
1737 }
1738}
1739
1741{
1742 auto& context = ref.get<DataProcessorContext>();
1743 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1744 auto& state = ref.get<DeviceState>();
1745 auto& spec = ref.get<DeviceSpec const>();
1746
1747 if (state.streaming == StreamingState::Idle) {
1748 return;
1749 }
1750
1751 context.completed.clear();
1752 context.completed.reserve(16);
1753 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1754 state.lastActiveDataProcessor.store(&context);
1755 }
1756 DanglingContext danglingContext{*context.registry};
1757
1758 context.preDanglingCallbacks(danglingContext);
1759 if (state.lastActiveDataProcessor.load() == nullptr) {
1760 ref.get<CallbackService>().call<CallbackService::Id::Idle>();
1761 }
1762 auto activity = ref.get<DataRelayer>().processDanglingInputs(context.expirationHandlers, *context.registry, true);
1763 if (activity.expiredSlots > 0) {
1764 state.lastActiveDataProcessor = &context;
1765 }
1766
1767 context.completed.clear();
1768 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1769 state.lastActiveDataProcessor = &context;
1770 }
1771
1772 context.postDanglingCallbacks(danglingContext);
1773
1774 // If we got notified that all the sources are done, we call the EndOfStream
1775 // callback and return false. Notice that what happens next is actually
1776 // dependent on the callback, not something which is controlled by the
1777 // framework itself.
1778 if (context.allDone == true && state.streaming == StreamingState::Streaming) {
1780 state.lastActiveDataProcessor = &context;
1781 }
1782
1783 if (state.streaming == StreamingState::EndOfStreaming) {
1784 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "We are in EndOfStreaming. Flushing queues.");
1785 // We keep processing data until we are Idle.
1786 // FIXME: not sure this is the correct way to drain the queues, but
1787 // I guess we will see.
1790 auto& relayer = ref.get<DataRelayer>();
1791
1792 bool shouldProcess = hasOnlyGenerated(spec) == false;
1793
1794 while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) {
1795 relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false);
1796 }
1797
1798 auto& timingInfo = ref.get<TimingInfo>();
1799 // We should keep the data generated at end of stream only for those
1800 // which are not sources.
1801 timingInfo.keepAtEndOfStream = shouldProcess;
1802 // Fill timinginfo with some reasonable values for data sent with endOfStream
1803 timingInfo.timeslice = relayer.getOldestPossibleOutput().timeslice.value;
1804 timingInfo.tfCounter = -1;
1805 timingInfo.firstTForbit = -1;
1806 // timingInfo.runNumber = ; // Not sure where to get this if not already set
1807 timingInfo.creation = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
1808 O2_SIGNPOST_EVENT_EMIT(calibration, dpid, "calibration", "TimingInfo.keepAtEndOfStream %d", timingInfo.keepAtEndOfStream);
1809
1810 EndOfStreamContext eosContext{*context.registry, ref.get<DataAllocator>()};
1811
1812 context.preEOSCallbacks(eosContext);
1813 auto& streamContext = ref.get<StreamContext>();
1814 streamContext.preEOSCallbacks(eosContext);
1815 ref.get<CallbackService>().call<CallbackService::Id::EndOfStream>(eosContext);
1816 streamContext.postEOSCallbacks(eosContext);
1817 context.postEOSCallbacks(eosContext);
1818
1819 for (auto& channel : spec.outputChannels) {
1820 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Sending end of stream to %{public}s.", channel.name.c_str());
1822 }
1823 // This is needed because the transport is deleted before the device.
1824 relayer.clear();
1826 // In case we should process, note the data processor responsible for it
1827 if (shouldProcess) {
1828 state.lastActiveDataProcessor = &context;
1829 }
1830 // On end of stream we shut down all output pollers.
1831 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1832 for (auto& poller : state.activeOutputPollers) {
1833 uv_poll_stop(poller);
1834 }
1835 return;
1836 }
1837
1838 if (state.streaming == StreamingState::Idle) {
1839 // On end of stream we shut down all output pollers.
1840 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1841 for (auto& poller : state.activeOutputPollers) {
1842 uv_poll_stop(poller);
1843 }
1844 }
1845
1846 return;
1847}
1848
1850{
1851 ServiceRegistryRef ref{mServiceRegistry};
1852 ref.get<DataRelayer>().clear();
1853 auto& deviceContext = ref.get<DeviceContext>();
1854 // If the signal handler is there, we should
1855 // hide the registry from it, so that we do not
1856 // end up calling the signal handler on something
1857 // which might not be there anymore.
1858 if (deviceContext.sigusr1Handle) {
1859 deviceContext.sigusr1Handle->data = nullptr;
1860 }
1861 // Makes sure we do not have a working context on
1862 // shutdown.
1863 for (auto& handle : ref.get<DeviceState>().activeSignals) {
1864 handle->data = nullptr;
1865 }
1866}
1867
1870 {
1871 }
1872};
1873
1879{
1880 using InputInfo = DataRelayer::InputInfo;
1882
1883 auto& context = ref.get<DataProcessorContext>();
1884 // This is the same id as the upper level function, so we get the events
1885 // associated with the same interval. We will simply use "handle_data" as
1886 // the category.
1887 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1888
1889 // This is how we validate inputs. I.e. we try to enforce the O2 Data model
1890 // and we do a few stats. We bind parts as a lambda captured variable, rather
1891 // than an input, because we do not want the outer loop actually be exposed
1892 // to the implementation details of the messaging layer.
1893 auto getInputTypes = [&info, &context]() -> std::optional<std::vector<InputInfo>> {
1894 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1895 auto ref = ServiceRegistryRef{*context.registry};
1896 auto& stats = ref.get<DataProcessingStats>();
1897 auto& state = ref.get<DeviceState>();
1898 auto& parts = info.parts;
1899 stats.updateStats({(int)ProcessingStatsId::TOTAL_INPUTS, DataProcessingStats::Op::Set, (int64_t)parts.Size()});
1900
1901 std::vector<InputInfo> results;
1902 // we can reserve the upper limit
1903 results.reserve(parts.Size() / 2);
1904 size_t nTotalPayloads = 0;
1905
1906 auto insertInputInfo = [&results, &nTotalPayloads](size_t position, size_t length, InputType type, ChannelIndex index) {
1907 results.emplace_back(position, length, type, index);
1908 if (type != InputType::Invalid && length > 1) {
1909 nTotalPayloads += length - 1;
1910 }
1911 };
1912
1913 for (size_t pi = 0; pi < parts.Size(); pi += 2) {
1914 auto* headerData = parts.At(pi)->GetData();
1915 auto sih = o2::header::get<SourceInfoHeader*>(headerData);
1916 if (sih) {
1917 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got SourceInfoHeader with state %d", (int)sih->state);
1918 info.state = sih->state;
1919 insertInputInfo(pi, 2, InputType::SourceInfo, info.id);
1920 state.lastActiveDataProcessor = &context;
1921 continue;
1922 }
1923 auto dih = o2::header::get<DomainInfoHeader*>(headerData);
1924 if (dih) {
1925 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got DomainInfoHeader with oldestPossibleTimeslice %d", (int)dih->oldestPossibleTimeslice);
1926 insertInputInfo(pi, 2, InputType::DomainInfo, info.id);
1927 state.lastActiveDataProcessor = &context;
1928 continue;
1929 }
1930 auto dh = o2::header::get<DataHeader*>(headerData);
1931 if (!dh) {
1932 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1933 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header is not a DataHeader?");
1934 continue;
1935 }
1936 if (dh->payloadSize > parts.At(pi + 1)->GetSize()) {
1937 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1938 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader payloadSize mismatch");
1939 continue;
1940 }
1941 auto dph = o2::header::get<DataProcessingHeader*>(headerData);
1942 // We only deal with the tracking of parts if the log is enabled.
1943 // This is because in principle we should track the size of each of
1944 // the parts and sum it up. Not for now.
1945 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, headerData);
1946 O2_SIGNPOST_START(parts, pid, "parts", "Processing DataHeader %{public}-4s/%{public}-16s/%d with splitPayloadParts %d and splitPayloadIndex %d",
1947 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->splitPayloadParts, dh->splitPayloadIndex);
1948 if (!dph) {
1949 insertInputInfo(pi, 2, InputType::Invalid, info.id);
1950 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header stack does not contain DataProcessingHeader");
1951 continue;
1952 }
1953 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
1954 // this is indicating a sequence of payloads following the header
1955 // FIXME: we will probably also set the DataHeader version
1956 insertInputInfo(pi, dh->splitPayloadParts + 1, InputType::Data, info.id);
1957 pi += dh->splitPayloadParts - 1;
1958 } else {
1959 // We can set the type for the next splitPayloadParts
1960 // because we are guaranteed they are all the same.
1961 // If splitPayloadParts = 0, we assume that means there is only one (header, payload)
1962 // pair.
1963 size_t finalSplitPayloadIndex = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
1964 if (finalSplitPayloadIndex > parts.Size()) {
1965 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader::splitPayloadParts invalid");
1966 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1967 continue;
1968 }
1969 insertInputInfo(pi, 2, InputType::Data, info.id);
1970 for (; pi + 2 < finalSplitPayloadIndex; pi += 2) {
1971 insertInputInfo(pi + 2, 2, InputType::Data, info.id);
1972 }
1973 }
1974 }
1975 if (results.size() + nTotalPayloads != parts.Size()) {
1976 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "inconsistent number of inputs extracted. %zu vs parts (%zu)", results.size() + nTotalPayloads, parts.Size());
1977 return std::nullopt;
1978 }
1979 return results;
1980 };
1981
1982 auto reportError = [ref](const char* message) {
1983 auto& stats = ref.get<DataProcessingStats>();
1985 };
1986
1987 auto handleValidMessages = [&info, ref, &reportError](std::vector<InputInfo> const& inputInfos) {
1988 auto& relayer = ref.get<DataRelayer>();
1989 auto& state = ref.get<DeviceState>();
1990 static WaitBackpressurePolicy policy;
1991 auto& parts = info.parts;
1992 // We relay execution to make sure we have a complete set of parts
1993 // available.
1994 bool hasBackpressure = false;
1995 size_t minBackpressureTimeslice = -1;
1996 bool hasData = false;
1997 size_t oldestPossibleTimeslice = -1;
1998 static std::vector<int> ordering;
1999 // Same as inputInfos but with iota.
2000 ordering.resize(inputInfos.size());
2001 std::iota(ordering.begin(), ordering.end(), 0);
2002 // stable sort orderings by type and position
2003 std::stable_sort(ordering.begin(), ordering.end(), [&inputInfos](int const& a, int const& b) {
2004 auto const& ai = inputInfos[a];
2005 auto const& bi = inputInfos[b];
2006 if (ai.type != bi.type) {
2007 return ai.type < bi.type;
2008 }
2009 return ai.position < bi.position;
2010 });
2011 for (size_t ii = 0; ii < inputInfos.size(); ++ii) {
2012 auto const& input = inputInfos[ordering[ii]];
2013 switch (input.type) {
2014 case InputType::Data: {
2015 hasData = true;
2016 auto headerIndex = input.position;
2017 auto nMessages = 0;
2018 auto nPayloadsPerHeader = 0;
2019 if (input.size > 2) {
2020 // header and multiple payload sequence
2021 nMessages = input.size;
2022 nPayloadsPerHeader = nMessages - 1;
2023 } else {
2024 // multiple header-payload pairs
2025 auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
2026 nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2;
2027 nPayloadsPerHeader = 1;
2028 ii += (nMessages / 2) - 1;
2029 }
2030 auto onDrop = [ref](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
2031 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
2032 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "onDrop", "Dropping message from slot %zu. Forwarding as needed. Timeslice %zu",
2033 slot.index, oldestOutputInfo.timeslice.value);
2034 ref.get<AsyncQueue>();
2035 ref.get<DecongestionService>();
2036 ref.get<DataRelayer>();
2037 // Get the current timeslice for the slot.
2038 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
2040 forwardInputs(ref, slot, dropped, oldestOutputInfo, false, true);
2041 };
2042 auto relayed = relayer.relay(parts.At(headerIndex)->GetData(),
2043 &parts.At(headerIndex),
2044 input,
2045 nMessages,
2046 nPayloadsPerHeader,
2047 onDrop);
2048 switch (relayed.type) {
2050 if (info.normalOpsNotified == true && info.backpressureNotified == false) {
2051 LOGP(alarm, "Backpressure on channel {}. Waiting.", info.channel->GetName());
2052 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
2053 monitoring.send(o2::monitoring::Metric{1, fmt::format("backpressure_{}", info.channel->GetName())});
2054 info.backpressureNotified = true;
2055 info.normalOpsNotified = false;
2056 }
2057 policy.backpressure(info);
2058 hasBackpressure = true;
2059 minBackpressureTimeslice = std::min<size_t>(minBackpressureTimeslice, relayed.timeslice.value);
2060 break;
2064 if (info.normalOpsNotified == false && info.backpressureNotified == true) {
2065 LOGP(info, "Back to normal on channel {}.", info.channel->GetName());
2066 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
2067 monitoring.send(o2::monitoring::Metric{0, fmt::format("backpressure_{}", info.channel->GetName())});
2068 info.normalOpsNotified = true;
2069 info.backpressureNotified = false;
2070 }
2071 break;
2072 }
2073 } break;
2074 case InputType::SourceInfo: {
2075 LOGP(detail, "Received SourceInfo");
2076 auto& context = ref.get<DataProcessorContext>();
2077 state.lastActiveDataProcessor = &context;
2078 auto headerIndex = input.position;
2079 auto payloadIndex = input.position + 1;
2080 assert(payloadIndex < parts.Size());
2081 // FIXME: the message with the end of stream cannot contain
2082 // split parts.
2083 parts.At(headerIndex).reset(nullptr);
2084 parts.At(payloadIndex).reset(nullptr);
2085 // for (size_t i = 0; i < dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 - 1 : 1; ++i) {
2086 // parts.At(headerIndex + 1 + i).reset(nullptr);
2087 // }
2088 // pi += dh->splitPayloadParts > 0 ? dh->splitPayloadParts - 1 : 0;
2089
2090 } break;
2091 case InputType::DomainInfo: {
2094 auto& context = ref.get<DataProcessorContext>();
2095 state.lastActiveDataProcessor = &context;
2096 auto headerIndex = input.position;
2097 auto payloadIndex = input.position + 1;
2098 assert(payloadIndex < parts.Size());
2099 // FIXME: the message with the end of stream cannot contain
2100 // split parts.
2101
2102 auto dih = o2::header::get<DomainInfoHeader*>(parts.At(headerIndex)->GetData());
2103 if (hasBackpressure && dih->oldestPossibleTimeslice >= minBackpressureTimeslice) {
2104 break;
2105 }
2106 oldestPossibleTimeslice = std::min(oldestPossibleTimeslice, dih->oldestPossibleTimeslice);
2107 LOGP(debug, "Got DomainInfoHeader, new oldestPossibleTimeslice {} on channel {}", oldestPossibleTimeslice, info.id.value);
2108 parts.At(headerIndex).reset(nullptr);
2109 parts.At(payloadIndex).reset(nullptr);
2110 }
2111 case InputType::Invalid: {
2112 reportError("Invalid part found.");
2113 } break;
2114 }
2115 }
2118 if (oldestPossibleTimeslice != (size_t)-1) {
2119 info.oldestForChannel = {oldestPossibleTimeslice};
2120 auto& context = ref.get<DataProcessorContext>();
2121 context.domainInfoUpdatedCallback(*context.registry, oldestPossibleTimeslice, info.id);
2122 ref.get<CallbackService>().call<CallbackService::Id::DomainInfoUpdated>((ServiceRegistryRef)*context.registry, (size_t)oldestPossibleTimeslice, (ChannelIndex)info.id);
2123 state.lastActiveDataProcessor = &context;
2124 }
2125 auto it = std::remove_if(parts.fParts.begin(), parts.fParts.end(), [](auto& msg) -> bool { return msg.get() == nullptr; });
2126 parts.fParts.erase(it, parts.end());
2127 if (parts.fParts.size()) {
2128 LOG(debug) << parts.fParts.size() << " messages backpressured";
2129 }
2130 };
2131
2132 // Second part. This is the actual outer loop we want to obtain, with
2133 // implementation details which can be read. Notice how most of the state
2134 // is actually hidden. For example we do not expose what "input" is. This
2135 // will allow us to keep the same toplevel logic even if the actual meaning
2136 // of input is changed (for example we might move away from multipart
2137 // messages). Notice also that we need to act diffently depending on the
2138 // actual CompletionOp we want to perform. In particular forwarding inputs
2139 // also gets rid of them from the cache.
2140 auto inputTypes = getInputTypes();
2141 if (bool(inputTypes) == false) {
2142 reportError("Parts should come in couples. Dropping it.");
2143 return;
2144 }
2145 handleValidMessages(*inputTypes);
2146 return;
2147}
2148
2149namespace
2150{
/// Aggregated latency figures for one input record, in the same units as
/// DataProcessingHeader::creation. Initialised to the "empty" extremes so
/// that folding in the first sample always replaces both fields.
struct InputLatency {
  /// Smallest per-part latency seen so far; max() until a sample is folded in.
  uint64_t minLatency{std::numeric_limits<uint64_t>::max()};
  /// Largest per-part latency seen so far; min() (i.e. 0) until a sample is folded in.
  uint64_t maxLatency{std::numeric_limits<uint64_t>::min()};
};
2155
2156auto calculateInputRecordLatency(InputRecord const& record, uint64_t currentTime) -> InputLatency
2157{
2158 InputLatency result;
2159
2160 for (auto& item : record) {
2161 auto* header = o2::header::get<DataProcessingHeader*>(item.header);
2162 if (header == nullptr) {
2163 continue;
2164 }
2165 int64_t partLatency = (0x7fffffffffffffff & currentTime) - (0x7fffffffffffffff & header->creation);
2166 if (partLatency < 0) {
2167 partLatency = 0;
2168 }
2169 result.minLatency = std::min(result.minLatency, (uint64_t)partLatency);
2170 result.maxLatency = std::max(result.maxLatency, (uint64_t)partLatency);
2171 }
2172 return result;
2173};
2174
2175auto calculateTotalInputRecordSize(InputRecord const& record) -> int
2176{
2177 size_t totalInputSize = 0;
2178 for (auto& item : record) {
2179 auto* header = o2::header::get<DataHeader*>(item.header);
2180 if (header == nullptr) {
2181 continue;
2182 }
2183 totalInputSize += header->payloadSize;
2184 }
2185 return totalInputSize;
2186};
2187
/// Atomically raise @a maximum_value to @a value when @a value is larger.
///
/// Lock-free CAS loop: as long as the currently stored value is below
/// @a value, try to install @a value. A failed (or spuriously failed)
/// compare_exchange_weak refreshes `observed` with the latest stored value,
/// so the loop terminates as soon as the atomic already holds something
/// greater than or equal to @a value.
template <typename T>
void update_maximum(std::atomic<T>& maximum_value, T const& value) noexcept
{
  T observed = maximum_value.load();
  while (observed < value) {
    if (maximum_value.compare_exchange_weak(observed, value)) {
      break;
    }
    // `observed` now holds the freshly read value; re-check the condition.
  }
}
2196} // namespace
2197
2198bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::vector<DataRelayer::RecordAction>& completed)
2199{
2200 auto& context = ref.get<DataProcessorContext>();
2201 LOGP(debug, "DataProcessingDevice::tryDispatchComputation");
2202 // This is the actual hidden state for the outer loop. In case we decide we
2203 // want to support multithreaded dispatching of operations, I can simply
2204 // move these to some thread local store and the rest of the lambdas
2205 // should work just fine.
2206 std::vector<MessageSet> currentSetOfInputs;
2207
2208 //
2209 auto getInputSpan = [ref, &currentSetOfInputs](TimesliceSlot slot, bool consume = true) {
2210 auto& relayer = ref.get<DataRelayer>();
2211 if (consume) {
2212 currentSetOfInputs = relayer.consumeAllInputsForTimeslice(slot);
2213 } else {
2214 currentSetOfInputs = relayer.consumeExistingInputsForTimeslice(slot);
2215 }
2216 auto getter = [&currentSetOfInputs](size_t i, size_t partindex) -> DataRef {
2217 if (currentSetOfInputs[i].getNumberOfPairs() > partindex) {
2218 const char* headerptr = nullptr;
2219 const char* payloadptr = nullptr;
2220 size_t payloadSize = 0;
2221 // - each input can have multiple parts
2222 // - "part" denotes a sequence of messages belonging together, the first message of the
2223 // sequence is the header message
2224 // - each part has one or more payload messages
2225 // - InputRecord provides all payloads as header-payload pair
2226 auto const& headerMsg = currentSetOfInputs[i].associatedHeader(partindex);
2227 auto const& payloadMsg = currentSetOfInputs[i].associatedPayload(partindex);
2228 headerptr = static_cast<char const*>(headerMsg->GetData());
2229 payloadptr = payloadMsg ? static_cast<char const*>(payloadMsg->GetData()) : nullptr;
2230 payloadSize = payloadMsg ? payloadMsg->GetSize() : 0;
2231 return DataRef{nullptr, headerptr, payloadptr, payloadSize};
2232 }
2233 return DataRef{};
2234 };
2235 auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
2236 return currentSetOfInputs[i].getNumberOfPairs();
2237 };
2238#if __has_include(<fairmq/shmem/Message.h>)
2239 auto refCountGetter = [&currentSetOfInputs](size_t idx) -> int {
2240 auto& header = static_cast<const fair::mq::shmem::Message&>(*currentSetOfInputs[idx].header(0));
2241 return header.GetRefCount();
2242 };
2243#else
2244 std::function<int(size_t)> refCountGetter = nullptr;
2245#endif
2246 return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.size()};
2247 };
2248
2249 auto markInputsAsDone = [ref](TimesliceSlot slot) -> void {
2250 auto& relayer = ref.get<DataRelayer>();
2252 };
2253
2254 // I need a preparation step which gets the current timeslice id and
2255 // propagates it to the various contextes (i.e. the actual entities which
2256 // create messages) because the messages need to have the timeslice id into
2257 // it.
2258 auto prepareAllocatorForCurrentTimeSlice = [ref](TimesliceSlot i) -> void {
2259 auto& relayer = ref.get<DataRelayer>();
2260 auto& timingInfo = ref.get<TimingInfo>();
2261 auto timeslice = relayer.getTimesliceForSlot(i);
2262
2263 timingInfo.timeslice = timeslice.value;
2264 timingInfo.tfCounter = relayer.getFirstTFCounterForSlot(i);
2265 timingInfo.firstTForbit = relayer.getFirstTFOrbitForSlot(i);
2266 timingInfo.runNumber = relayer.getRunNumberForSlot(i);
2267 timingInfo.creation = relayer.getCreationTimeForSlot(i);
2268 };
2269 auto updateRunInformation = [ref](TimesliceSlot i) -> void {
2270 auto& dataProcessorContext = ref.get<DataProcessorContext>();
2271 auto& relayer = ref.get<DataRelayer>();
2272 auto& timingInfo = ref.get<TimingInfo>();
2273 auto timeslice = relayer.getTimesliceForSlot(i);
2274 // We report wether or not this timing info refers to a new Run.
2275 timingInfo.globalRunNumberChanged = !TimingInfo::timesliceIsTimer(timeslice.value) && dataProcessorContext.lastRunNumberProcessed != timingInfo.runNumber;
2276 // A switch to runNumber=0 should not appear and thus does not set globalRunNumberChanged, unless it is seen in the first processed timeslice
2277 timingInfo.globalRunNumberChanged &= (dataProcessorContext.lastRunNumberProcessed == -1 || timingInfo.runNumber != 0);
2278 // FIXME: for now there is only one stream, however we
2279 // should calculate this correctly once we finally get the
2280 // the StreamContext in.
2281 timingInfo.streamRunNumberChanged = timingInfo.globalRunNumberChanged;
2282 };
2283
2284 // When processing them, timers will have to be cleaned up
2285 // to avoid double counting them.
2286 // This was actually the easiest solution we could find for
2287 // O2-646.
2288 auto cleanTimers = [&currentSetOfInputs](TimesliceSlot slot, InputRecord& record) {
2289 assert(record.size() == currentSetOfInputs.size());
2290 for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) {
2291 // assuming that for timer inputs we do have exactly one PartRef object
2292 // in the MessageSet, multiple PartRef Objects are only possible for either
2293 // split payload messages of wildcard matchers, both for data inputs
2294 DataRef input = record.getByPos(ii);
2295 if (input.spec->lifetime != Lifetime::Timer) {
2296 continue;
2297 }
2298 if (input.header == nullptr) {
2299 continue;
2300 }
2301 // This will hopefully delete the message.
2302 currentSetOfInputs[ii].clear();
2303 }
2304 };
2305
2306 // Function to cleanup record. For the moment we
2307 // simply use it to keep track of input messages
2308 // which are not needed, to display them in the GUI.
2309 auto cleanupRecord = [](InputRecord& record) {
2310 if (O2_LOG_ENABLED(parts) == false) {
2311 return;
2312 }
2313 for (size_t pi = 0, pe = record.size(); pi < pe; ++pi) {
2314 DataRef input = record.getByPos(pi);
2315 if (input.header == nullptr) {
2316 continue;
2317 }
2318 auto sih = o2::header::get<SourceInfoHeader*>(input.header);
2319 if (sih) {
2320 continue;
2321 }
2322
2323 auto dh = o2::header::get<DataHeader*>(input.header);
2324 if (!dh) {
2325 continue;
2326 }
2327 // We use the address of the first header of a split payload
2328 // to identify the interval.
2329 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, dh);
2330 O2_SIGNPOST_END(parts, pid, "parts", "Cleaning up parts associated to %p", dh);
2331
2332 // No split parts, we simply skip the payload
2333 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
2334 // this is indicating a sequence of payloads following the header
2335 // FIXME: we will probably also set the DataHeader version
2336 pi += dh->splitPayloadParts - 1;
2337 } else {
2338 size_t pi = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
2339 }
2340 }
2341 };
2342
2343 ref.get<DataRelayer>().getReadyToProcess(completed);
2344 if (completed.empty() == true) {
2345 LOGP(debug, "No computations available for dispatching.");
2346 return false;
2347 }
2348
// Publish post-processing bookkeeping for the action we just ran: a
// per-slot relayer state string (for the GUI) plus size and latency
// statistics about the processed record.
2349 auto postUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t tStart, uint64_t tStartMilli) {
2350 auto& stats = ref.get<DataProcessingStats>();
2351 auto& states = ref.get<DataProcessingStates>();
// Make preceding writes visible before publishing the updated state.
// NOTE(review): presumably pairs with an acquire on the consumer side
// of DataProcessingStates — confirm there.
2352 std::atomic_thread_fence(std::memory_order_release);
// State string layout: "<pipelineLength> " followed by one digit per
// input position; here '3' marks a valid input, '0' a missing one
// (preUpdateStats publishes '2' for the same positions beforehand).
2353 char relayerSlotState[1024];
2354 int written = snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2355 char* buffer = relayerSlotState + written;
2356 for (size_t ai = 0; ai != record.size(); ai++) {
2357 buffer[ai] = record.isValid(ai) ? '3' : '0';
2358 }
2359 buffer[record.size()] = 0;
2360 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index),
2361 .size = (int)(record.size() + buffer - relayerSlotState),
2362 .data = relayerSlotState});
2363 uint64_t tEnd = uv_hrtime();
2364 // tEnd and tStart are in nanoseconds according to https://docs.libuv.org/en/v1.x/misc.html#c.uv_hrtime
2365 int64_t wallTimeMs = (tEnd - tStart) / 1000000;
2367 // Sum up the total wall time, in milliseconds.
2369 // The time interval is in seconds while tEnd - tStart is in nanoseconds, so we divide by 1000000 to get the fraction in ms/s.
// Last/total processed byte counts for this device.
2371 stats.updateStats({(int)ProcessingStatsId::LAST_PROCESSED_SIZE, DataProcessingStats::Op::Set, calculateTotalInputRecordSize(record)});
2372 stats.updateStats({(int)ProcessingStatsId::TOTAL_PROCESSED_SIZE, DataProcessingStats::Op::Add, calculateTotalInputRecordSize(record)});
// Min/max input latency of the record, computed relative to tStartMilli.
2373 auto latency = calculateInputRecordLatency(record, tStartMilli);
2374 stats.updateStats({(int)ProcessingStatsId::LAST_MIN_LATENCY, DataProcessingStats::Op::Set, (int)latency.minLatency});
2375 stats.updateStats({(int)ProcessingStatsId::LAST_MAX_LATENCY, DataProcessingStats::Op::Set, (int)latency.maxLatency});
// Invocation counter, persisted across calls.
2376 static int count = 0;
2378 count++;
2379 };
2380
2381 auto preUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t) {
2382 auto& states = ref.get<DataProcessingStates>();
2383 std::atomic_thread_fence(std::memory_order_release);
2384 char relayerSlotState[1024];
2385 snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2386 char* buffer = strchr(relayerSlotState, ' ') + 1;
2387 for (size_t ai = 0; ai != record.size(); ai++) {
2388 buffer[ai] = record.isValid(ai) ? '2' : '0';
2389 }
2390 buffer[record.size()] = 0;
2391 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index), .size = (int)(record.size() + buffer - relayerSlotState), .data = relayerSlotState});
2392 };
2393
2394 // This is the main dispatching loop
2395 auto& state = ref.get<DeviceState>();
2396 auto& spec = ref.get<DeviceSpec const>();
2397
2398 auto& dpContext = ref.get<DataProcessorContext>();
2399 auto& streamContext = ref.get<StreamContext>();
2400 O2_SIGNPOST_ID_GENERATE(sid, device);
2401 O2_SIGNPOST_START(device, sid, "device", "Start processing ready actions");
2402
2403 auto& stats = ref.get<DataProcessingStats>();
2404 auto& relayer = ref.get<DataRelayer>();
2405 using namespace o2::framework;
2406 stats.updateStats({(int)ProcessingStatsId::PENDING_INPUTS, DataProcessingStats::Op::Set, static_cast<int64_t>(relayer.getParallelTimeslices() - completed.size())});
2407 stats.updateStats({(int)ProcessingStatsId::INCOMPLETE_INPUTS, DataProcessingStats::Op::Set, completed.empty() ? 1 : 0});
2408 switch (spec.completionPolicy.order) {
2410 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.timeslice.value < b.timeslice.value; });
2411 break;
2413 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.slot.index < b.slot.index; });
2414 break;
2416 default:
2417 break;
2418 }
2419
2420 for (auto action : completed) {
2421 O2_SIGNPOST_ID_GENERATE(aid, device);
2422 O2_SIGNPOST_START(device, aid, "device", "Processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2424 O2_SIGNPOST_END(device, aid, "device", "Waiting for more data.");
2425 continue;
2426 }
2427
2428 bool shouldConsume = action.op == CompletionPolicy::CompletionOp::Consume ||
2430 prepareAllocatorForCurrentTimeSlice(TimesliceSlot{action.slot});
2434 updateRunInformation(TimesliceSlot{action.slot});
2435 }
2436 InputSpan span = getInputSpan(action.slot, shouldConsume);
2437 auto& spec = ref.get<DeviceSpec const>();
2438 InputRecord record{spec.inputs,
2439 span,
2440 *context.registry};
2441 ProcessingContext processContext{record, ref, ref.get<DataAllocator>()};
2442 {
2443 // Notice this should be thread safe and reentrant
2444 // as it is called from many threads.
2445 streamContext.preProcessingCallbacks(processContext);
2446 dpContext.preProcessingCallbacks(processContext);
2447 }
2449 context.postDispatchingCallbacks(processContext);
2450 if (spec.forwards.empty() == false) {
2451 auto& timesliceIndex = ref.get<TimesliceIndex>();
2452 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false);
2453 O2_SIGNPOST_END(device, aid, "device", "Forwarding inputs consume: %d.", false);
2454 continue;
2455 }
2456 }
2457 // If there are no optional inputs we canForwardEarly
2458 // the messages so that parallel processing can happen.
2459 // In this case we pass true to indicate that we want to
2460 // copy the messages to the subsequent data processor.
2461 bool hasForwards = spec.forwards.empty() == false;
2463
2464 if (context.canForwardEarly && hasForwards && consumeSomething) {
2465 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwainding: %{public}s.", fmt::format("{}", action.op).c_str());
2466 auto& timesliceIndex = ref.get<TimesliceIndex>();
2467 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
2468 }
2469 markInputsAsDone(action.slot);
2470
2471 uint64_t tStart = uv_hrtime();
2472 uint64_t tStartMilli = TimingHelpers::getRealtimeSinceEpochStandalone();
2473 preUpdateStats(action, record, tStart);
2474
2475 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
2476
2477 auto runNoCatch = [&context, ref, &processContext](DataRelayer::RecordAction& action) mutable {
2478 auto& state = ref.get<DeviceState>();
2479 auto& spec = ref.get<DeviceSpec const>();
2480 auto& streamContext = ref.get<StreamContext>();
2481 auto& dpContext = ref.get<DataProcessorContext>();
2482 auto shouldProcess = [](DataRelayer::RecordAction& action) -> bool {
2483 switch (action.op) {
2488 return true;
2489 break;
2490 default:
2491 return false;
2492 }
2493 };
2494 if (state.quitRequested == false) {
2495 {
2496 // Callbacks from services
2497 dpContext.preProcessingCallbacks(processContext);
2498 streamContext.preProcessingCallbacks(processContext);
2499 dpContext.preProcessingCallbacks(processContext);
2500 // Callbacks from users
2501 ref.get<CallbackService>().call<CallbackService::Id::PreProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2502 }
2503 O2_SIGNPOST_ID_FROM_POINTER(pcid, device, &processContext);
2504 if (context.statefulProcess && shouldProcess(action)) {
2505 // This way, usercode can use the same processing context to identify
2506 // its signposts and we can map user code to device iterations.
2507 O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
2508 (context.statefulProcess)(processContext);
2509 O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
2510 } else if (context.statelessProcess && shouldProcess(action)) {
2511 O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
2512 (context.statelessProcess)(processContext);
2513 O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
2514 } else if (context.statelessProcess || context.statefulProcess) {
2515 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding.");
2516 } else {
2517 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle");
2519 }
2520 if (shouldProcess(action)) {
2521 auto& timingInfo = ref.get<TimingInfo>();
2522 if (timingInfo.globalRunNumberChanged) {
2523 context.lastRunNumberProcessed = timingInfo.runNumber;
2524 }
2525 }
2526
2527 // Notify the sink we just consumed some timeframe data
2528 if (context.isSink && action.op == CompletionPolicy::CompletionOp::Consume) {
2529 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Sending dpl-summary");
2530 auto& allocator = ref.get<DataAllocator>();
2531 allocator.make<int>(OutputRef{"dpl-summary", runtime_hash(spec.name.c_str())}, 1);
2532 }
2533
2534 // Extra callback which allows a service to add extra outputs.
2535 // This is needed e.g. to ensure that injected CCDB outputs are added
2536 // before an end of stream.
2537 {
2538 ref.get<CallbackService>().call<CallbackService::Id::FinaliseOutputs>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2539 dpContext.finaliseOutputsCallbacks(processContext);
2540 streamContext.finaliseOutputsCallbacks(processContext);
2541 }
2542
2543 {
2544 ref.get<CallbackService>().call<CallbackService::Id::PostProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2545 dpContext.postProcessingCallbacks(processContext);
2546 streamContext.postProcessingCallbacks(processContext);
2547 }
2548 }
2549 };
2550
2551 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_USERCODE) != 0) {
2552 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
2553 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
2554 }
2555 if (noCatch) {
2556 try {
2557 runNoCatch(action);
2558 } catch (o2::framework::RuntimeErrorRef e) {
2559 (context.errorHandling)(e, record);
2560 }
2561 } else {
2562 try {
2563 runNoCatch(action);
2564 } catch (std::exception& ex) {
2568 auto e = runtime_error(ex.what());
2569 (context.errorHandling)(e, record);
2570 } catch (o2::framework::RuntimeErrorRef e) {
2571 (context.errorHandling)(e, record);
2572 }
2573 }
2574 if (state.severityStack.empty() == false) {
2575 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
2576 state.severityStack.pop_back();
2577 }
2578
2579 postUpdateStats(action, record, tStart, tStartMilli);
2580 // We forward inputs only when we consume them. If we simply Process them,
2581 // we keep them for next message arriving.
2583 cleanupRecord(record);
2584 context.postDispatchingCallbacks(processContext);
2585 ref.get<CallbackService>().call<CallbackService::Id::DataConsumed>(o2::framework::ServiceRegistryRef{ref});
2586 }
2587 if ((context.canForwardEarly == false) && hasForwards && consumeSomething) {
2588 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Late forwarding");
2589 auto& timesliceIndex = ref.get<TimesliceIndex>();
2590 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false, action.op == CompletionPolicy::CompletionOp::Consume);
2591 }
2592 context.postForwardingCallbacks(processContext);
2594 cleanTimers(action.slot, record);
2595 }
2596 O2_SIGNPOST_END(device, aid, "device", "Done processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2597 }
2598 O2_SIGNPOST_END(device, sid, "device", "Start processing ready actions");
2599
2600 // We now broadcast the end of stream if it was requested
2601 if (state.streaming == StreamingState::EndOfStreaming) {
2602 LOGP(detail, "Broadcasting end of stream");
2603 for (auto& channel : spec.outputChannels) {
2605 }
2607 }
2608
2609 return true;
2610}
2611
2613{
2614 LOG(error) << msg;
2615 ServiceRegistryRef ref{mServiceRegistry};
2616 auto& stats = ref.get<DataProcessingStats>();
2618}
2619
2620std::unique_ptr<ConfigParamStore> DeviceConfigurationHelpers::getConfiguration(ServiceRegistryRef registry, const char* name, std::vector<ConfigParamSpec> const& options)
2621{
2622
2623 if (registry.active<ConfigurationInterface>()) {
2624 auto& cfg = registry.get<ConfigurationInterface>();
2625 try {
2626 cfg.getRecursive(name);
2627 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
2628 retrievers.emplace_back(std::make_unique<ConfigurationOptionsRetriever>(&cfg, name));
2629 auto configStore = std::make_unique<ConfigParamStore>(options, std::move(retrievers));
2630 configStore->preload();
2631 configStore->activate();
2632 return configStore;
2633 } catch (...) {
2634 // No overrides...
2635 }
2636 }
2637 return {nullptr};
2638}
2639
2640} // namespace o2::framework
benchmark::State & state
struct uv_timer_s uv_timer_t
struct uv_signal_s uv_signal_t
struct uv_async_s uv_async_t
struct uv_poll_s uv_poll_t
struct uv_loop_s uv_loop_t
o2::monitoring::Metric Metric
uint64_t maxLatency
o2::configuration::ConfigurationInterface ConfigurationInterface
constexpr int DEFAULT_MAX_CHANNEL_AHEAD
uint64_t minLatency
int32_t i
std::enable_if_t< std::is_signed< T >::value, bool > hasData(const CalArray< T > &cal)
Definition Painter.cxx:515
uint16_t pid
Definition RawData.h:2
uint32_t res
Definition RawData.h:0
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
Definition Signpost.h:536
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:483
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:499
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:525
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:571
#define O2_LOG_ENABLED(log)
Definition Signpost.h:110
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:500
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:516
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:565
constexpr uint32_t runtime_hash(char const *str)
std::ostringstream debug
o2::monitoring::Monitoring Monitoring
StringRef key
@ DeviceStateChanged
Invoked the device undergoes a state change.
decltype(auto) make(const Output &spec, Args... args)
static void doRun(ServiceRegistryRef)
void fillContext(DataProcessorContext &context, DeviceContext &deviceContext)
static void doPrepare(ServiceRegistryRef)
static bool tryDispatchComputation(ServiceRegistryRef ref, std::vector< DataRelayer::RecordAction > &completed)
static void handleData(ServiceRegistryRef, InputChannelInfo &)
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry &, ProcessingPolicies &policies)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associate to a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associate to a given slot.
A service API to communicate with the driver.
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
size_t size() const
Number of elements in the InputSpan.
Definition InputSpan.h:82
bool active() const
Check if service of type T is currently active.
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLuint entry
Definition glcorearb.h:5735
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLuint GLuint stream
Definition glcorearb.h:1806
GLint ref
Definition glcorearb.h:291
GLuint * states
Definition glcorearb.h:4932
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
ServiceKind
The kind of service we are asking for.
void on_idle_timer(uv_timer_t *handle)
@ DPL
The channel is a normal input channel.
void run_completion(uv_work_t *handle, int status)
bool hasOnlyGenerated(DeviceSpec const &spec)
void on_socket_polled(uv_poll_t *poller, int status, int events)
void on_transition_requested_expired(uv_timer_t *handle)
void run_callback(uv_work_t *handle)
volatile int region_read_global_dummy_variable
void handleRegionCallbacks(ServiceRegistryRef registry, std::vector< fair::mq::RegionInfo > &infos)
Invoke the callbacks for the mPendingRegionInfos.
void on_out_of_band_polled(uv_poll_t *poller, int status, int events)
DeviceSpec const & getRunningDevice(RunningDeviceRef const &running, ServiceRegistryRef const &services)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Streaming
Data is being processed.
@ Idle
End of streaming notified.
void on_communication_requested(uv_async_t *s)
@ Expired
A transition needs to be fullfilled ASAP.
@ NoTransition
No pending transitions.
@ Requested
A transition was notified to be requested.
RuntimeError & error_from_ref(RuntimeErrorRef)
auto switchState(ServiceRegistryRef &ref, StreamingState newState) -> void
void on_awake_main_thread(uv_async_t *handle)
@ Completed
The channel was signaled it will not receive any data.
@ Running
The channel is actively receiving data.
void on_signal_callback(uv_signal_t *handle, int signum)
@ Me
Only quit this data processor.
void on_data_processing_expired(uv_timer_t *handle)
bool hasOnlyTimers(DeviceSpec const &spec)
constexpr const char * channelName(int channel)
Definition Constants.h:318
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
Defining DataPointCompositeObject explicitly as copiable.
static void run(AsyncQueue &queue, TimesliceId oldestPossibleTimeslice)
static void post(AsyncQueue &queue, AsyncTask const &task)
An actuatual task to be executed.
Definition AsyncQueue.h:32
static void demangled_backtrace_symbols(void **backtrace, unsigned int total, int fd)
static constexpr int INVALID
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
int64_t sharedMemory
How much shared memory it can allocate.
Statistics on the offers consumed, expired.
static void sendEndOfStream(ServiceRegistryRef const &ref, OutputChannelSpec const &channel)
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
Helper struct to hold statistics about the data processing happening.
@ CumulativeRate
Set the value to the specified value if it is positive.
@ Add
Update the rate of the metric given the amount since the last time.
std::function< void(o2::framework::RuntimeErrorRef e, InputRecord &record)> errorHandling
AlgorithmSpec::InitErrorCallback initError
void preLoopCallbacks(ServiceRegistryRef)
Invoke callbacks before we enter the event loop.
void postStopCallbacks(ServiceRegistryRef)
Invoke callbacks on stop.
void preProcessingCallbacks(ProcessingContext &)
Invoke callbacks to be executed before every process method invokation.
bool canForwardEarly
Wether or not the associated DataProcessor can forward things early.
void preStartCallbacks(ServiceRegistryRef)
Invoke callbacks to be executed in PreRun(), before the User Start callbacks.
AlgorithmSpec::ProcessCallback statefulProcess
const char * header
Definition DataRef.h:27
const InputSpec * spec
Definition DataRef.h:26
static std::vector< size_t > createDistinctRouteIndex(std::vector< InputRoute > const &)
CompletionPolicy::CompletionOp op
Definition DataRelayer.h:81
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static header::DataOrigin asConcreteOrigin(InputSpec const &spec)
TimesliceIndex::OldestOutputInfo oldestTimeslice
static unsigned int pipelineLength()
get max number of timeslices in the queue
static std::unique_ptr< ConfigParamStore > getConfiguration(ServiceRegistryRef registry, const char *name, std::vector< ConfigParamSpec > const &options)
std::vector< InputRoute > inputs
Definition DeviceSpec.h:62
std::vector< InputChannelSpec > inputChannels
Definition DeviceSpec.h:54
Running state information of a given device.
Definition DeviceState.h:34
std::atomic< int64_t > cleanupCount
Definition DeviceState.h:82
Forward channel information.
Definition ChannelInfo.h:88
ChannelAccountingType channelType
Wether or not it's a DPL internal channel.
Definition ChannelInfo.h:92
std::string name
The name of the channel.
Definition ChannelInfo.h:90
ForwardingPolicy const * policy
Definition ChannelInfo.h:94
fair::mq::Channel * channel
Definition ChannelInfo.h:51
TimesliceId oldestForChannel
Oldest possible timeslice for the given channel.
Definition ChannelInfo.h:65
enum Lifetime lifetime
Definition InputSpec.h:73
enum EarlyForwardPolicy earlyForward
Information about the running workflow.
static constexpr ServiceKind kind
static Salt streamSalt(short streamId, short dataProcessorId)
void lateBindStreamServices(DeviceState &state, fair::mq::ProgOptions &options, ServiceRegistry::Salt salt)
static Salt globalStreamSalt(short streamId)
void * get(ServiceTypeHash typeHash, Salt salt, ServiceKind kind, char const *name=nullptr) const
void finaliseOutputsCallbacks(ProcessingContext &)
Invoke callbacks to be executed after every process method invokation.
void preProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed before every process method invokation.
void preEOSCallbacks(EndOfStreamContext &eosContext)
Invoke callbacks to be executed before every EOS user callback invokation.
void postProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed after every process method invokation.
static int64_t getRealtimeSinceEpochStandalone()
bool keepAtEndOfStream
Wether this kind of data should be flushed during end of stream.
Definition TimingInfo.h:44
static bool timesliceIsTimer(size_t timeslice)
Definition TimingInfo.h:46
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
void backpressure(InputChannelInfo const &)
locked_execution(ServiceRegistryRef &ref_)
the main header struct
Definition DataHeader.h:618
constexpr size_t min
constexpr size_t max
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
vec clear()
const std::string str
uint64_t const void const *restrict const msg
Definition x9.h:153