Project
Loading...
Searching...
No Matches
DataProcessingDevice.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
31#include "Framework/InputSpan.h"
32#if defined(__APPLE__) || defined(NDEBUG)
33#define O2_SIGNPOST_IMPLEMENTATION
34#endif
35#include "Framework/Signpost.h"
48
49#include "DecongestionService.h"
51#include "DataRelayerHelpers.h"
52#include "Headers/DataHeader.h"
54
55#include <Framework/Tracing.h>
56
57#include <fairmq/Parts.h>
58#include <fairmq/Socket.h>
59#include <fairmq/ProgOptions.h>
60#if __has_include(<fairmq/shmem/Message.h>)
61#include <fairmq/shmem/Message.h>
62#endif
63#include <Configuration/ConfigurationInterface.h>
64#include <Configuration/ConfigurationFactory.h>
65#include <Monitoring/Monitoring.h>
66#include <TMessage.h>
67#include <TClonesArray.h>
68
69#include <fmt/ostream.h>
70#include <algorithm>
71#include <vector>
72#include <numeric>
73#include <memory>
74#include <uv.h>
75#include <execinfo.h>
76#include <sstream>
77#include <boost/property_tree/json_parser.hpp>
78
79// Formatter to avoid having to rewrite the ostream operator for the enum
80namespace fmt
81{
82template <>
85} // namespace fmt
86
87// A log to use for general device logging
89// A log to use for general device logging
91// Special log to keep track of the lifetime of the parts
93// Stream which keeps track of the calibration lifetime logic
95// Special log to track the async queue behavior
97// Special log to track the forwarding requests
99
100using namespace o2::framework;
101using ConfigurationInterface = o2::configuration::ConfigurationInterface;
103
104constexpr int DEFAULT_MAX_CHANNEL_AHEAD = 128;
105
106namespace o2::framework
107{
108
109template <>
113
117{
118 auto* state = (DeviceState*)handle->data;
119 state->loopReason |= DeviceState::TIMER_EXPIRED;
120}
121
122bool hasOnlyTimers(DeviceSpec const& spec)
123{
124 return std::all_of(spec.inputs.cbegin(), spec.inputs.cend(), [](InputRoute const& route) -> bool { return route.matcher.lifetime == Lifetime::Timer; });
125}
126
128{
129 return (spec.inputChannels.size() == 1) && (spec.inputs[0].matcher.lifetime == Lifetime::Timer || spec.inputs[0].matcher.lifetime == Lifetime::Enumeration);
130}
131
133{
134 auto* ref = (ServiceRegistryRef*)handle->data;
135 auto& state = ref->get<DeviceState>();
136 state.loopReason |= DeviceState::TIMER_EXPIRED;
137 // Check if this is a source device
138 O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
139 auto& spec = ref->get<DeviceSpec const>();
140 if (hasOnlyGenerated(spec)) {
141 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for source expired. Exiting.");
142 } else {
143 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for %{public}s expired. Exiting.",
144 state.allowedProcessing == DeviceState::CalibrationOnly ? "calibration" : "data & calibration");
145 }
146 state.transitionHandling = TransitionHandlingState::Expired;
147}
148
150{
151 auto& state = ref.get<DeviceState>();
152 auto& context = ref.get<DataProcessorContext>();
153 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
154 O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
155 O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
156 state.streaming = newState;
157 ref.get<ControlService>().notifyStreamingState(state.streaming);
158};
159
161{
162 auto* ref = (ServiceRegistryRef*)handle->data;
163 auto& state = ref->get<DeviceState>();
164 auto& spec = ref->get<DeviceSpec const>();
165 state.loopReason |= DeviceState::TIMER_EXPIRED;
166
167 // Check if this is a source device
168 O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
169
170 if (hasOnlyGenerated(spec)) {
171 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Switching to EndOfStreaming.");
173 } else {
174 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
175 state.allowedProcessing = DeviceState::CalibrationOnly;
176 }
177}
178
180{
181 auto* state = (DeviceState*)s->data;
183}
184
185DeviceSpec const& getRunningDevice(RunningDeviceRef const& running, ServiceRegistryRef const& services)
186{
187 auto& devices = services.get<o2::framework::RunningWorkflowInfo const>().devices;
188 return devices[running.index];
189}
190
196
198 : mRunningDevice{running},
199 mConfigRegistry{nullptr},
200 mServiceRegistry{registry},
201 mProcessingPolicies{policies}
202{
203 GetConfig()->Subscribe<std::string>("dpl", [&registry = mServiceRegistry](const std::string& key, std::string value) {
204 if (key == "cleanup") {
206 auto& deviceState = ref.get<DeviceState>();
207 int64_t cleanupCount = deviceState.cleanupCount.load();
208 int64_t newCleanupCount = std::stoll(value);
209 if (newCleanupCount <= cleanupCount) {
210 return;
211 }
212 deviceState.cleanupCount.store(newCleanupCount);
213 for (auto& info : deviceState.inputChannelInfos) {
214 fair::mq::Parts parts;
215 while (info.channel->Receive(parts, 0)) {
216 LOGP(debug, "Dropping {} parts", parts.Size());
217 if (parts.Size() == 0) {
218 break;
219 }
220 }
221 }
222 }
223 });
224
225 std::function<void(const fair::mq::State)> stateWatcher = [this, &registry = mServiceRegistry](const fair::mq::State state) -> void {
227 auto& deviceState = ref.get<DeviceState>();
228 auto& control = ref.get<ControlService>();
229 auto& callbacks = ref.get<CallbackService>();
230 control.notifyDeviceState(fair::mq::GetStateName(state));
232
233 if (deviceState.nextFairMQState.empty() == false) {
234 auto state = deviceState.nextFairMQState.back();
235 (void)this->ChangeState(state);
236 deviceState.nextFairMQState.pop_back();
237 }
238 };
239
240 // 99 is to execute DPL callbacks last
241 this->SubscribeToStateChange("99-dpl", stateWatcher);
242
243 // One task for now.
244 mStreams.resize(1);
245 mHandles.resize(1);
246
247 ServiceRegistryRef ref{mServiceRegistry};
248 mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t));
249 auto& state = ref.get<DeviceState>();
250 assert(state.loop);
251 int res = uv_async_init(state.loop, mAwakeHandle, on_communication_requested);
252 mAwakeHandle->data = &state;
253 if (res < 0) {
254 LOG(error) << "Unable to initialise subscription";
255 }
256
258 SubscribeToNewTransition("dpl", [wakeHandle = mAwakeHandle](fair::mq::Transition t) {
259 int res = uv_async_send(wakeHandle);
260 if (res < 0) {
261 LOG(error) << "Unable to notify subscription";
262 }
263 LOG(debug) << "State transition requested";
264 });
265}
266
267// Callback to execute the processing. Notice how the data is
268// is a vector of DataProcessorContext so that we can index the correct
269// one with the thread id. For the moment we simply use the first one.
void run_callback(uv_work_t* handle)
{
  // The uv work request carries the stream this callback runs on.
  auto* task = (TaskStreamInfo*)handle->data;
  // Build a registry ref salted for this stream. The +1 presumably offsets
  // stream salts from the global salt 0 — TODO confirm against
  // ServiceRegistry::globalStreamSalt's contract.
  auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
  // We create a new signpost interval for this specific data processor. Same id, same data processor.
  auto& dataProcessorContext = ref.get<DataProcessorContext>();
  O2_SIGNPOST_ID_FROM_POINTER(sid, device, &dataProcessorContext);
  O2_SIGNPOST_START(device, sid, "run_callback", "Starting run callback on stream %d", task->id.index);
  // NOTE(review): the actual processing call between START and END is not
  // visible in this extraction of the file — verify against the repository.
  O2_SIGNPOST_END(device, sid, "run_callback", "Done processing data for stream %d", task->id.index);
}
282
283// Once the processing in a thread is done, this is executed on the main thread.
// Once the processing in a thread is done, this is executed on the main thread.
// Consumes the pending computing-quota offers for the stream, reports
// expired offers, and marks the task as no longer running.
// @param handle uv work request whose data points to the TaskStreamInfo.
// @param status libuv completion status; currently unused here.
void run_completion(uv_work_t* handle, int status)
{
  auto* task = (TaskStreamInfo*)handle->data;
  // Notice that the completion, while running on the main thread, still
  // has a salt which is associated to the actual stream which was doing the computation
  auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
  auto& state = ref.get<DeviceState>();
  auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();

  using o2::monitoring::Metric;
  using o2::monitoring::Monitoring;
  using o2::monitoring::tags::Key;
  using o2::monitoring::tags::Value;

  // NOTE(review): both lambdas below are function-local statics, so they are
  // initialised exactly once and capture the `ref` of the *first* invocation
  // of run_completion only — confirm this is intended (it is fine if all
  // invocations share the same underlying registry).
  // Accounts the consumed shared memory in the stats and keeps the command
  // queue in sync.
  static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats&)> reportConsumedOffer = [ref](ComputingQuotaOffer const& accumulatedConsumed, ComputingQuotaStats& stats) {
    auto& dpStats = ref.get<DataProcessingStats>();
    stats.totalConsumedBytes += accumulatedConsumed.sharedMemory;

    dpStats.updateStats({static_cast<short>(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedBytes});
    dpStats.processCommandQueue();
    assert(stats.totalConsumedBytes == dpStats.metrics[(short)ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED]);
  };

  // Publishes the counters for offers which expired before being used.
  static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const&)> reportExpiredOffer = [ref](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
    auto& dpStats = ref.get<DataProcessingStats>();
    dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
    dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
    dpStats.processCommandQueue();
  };

  // Drain all the offer consumers queued by the stream, then settle the
  // quota bookkeeping for this stream index.
  for (auto& consumer : state.offerConsumers) {
    quotaEvaluator.consume(task->id.index, consumer, reportConsumedOffer);
  }
  state.offerConsumers.clear();
  quotaEvaluator.handleExpired(reportExpiredOffer);
  quotaEvaluator.dispose(task->id.index);
  task->running = false;
}
322
323// Context for polling
325 enum struct PollerState : char { Stopped,
327 Connected,
328 Suspended };
329 char const* name = nullptr;
330 uv_loop_t* loop = nullptr;
332 DeviceState* state = nullptr;
333 fair::mq::Socket* socket = nullptr;
335 int fd = -1;
336 bool read = true;
338};
339
340void on_socket_polled(uv_poll_t* poller, int status, int events)
341{
342 auto* context = (PollerContext*)poller->data;
343 assert(context);
344 O2_SIGNPOST_ID_FROM_POINTER(sid, sockets, poller);
345 context->state->loopReason |= DeviceState::DATA_SOCKET_POLLED;
346 switch (events) {
347 case UV_READABLE: {
348 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
349 context->state->loopReason |= DeviceState::DATA_INCOMING;
350 } break;
351 case UV_WRITABLE: {
352 O2_SIGNPOST_END(sockets, sid, "socket_state", "Socket connected for channel %{public}s", context->name);
353 if (context->read) {
354 O2_SIGNPOST_START(sockets, sid, "socket_state", "Socket connected for read in context %{public}s", context->name);
355 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
356 context->state->loopReason |= DeviceState::DATA_CONNECTED;
357 } else {
358 O2_SIGNPOST_START(sockets, sid, "socket_state", "Socket connected for write for channel %{public}s", context->name);
359 context->state->loopReason |= DeviceState::DATA_OUTGOING;
360 // If the socket is writable, fairmq will handle the rest, so we can stop polling and
361 // just wait for the disconnect.
362 uv_poll_start(poller, UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
363 }
364 context->pollerState = PollerContext::PollerState::Connected;
365 } break;
366 case UV_DISCONNECT: {
367 O2_SIGNPOST_END(sockets, sid, "socket_state", "Socket disconnected in context %{public}s", context->name);
368 } break;
369 case UV_PRIORITIZED: {
370 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Socket prioritized for context %{public}s", context->name);
371 } break;
372 }
373 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
374}
375
376void on_out_of_band_polled(uv_poll_t* poller, int status, int events)
377{
378 O2_SIGNPOST_ID_FROM_POINTER(sid, sockets, poller);
379 auto* context = (PollerContext*)poller->data;
380 context->state->loopReason |= DeviceState::OOB_ACTIVITY;
381 if (status < 0) {
382 LOGP(fatal, "Error while polling {}: {}", context->name, status);
383 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
384 }
385 switch (events) {
386 case UV_READABLE: {
387 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
388 context->state->loopReason |= DeviceState::DATA_INCOMING;
389 assert(context->channelInfo);
390 context->channelInfo->readPolled = true;
391 } break;
392 case UV_WRITABLE: {
393 O2_SIGNPOST_END(sockets, sid, "socket_state", "OOB socket connected for channel %{public}s", context->name);
394 if (context->read) {
395 O2_SIGNPOST_START(sockets, sid, "socket_state", "OOB socket connected for read in context %{public}s", context->name);
396 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_out_of_band_polled);
397 } else {
398 O2_SIGNPOST_START(sockets, sid, "socket_state", "OOB socket connected for write for channel %{public}s", context->name);
399 context->state->loopReason |= DeviceState::DATA_OUTGOING;
400 }
401 } break;
402 case UV_DISCONNECT: {
403 O2_SIGNPOST_END(sockets, sid, "socket_state", "OOB socket disconnected in context %{public}s", context->name);
404 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
405 } break;
406 case UV_PRIORITIZED: {
407 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "OOB socket prioritized for context %{public}s", context->name);
408 } break;
409 }
410 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
411}
412
421{
422 auto ref = ServiceRegistryRef{mServiceRegistry};
423 auto& context = ref.get<DataProcessorContext>();
424 auto& spec = getRunningDevice(mRunningDevice, ref);
425
426 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
427 O2_SIGNPOST_START(device, cid, "Init", "Entering Init callback.");
428 context.statelessProcess = spec.algorithm.onProcess;
429 context.statefulProcess = nullptr;
430 context.error = spec.algorithm.onError;
431 context.initError = spec.algorithm.onInitError;
432
433 auto configStore = DeviceConfigurationHelpers::getConfiguration(mServiceRegistry, spec.name.c_str(), spec.options);
434 if (configStore == nullptr) {
435 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
436 retrievers.emplace_back(std::make_unique<FairOptionsRetriever>(GetConfig()));
437 configStore = std::make_unique<ConfigParamStore>(spec.options, std::move(retrievers));
438 configStore->preload();
439 configStore->activate();
440 }
441
442 using boost::property_tree::ptree;
443
445 for (auto& entry : configStore->store()) {
446 std::stringstream ss;
447 std::string str;
448 if (entry.second.empty() == false) {
449 boost::property_tree::json_parser::write_json(ss, entry.second, false);
450 str = ss.str();
451 str.pop_back(); // remove EoL
452 } else {
453 str = entry.second.get_value<std::string>();
454 }
455 std::string configString = fmt::format("[CONFIG] {}={} 1 {}", entry.first, str, configStore->provenance(entry.first.c_str())).c_str();
456 mServiceRegistry.get<DriverClient>(ServiceRegistry::globalDeviceSalt()).tell(configString.c_str());
457 }
458
459 mConfigRegistry = std::make_unique<ConfigParamRegistry>(std::move(configStore));
460
461 // Setup the error handlers for init
462 if (context.initError) {
463 context.initErrorHandling = [&errorCallback = context.initError,
464 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
468 auto& context = ref.get<DataProcessorContext>();
469 auto& err = error_from_ref(e);
470 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
471 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Invoking errorCallback.", err.what);
472 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
473 auto& stats = ref.get<DataProcessingStats>();
475 InitErrorContext errorContext{ref, e};
476 errorCallback(errorContext);
477 };
478 } else {
479 context.initErrorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
480 auto& err = error_from_ref(e);
484 auto& context = ref.get<DataProcessorContext>();
485 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
486 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Exiting with 1.", err.what);
487 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
488 auto& stats = ref.get<DataProcessingStats>();
490 exit(1);
491 };
492 }
493
494 context.expirationHandlers.clear();
495 context.init = spec.algorithm.onInit;
496 if (context.init) {
497 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
498 InitContext initContext{*mConfigRegistry, mServiceRegistry};
499
500 if (noCatch) {
501 try {
502 context.statefulProcess = context.init(initContext);
504 if (context.initErrorHandling) {
505 (context.initErrorHandling)(e);
506 }
507 }
508 } else {
509 try {
510 context.statefulProcess = context.init(initContext);
511 } catch (std::exception& ex) {
515 auto e = runtime_error(ex.what());
516 (context.initErrorHandling)(e);
518 (context.initErrorHandling)(e);
519 }
520 }
521 }
522 auto& state = ref.get<DeviceState>();
523 state.inputChannelInfos.resize(spec.inputChannels.size());
527 int validChannelId = 0;
528 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
529 auto& name = spec.inputChannels[ci].name;
530 if (name.find(spec.channelPrefix + "from_internal-dpl-clock") == 0) {
531 state.inputChannelInfos[ci].state = InputChannelState::Pull;
532 state.inputChannelInfos[ci].id = {ChannelIndex::INVALID};
533 validChannelId++;
534 } else {
535 state.inputChannelInfos[ci].id = {validChannelId++};
536 }
537 }
538
539 // Invoke the callback policy for this device.
540 if (spec.callbacksPolicy.policy != nullptr) {
541 InitContext initContext{*mConfigRegistry, mServiceRegistry};
542 spec.callbacksPolicy.policy(mServiceRegistry.get<CallbackService>(ServiceRegistry::globalDeviceSalt()), initContext);
543 }
544
545 // Services which are stream should be initialised now
546 auto* options = GetConfig();
547 for (size_t si = 0; si < mStreams.size(); ++si) {
549 mServiceRegistry.lateBindStreamServices(state, *options, streamSalt);
550 }
551 O2_SIGNPOST_END(device, cid, "Init", "Exiting Init callback.");
552}
553
// Signal handler run on the event loop. If no shared-memory offer is
// currently outstanding, it grants a 1 GB shared-memory offer by filling
// the first free slot of the ComputingQuotaEvaluator.
// NOTE(review): which signal this is registered for is not visible in this
// extraction — confirm at the uv_signal_start call site.
void on_signal_callback(uv_signal_t* handle, int signum)
{
  O2_SIGNPOST_ID_FROM_POINTER(sid, device, handle);
  O2_SIGNPOST_START(device, sid, "signal_state", "Signal %d received.", signum);

  auto* registry = (ServiceRegistry*)handle->data;
  if (!registry) {
    O2_SIGNPOST_END(device, sid, "signal_state", "No registry active. Ignoring signal.");
    return;
  }
  ServiceRegistryRef ref{*registry};
  auto& state = ref.get<DeviceState>();
  auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
  // NOTE(review): `stats` appears unused in the visible lines; the extraction
  // of this file dropped some lines here — verify against the repository.
  auto& stats = ref.get<DataProcessingStats>();
  // If any valid offer already carries shared memory, do not stack another one.
  size_t ri = 0;
  while (ri != quotaEvaluator.mOffers.size()) {
    auto& offer = quotaEvaluator.mOffers[ri];
    // We were already offered some sharedMemory, so we
    // do not consider the offer.
    // FIXME: in principle this should account for memory
    // available and being offered, however we
    // want to get out of the woods for now.
    if (offer.valid && offer.sharedMemory != 0) {
      O2_SIGNPOST_END(device, sid, "signal_state", "Memory already offered.");
      return;
    }
    ri++;
  }
  // Find the first empty offer and have 1GB of shared memory there
  for (auto& offer : quotaEvaluator.mOffers) {
    if (offer.valid == false) {
      offer.cpu = 0;
      offer.memory = 0;
      offer.sharedMemory = 1000000000;
      offer.valid = true;
      offer.user = -1; // -1: not attributed to any particular user/task yet
      break;
    }
  }
  O2_SIGNPOST_END(device, sid, "signal_state", "Done processing signals.");
}
597
598static auto toBeForwardedHeader = [](void* header) -> bool {
599 // If is now possible that the record is not complete when
600 // we forward it, because of a custom completion policy.
601 // this means that we need to skip the empty entries in the
602 // record for being forwarded.
603 if (header == nullptr) {
604 return false;
605 }
606 auto sih = o2::header::get<SourceInfoHeader*>(header);
607 if (sih) {
608 return false;
609 }
610
611 auto dih = o2::header::get<DomainInfoHeader*>(header);
612 if (dih) {
613 return false;
614 }
615
616 auto dh = o2::header::get<DataHeader*>(header);
617 if (!dh) {
618 return false;
619 }
620 auto dph = o2::header::get<DataProcessingHeader*>(header);
621 if (!dph) {
622 return false;
623 }
624 return true;
625};
626
627static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
628 FairMQDeviceProxy& proxy,
629 std::unique_ptr<fair::mq::Message>& header,
630 std::unique_ptr<fair::mq::Message>& payload,
631 size_t total,
632 bool consume) {
633 if (header.get() == nullptr) {
634 // Missing an header is not an error anymore.
635 // it simply means that we did not receive the
636 // given input, but we were asked to
637 // consume existing, so we skip it.
638 return false;
639 }
640 if (payload.get() == nullptr && consume == true) {
641 // If the payload is not there, it means we already
642 // processed it with ConsumeExisiting. Therefore we
643 // need to do something only if this is the last consume.
644 header.reset(nullptr);
645 return false;
646 }
647
648 auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
649 if (fdph == nullptr) {
650 LOG(error) << "Data is missing DataProcessingHeader";
651 return false;
652 }
653 auto fdh = o2::header::get<DataHeader*>(header->GetData());
654 if (fdh == nullptr) {
655 LOG(error) << "Data is missing DataHeader";
656 return false;
657 }
658
659 // We need to find the forward route only for the first
660 // part of a split payload. All the others will use the same.
661 // but always check if we have a sequence of multiple payloads
662 if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
663 proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
664 }
665 return cachedForwardingChoices.empty() == false;
666};
667
668struct DecongestionContext {
671};
672
673auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
674 auto& oldestTimeslice = task.user<DecongestionContext>().oldestTimeslice;
675 auto& ref = task.user<DecongestionContext>().ref;
676
677 auto& decongestion = ref.get<DecongestionService>();
678 auto& proxy = ref.get<FairMQDeviceProxy>();
679 if (oldestTimeslice.timeslice.value <= decongestion.lastTimeslice) {
680 LOG(debug) << "Not sending already sent oldest possible timeslice " << oldestTimeslice.timeslice.value;
681 return;
682 }
683 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
684 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
685 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
686 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
687 // TODO: this we could cache in the proxy at the bind moment.
688 if (info.channelType != ChannelAccountingType::DPL) {
689 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Skipping channel %{public}s because it's not a DPL channel",
690 info.name.c_str());
691
692 continue;
693 }
694 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestTimeslice.timeslice.value)) {
695 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Forwarding to channel %{public}s oldest possible timeslice %zu, prio 20",
696 info.name.c_str(), oldestTimeslice.timeslice.value);
697 }
698 }
699};
700
// This is how we do the forwarding, i.e. we push
// the inputs which are shared between this device and others
// to the next one in the daisy chain.
// FIXME: do it in a smarter way than O(N^2)
// @param currentSetOfInputs the message sets of the timeslice slot being forwarded
// @param oldestTimeslice oldest-possible-timeslice info, also queued on the async queue
// @param copy when true, messages are copied instead of moved out of the cache
// @param consume when true, this is the final (consuming) forward for the slot
static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
                               TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
  auto& proxy = registry.get<FairMQDeviceProxy>();
  // we collect all messages per forward in a vector indexed by forward
  // channel and send them together
  std::vector<fair::mq::Parts> forwardedParts;
  forwardedParts.resize(proxy.getNumForwards());
  std::vector<ChannelIndex> cachedForwardingChoices{};
  O2_SIGNPOST_ID_GENERATE(sid, forwarding);
  O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
                    slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");

  for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
    auto& messageSet = currentSetOfInputs[ii];
    // In case the messageSet is empty, there is nothing to be done.
    if (messageSet.size() == 0) {
      continue;
    }
    // The first header decides whether this input is forwardable at all.
    if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
      continue;
    }
    cachedForwardingChoices.clear();

    for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
      // NOTE(review): this inner declaration shadows the outer `messageSet`;
      // both refer to the same currentSetOfInputs[ii], so behavior is fine.
      auto& messageSet = currentSetOfInputs[ii];
      auto& header = messageSet.header(pi);
      auto& payload = messageSet.payload(pi);
      auto total = messageSet.getNumberOfPayloads(pi);

      if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
        continue;
      }

      // In case of more than one forward route, we need to copy the message.
      // This will eventually use the same memory if running with the same backend.
      if (cachedForwardingChoices.size() > 1) {
        copy = true;
      }
      auto* dh = o2::header::get<DataHeader*>(header->GetData());
      auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());

      if (copy) {
        // Copy path: every destination channel gets its own copy of header
        // and payload messages; the cached set keeps its originals.
        for (auto& cachedForwardingChoice : cachedForwardingChoices) {
          auto&& newHeader = header->GetTransport()->CreateMessage();
          O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
                                 fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
          newHeader->Copy(*header);
          forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));

          for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
            auto&& newPayload = header->GetTransport()->CreateMessage();
            newPayload->Copy(*messageSet.payload(pi, payloadIndex));
            forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
          }
        }
      } else {
        // Move path: single destination, the messages are moved out of the
        // cache and are no longer available locally afterwards.
        O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
                               fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
        forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
        for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
          forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
        }
      }
    }
  }
  // NOTE(review): forwardedParts.size() is the number of forward channels,
  // not the number of messages — the log message is slightly misleading.
  O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %zu messages", forwardedParts.size());
  for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
    if (forwardedParts[fi].Size() == 0) {
      continue;
    }
    ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi});
    auto& parts = forwardedParts[fi];
    if (info.policy == nullptr) {
      O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi);
      continue;
    }
    O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
    info.policy->forward(parts, ChannelIndex{fi}, registry);
  }

  // Finally queue the announcement of the oldest possible timeslice, handled
  // later by decongestionCallbackLate.
  auto& asyncQueue = registry.get<AsyncQueue>();
  auto& decongestion = registry.get<DecongestionService>();
  O2_SIGNPOST_ID_GENERATE(aid, async_queue);
  O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
  AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate}
                                       .user<DecongestionContext>({.ref = registry, .oldestTimeslice = oldestTimeslice}));
  O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
};
792
793extern volatile int region_read_global_dummy_variable;
795
797void handleRegionCallbacks(ServiceRegistryRef registry, std::vector<fair::mq::RegionInfo>& infos)
798{
799 if (infos.empty() == false) {
800 std::vector<fair::mq::RegionInfo> toBeNotified;
801 toBeNotified.swap(infos); // avoid any MT issue.
802 static bool dummyRead = getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS") && atoi(getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS"));
803 for (auto const& info : toBeNotified) {
804 if (dummyRead) {
805 for (size_t i = 0; i < info.size / sizeof(region_read_global_dummy_variable); i += 4096 / sizeof(region_read_global_dummy_variable)) {
806 region_read_global_dummy_variable = ((int*)info.ptr)[i];
807 }
808 }
809 registry.get<CallbackService>().call<CallbackService::Id::RegionInfoCallback>(info);
810 }
811 }
812}
813
814namespace
815{
817{
818 auto* state = (DeviceState*)handle->data;
820}
821} // namespace
822
// Set up the libuv pollers for this device: one read-poller per DPL input
// channel, or (for devices with no inputs, timers or signals) write-pollers
// on the output channels so that enumeration-only devices still wake up.
// Devices without any processing callback are treated as "fake" and get a
// periodic idle timer plus an immediate quit request instead.
void DataProcessingDevice::initPollers()
{
  auto ref = ServiceRegistryRef{mServiceRegistry};
  auto& deviceContext = ref.get<DeviceContext>();
  auto& context = ref.get<DataProcessorContext>();
  auto& spec = ref.get<DeviceSpec const>();
  auto& state = ref.get<DeviceState>();
  // We add a timer only in case a channel poller is not there.
  if ((context.statefulProcess != nullptr) || (context.statelessProcess != nullptr)) {
    for (auto& [channelName, channel] : GetChannels()) {
      InputChannelInfo* channelInfo;
      // NOTE(review): channelInfo stays uninitialized if spec.inputChannels is
      // empty, and points at the last inspected entry if no name matches.
      // The assert() below would then read an indeterminate or stale pointer.
      // Presumably every polled FairMQ channel has a matching input spec —
      // TODO confirm.
      for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
        auto& channelSpec = spec.inputChannels[ci];
        channelInfo = &state.inputChannelInfos[ci];
        if (channelSpec.name != channelName) {
          continue;
        }
        channelInfo->channel = &this->GetChannel(channelName, 0);
        break;
      }
      // Internal DPL channels carry no device input, except the AOD, CCDB
      // backend and injected ones, which are deliberately not skipped here.
      if ((channelName.rfind("from_internal-dpl", 0) == 0) &&
          (channelName.rfind("from_internal-dpl-aod", 0) != 0) &&
          (channelName.rfind("from_internal-dpl-ccdb-backend", 0) != 0) &&
          (channelName.rfind("from_internal-dpl-injected", 0)) != 0) {
        LOGP(detail, "{} is an internal channel. Skipping as no input will come from there.", channelName);
        continue;
      }
      // We only watch receiving sockets.
      if (channelName.rfind("from_" + spec.name + "_", 0) == 0) {
        LOGP(detail, "{} is to send data. Not polling.", channelName);
        continue;
      }

      if (channelName.rfind("from_", 0) != 0) {
        LOGP(detail, "{} is not a DPL socket. Not polling.", channelName);
        continue;
      }

      // We assume there is always a ZeroMQ socket behind.
      int zmq_fd = 0;
      size_t zmq_fd_len = sizeof(zmq_fd);
      // FIXME: I should probably save those somewhere... ;-)
      auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
      channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
      if (zmq_fd == 0) {
        LOG(error) << "Cannot get file descriptor for channel." << channelName;
        continue;
      }
      LOGP(detail, "Polling socket for {}", channelName);
      // Per-poller context handed to the libuv callback via poller->data.
      auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
      pCtx->name = strdup(channelName.c_str());
      pCtx->loop = state.loop;
      pCtx->device = this;
      pCtx->state = &state;
      pCtx->fd = zmq_fd;
      assert(channelInfo != nullptr);
      pCtx->channelInfo = channelInfo;
      pCtx->socket = &channel[0].GetSocket();
      pCtx->read = true;
      poller->data = pCtx;
      uv_poll_init(state.loop, poller, zmq_fd);
      // NOTE(review): the true branch below is unreachable — names not
      // starting with "from_" were already skipped above, so every poller
      // created here lands in activeInputPollers.
      if (channelName.rfind("from_", 0) != 0) {
        LOGP(detail, "{} is an out of band channel.", channelName);
        state.activeOutOfBandPollers.push_back(poller);
      } else {
        channelInfo->pollerIndex = state.activeInputPollers.size();
        state.activeInputPollers.push_back(poller);
      }
    }
    // In case we do not have any input channel and we do not have
    // any timers or signal watchers we still wake up whenever we can send data to downstream
    // devices to allow for enumerations.
    if (state.activeInputPollers.empty() &&
        state.activeOutOfBandPollers.empty() &&
        state.activeTimers.empty() &&
        state.activeSignals.empty()) {
      // FIXME: this is to make sure we do not reset the output timer
      // for readout proxies or similar. In principle this should go once
      // we move to OutOfBand InputSpec.
      if (state.inputChannelInfos.empty()) {
        LOGP(detail, "No input channels. Setting exit transition timeout to 0.");
        deviceContext.exitTransitionTimeout = 0;
      }
      // Watch the non-internal output channels for writability instead.
      for (auto& [channelName, channel] : GetChannels()) {
        if (channelName.rfind(spec.channelPrefix + "from_internal-dpl", 0) == 0) {
          LOGP(detail, "{} is an internal channel. Not polling.", channelName);
          continue;
        }
        if (channelName.rfind(spec.channelPrefix + "from_" + spec.name + "_", 0) == 0) {
          LOGP(detail, "{} is an out of band channel. Not polling for output.", channelName);
          continue;
        }
        // We assume there is always a ZeroMQ socket behind.
        int zmq_fd = 0;
        size_t zmq_fd_len = sizeof(zmq_fd);
        // FIXME: I should probably save those somewhere... ;-)
        auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
        channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
        if (zmq_fd == 0) {
          LOGP(error, "Cannot get file descriptor for channel {}", channelName);
          continue;
        }
        LOG(detail) << "Polling socket for " << channel[0].GetName();
        // FIXME: leak
        auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
        pCtx->name = strdup(channelName.c_str());
        pCtx->loop = state.loop;
        pCtx->device = this;
        pCtx->state = &state;
        pCtx->fd = zmq_fd;
        // Output pollers watch for writability, not reads.
        pCtx->read = false;
        poller->data = pCtx;
        uv_poll_init(state.loop, poller, zmq_fd);
        state.activeOutputPollers.push_back(poller);
      }
    }
  } else {
    // No stateful or stateless processing callback: nothing will ever run.
    LOGP(detail, "This is a fake device so we exit after the first iteration.");
    deviceContext.exitTransitionTimeout = 0;
    // This is a fake device, so we can request to exit immediately
    ServiceRegistryRef ref{mServiceRegistry};
    ref.get<ControlService>().readyToQuit(QuitRequest::Me);
    // A two second timer to stop internal devices which do not want to
    auto* timer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
    uv_timer_init(state.loop, timer);
    timer->data = &state;
    uv_update_time(state.loop);
    uv_timer_start(timer, on_idle_timer, 2000, 2000);
    state.activeTimers.push_back(timer);
  }
}
954
955void DataProcessingDevice::startPollers()
956{
957 auto ref = ServiceRegistryRef{mServiceRegistry};
958 auto& deviceContext = ref.get<DeviceContext>();
959 auto& state = ref.get<DeviceState>();
960
961 for (auto* poller : state.activeInputPollers) {
962 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
963 O2_SIGNPOST_START(device, sid, "socket_state", "Input socket waiting for connection.");
964 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
965 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
966 }
967 for (auto& poller : state.activeOutOfBandPollers) {
968 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
969 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
970 }
971 for (auto* poller : state.activeOutputPollers) {
972 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
973 O2_SIGNPOST_START(device, sid, "socket_state", "Output socket waiting for connection.");
974 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
975 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
976 }
977
978 deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
979 deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
980 uv_timer_init(state.loop, deviceContext.gracePeriodTimer);
981
982 deviceContext.dataProcessingGracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
983 deviceContext.dataProcessingGracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
984 uv_timer_init(state.loop, deviceContext.dataProcessingGracePeriodTimer);
985}
986
987void DataProcessingDevice::stopPollers()
988{
989 auto ref = ServiceRegistryRef{mServiceRegistry};
990 auto& deviceContext = ref.get<DeviceContext>();
991 auto& state = ref.get<DeviceState>();
992 LOGP(detail, "Stopping {} input pollers", state.activeInputPollers.size());
993 for (auto* poller : state.activeInputPollers) {
994 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
995 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
996 uv_poll_stop(poller);
997 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
998 }
999 LOGP(detail, "Stopping {} out of band pollers", state.activeOutOfBandPollers.size());
1000 for (auto* poller : state.activeOutOfBandPollers) {
1001 uv_poll_stop(poller);
1002 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
1003 }
1004 LOGP(detail, "Stopping {} output pollers", state.activeOutOfBandPollers.size());
1005 for (auto* poller : state.activeOutputPollers) {
1006 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
1007 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
1008 uv_poll_stop(poller);
1009 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
1010 }
1011
1012 uv_timer_stop(deviceContext.gracePeriodTimer);
1013 delete (ServiceRegistryRef*)deviceContext.gracePeriodTimer->data;
1014 free(deviceContext.gracePeriodTimer);
1015 deviceContext.gracePeriodTimer = nullptr;
1016
1017 uv_timer_stop(deviceContext.dataProcessingGracePeriodTimer);
1018 delete (ServiceRegistryRef*)deviceContext.dataProcessingGracePeriodTimer->data;
1019 free(deviceContext.dataProcessingGracePeriodTimer);
1020 deviceContext.dataProcessingGracePeriodTimer = nullptr;
1021}
1022
1024{
1025 auto ref = ServiceRegistryRef{mServiceRegistry};
1026 auto& deviceContext = ref.get<DeviceContext>();
1027 auto& context = ref.get<DataProcessorContext>();
1028
1029 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1030 O2_SIGNPOST_START(device, cid, "InitTask", "Entering InitTask callback.");
1031 auto& spec = getRunningDevice(mRunningDevice, mServiceRegistry);
1032 auto distinct = DataRelayerHelpers::createDistinctRouteIndex(spec.inputs);
1033 auto& state = ref.get<DeviceState>();
1034 int i = 0;
1035 for (auto& di : distinct) {
1036 auto& route = spec.inputs[di];
1037 if (route.configurator.has_value() == false) {
1038 i++;
1039 continue;
1040 }
1041 ExpirationHandler handler{
1042 .name = route.configurator->name,
1043 .routeIndex = RouteIndex{i++},
1044 .lifetime = route.matcher.lifetime,
1045 .creator = route.configurator->creatorConfigurator(state, mServiceRegistry, *mConfigRegistry),
1046 .checker = route.configurator->danglingConfigurator(state, *mConfigRegistry),
1047 .handler = route.configurator->expirationConfigurator(state, *mConfigRegistry)};
1048 context.expirationHandlers.emplace_back(std::move(handler));
1049 }
1050
1051 if (state.awakeMainThread == nullptr) {
1052 state.awakeMainThread = (uv_async_t*)malloc(sizeof(uv_async_t));
1053 state.awakeMainThread->data = &state;
1054 uv_async_init(state.loop, state.awakeMainThread, on_awake_main_thread);
1055 }
1056
1057 deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue<std::string>("expected-region-callbacks"));
1058 deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue<std::string>("exit-transition-timeout"));
1059 deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue<std::string>("data-processing-timeout"));
1060
1061 for (auto& channel : GetChannels()) {
1062 channel.second.at(0).Transport()->SubscribeToRegionEvents([&context = deviceContext,
1063 &registry = mServiceRegistry,
1064 &pendingRegionInfos = mPendingRegionInfos,
1065 &regionInfoMutex = mRegionInfoMutex](fair::mq::RegionInfo info) {
1066 std::lock_guard<std::mutex> lock(regionInfoMutex);
1067 LOG(detail) << ">>> Region info event" << info.event;
1068 LOG(detail) << "id: " << info.id;
1069 LOG(detail) << "ptr: " << info.ptr;
1070 LOG(detail) << "size: " << info.size;
1071 LOG(detail) << "flags: " << info.flags;
1072 // Now we check for pending events with the mutex,
1073 // so the lines below are atomic.
1074 pendingRegionInfos.push_back(info);
1075 context.expectedRegionCallbacks -= 1;
1076 // We always want to handle these on the main loop,
1077 // so we awake it.
1078 ServiceRegistryRef ref{registry};
1079 uv_async_send(ref.get<DeviceState>().awakeMainThread);
1080 });
1081 }
1082
1083 // Add a signal manager for SIGUSR1 so that we can force
1084 // an event from the outside, making sure that the event loop can
1085 // be unblocked (e.g. by a quitting DPL driver) even when there
1086 // is no data pending to be processed.
1087 if (deviceContext.sigusr1Handle == nullptr) {
1088 deviceContext.sigusr1Handle = (uv_signal_t*)malloc(sizeof(uv_signal_t));
1089 deviceContext.sigusr1Handle->data = &mServiceRegistry;
1090 uv_signal_init(state.loop, deviceContext.sigusr1Handle);
1091 uv_signal_start(deviceContext.sigusr1Handle, on_signal_callback, SIGUSR1);
1092 }
1093 // If there is any signal, we want to make sure they are active
1094 for (auto& handle : state.activeSignals) {
1095 handle->data = &state;
1096 }
1097 // When we start, we must make sure that we do listen to the signal
1098 deviceContext.sigusr1Handle->data = &mServiceRegistry;
1099
1101 DataProcessingDevice::initPollers();
1102
1103 // Whenever we InitTask, we consider as if the previous iteration
1104 // was successful, so that even if there is no timer or receiving
1105 // channel, we can still start an enumeration.
1106 DataProcessorContext* initialContext = nullptr;
1107 bool idle = state.lastActiveDataProcessor.compare_exchange_strong(initialContext, (DataProcessorContext*)-1);
1108 if (!idle) {
1109 LOG(error) << "DataProcessor " << state.lastActiveDataProcessor.load()->spec->name << " was unexpectedly active";
1110 }
1111
1112 // We should be ready to run here. Therefore we copy all the
1113 // required parts in the DataProcessorContext. Eventually we should
1114 // do so on a per thread basis, with fine grained locks.
1115 // FIXME: this should not use ServiceRegistry::threadSalt, but
1116 // more a ServiceRegistry::globalDataProcessorSalt(N) where
1117 // N is the number of the multiplexed data processor.
1118 // We will get there.
1119 this->fillContext(mServiceRegistry.get<DataProcessorContext>(ServiceRegistry::globalDeviceSalt()), deviceContext);
1120
1121 O2_SIGNPOST_END(device, cid, "InitTask", "Exiting InitTask callback waiting for the remaining region callbacks.");
1122
1123 auto hasPendingEvents = [&mutex = mRegionInfoMutex, &pendingRegionInfos = mPendingRegionInfos](DeviceContext& deviceContext) {
1124 std::lock_guard<std::mutex> lock(mutex);
1125 return (pendingRegionInfos.empty() == false) || deviceContext.expectedRegionCallbacks > 0;
1126 };
1127 O2_SIGNPOST_START(device, cid, "InitTask", "Waiting for registation events.");
1132 while (hasPendingEvents(deviceContext)) {
1133 // Wait for the callback to signal its done, so that we do not busy wait.
1134 uv_run(state.loop, UV_RUN_ONCE);
1135 // Handle callbacks if any
1136 {
1137 O2_SIGNPOST_EVENT_EMIT(device, cid, "InitTask", "Memory registration event received.");
1138 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1139 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1140 }
1141 }
1142 O2_SIGNPOST_END(device, cid, "InitTask", "Done waiting for registration events.");
1143}
1144
1146{
1147 context.isSink = false;
1148 // If nothing is a sink, the rate limiting simply does not trigger.
1149 bool enableRateLimiting = std::stoi(fConfig->GetValue<std::string>("timeframes-rate-limit"));
1150
1151 auto ref = ServiceRegistryRef{mServiceRegistry};
1152 auto& spec = ref.get<DeviceSpec const>();
1153
1154 // The policy is now allowed to state the default.
1155 context.balancingInputs = spec.completionPolicy.balanceChannels;
1156 // This is needed because the internal injected dummy sink should not
1157 // try to balance inputs unless the rate limiting is requested.
1158 if (enableRateLimiting == false && spec.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) {
1159 context.balancingInputs = false;
1160 }
1161 if (enableRateLimiting) {
1162 for (auto& spec : spec.outputs) {
1163 if (spec.matcher.binding.value == "dpl-summary") {
1164 context.isSink = true;
1165 break;
1166 }
1167 }
1168 }
1169
1170 context.registry = &mServiceRegistry;
1173 if (context.error != nullptr) {
1174 context.errorHandling = [&errorCallback = context.error,
1175 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1179 auto& err = error_from_ref(e);
1180 auto& context = ref.get<DataProcessorContext>();
1181 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1182 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Invoking callback.", err.what);
1183 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1184 auto& stats = ref.get<DataProcessingStats>();
1186 ErrorContext errorContext{record, ref, e};
1187 errorCallback(errorContext);
1188 };
1189 } else {
1190 context.errorHandling = [&errorPolicy = mProcessingPolicies.error,
1191 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1192 auto& err = error_from_ref(e);
1196 auto& context = ref.get<DataProcessorContext>();
1197 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1198 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1199 auto& stats = ref.get<DataProcessingStats>();
1201 switch (errorPolicy) {
1203 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Rethrowing.", err.what);
1204 throw e;
1205 default:
1206 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Skipping to next timeframe.", err.what);
1207 break;
1208 }
1209 };
1210 }
1211
1212 auto decideEarlyForward = [&context, &spec, this]() -> bool {
1215 bool canForwardEarly = (spec.forwards.empty() == false) && mProcessingPolicies.earlyForward != EarlyForwardPolicy::NEVER;
1216 bool onlyConditions = true;
1217 bool overriddenEarlyForward = false;
1218 for (auto& forwarded : spec.forwards) {
1219 if (forwarded.matcher.lifetime != Lifetime::Condition) {
1220 onlyConditions = false;
1221 }
1222#if !__has_include(<fairmq/shmem/Message.h>)
1223 if (strncmp(DataSpecUtils::asConcreteOrigin(forwarded.matcher).str, "AOD", 3) == 0) {
1224 context.canForwardEarly = false;
1225 overriddenEarlyForward = true;
1226 LOG(detail) << "Cannot forward early because of AOD input: " << DataSpecUtils::describe(forwarded.matcher);
1227 break;
1228 }
1229#endif
1230 if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && mProcessingPolicies.earlyForward == EarlyForwardPolicy::NORAW) {
1231 context.canForwardEarly = false;
1232 overriddenEarlyForward = true;
1233 LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher);
1234 break;
1235 }
1236 if (forwarded.matcher.lifetime == Lifetime::Optional) {
1237 context.canForwardEarly = false;
1238 overriddenEarlyForward = true;
1239 LOG(detail) << "Cannot forward early because of Optional input: " << DataSpecUtils::describe(forwarded.matcher);
1240 break;
1241 }
1242 }
1243 if (!overriddenEarlyForward && onlyConditions) {
1244 context.canForwardEarly = true;
1245 LOG(detail) << "Enabling early forwarding because only conditions to be forwarded";
1246 }
1247 return canForwardEarly;
1248 };
1249 context.canForwardEarly = decideEarlyForward();
1250}
1251
1253{
1254 auto ref = ServiceRegistryRef{mServiceRegistry};
1255 auto& state = ref.get<DeviceState>();
1256
1257 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.loop);
1258 O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
1259 state.quitRequested = false;
1261 state.allowedProcessing = DeviceState::Any;
1262 for (auto& info : state.inputChannelInfos) {
1263 if (info.state != InputChannelState::Pull) {
1264 info.state = InputChannelState::Running;
1265 }
1266 }
1267
1268 // Catch callbacks which fail before we start.
1269 // Notice that when running multiple dataprocessors
1270 // we should probably allow expendable ones to fail.
1271 try {
1272 auto& dpContext = ref.get<DataProcessorContext>();
1273 dpContext.preStartCallbacks(ref);
1274 for (size_t i = 0; i < mStreams.size(); ++i) {
1275 auto streamRef = ServiceRegistryRef{mServiceRegistry, ServiceRegistry::globalStreamSalt(i + 1)};
1276 auto& context = streamRef.get<StreamContext>();
1277 context.preStartStreamCallbacks(streamRef);
1278 }
1279 } catch (std::exception& e) {
1280 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type std::exception caught in PreRun: %{public}s. Rethrowing.", e.what());
1281 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1282 throw;
1283 } catch (o2::framework::RuntimeErrorRef& e) {
1284 auto& err = error_from_ref(e);
1285 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type o2::framework::RuntimeErrorRef caught in PreRun: %{public}s. Rethrowing.", err.what);
1286 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1287 throw;
1288 } catch (...) {
1289 O2_SIGNPOST_END(device, cid, "PreRun", "Unknown exception being thrown. Rethrowing.");
1290 throw;
1291 }
1292
1293 ref.get<CallbackService>().call<CallbackService::Id::Start>();
1294 startPollers();
1295
1296 // Raise to 1 when we are ready to start processing
1297 using o2::monitoring::Metric;
1298 using o2::monitoring::Monitoring;
1299 using o2::monitoring::tags::Key;
1300 using o2::monitoring::tags::Value;
1301
1302 auto& monitoring = ref.get<Monitoring>();
1303 monitoring.send(Metric{(uint64_t)1, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1304 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun callback.");
1305}
1306
1308{
1309 ServiceRegistryRef ref{mServiceRegistry};
1310 // Raise to 1 when we are ready to start processing
1311 using o2::monitoring::Metric;
1312 using o2::monitoring::Monitoring;
1313 using o2::monitoring::tags::Key;
1314 using o2::monitoring::tags::Value;
1315
1316 auto& monitoring = ref.get<Monitoring>();
1317 monitoring.send(Metric{(uint64_t)0, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1318
1319 stopPollers();
1320 ref.get<CallbackService>().call<CallbackService::Id::Stop>();
1321 auto& dpContext = ref.get<DataProcessorContext>();
1322 dpContext.postStopCallbacks(ref);
1323}
1324
1326{
1327 ServiceRegistryRef ref{mServiceRegistry};
1328 ref.get<CallbackService>().call<CallbackService::Id::Reset>();
1329}
1330
1332{
1333 ServiceRegistryRef ref{mServiceRegistry};
1334 auto& state = ref.get<DeviceState>();
1336 bool firstLoop = true;
1337 O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
1338 O2_SIGNPOST_START(device, lid, "device_state", "First iteration of the device loop");
1339
1340 bool dplEnableMultithreding = getenv("DPL_THREADPOOL_SIZE") != nullptr;
1341 if (dplEnableMultithreding) {
1342 setenv("UV_THREADPOOL_SIZE", "1", 1);
1343 }
1344
1345 while (state.transitionHandling != TransitionHandlingState::Expired) {
1346 if (state.nextFairMQState.empty() == false) {
1347 (void)this->ChangeState(state.nextFairMQState.back());
1348 state.nextFairMQState.pop_back();
1349 }
1350 // Notify on the main thread the new region callbacks, making sure
1351 // no callback is issued if there is something still processing.
1352 {
1353 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1354 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1355 }
1356 // This will block for the correct delay (or until we get data
1357 // on a socket). We also do not block on the first iteration
1358 // so that devices which do not have a timer can still start an
1359 // enumeration.
1360 {
1361 ServiceRegistryRef ref{mServiceRegistry};
1362 ref.get<DriverClient>().flushPending(mServiceRegistry);
1363 DataProcessorContext* lastActive = state.lastActiveDataProcessor.load();
1364 // Reset to zero unless some other DataPorcessorContext completed in the meanwhile.
1365 // In such case we will take care of it at next iteration.
1366 state.lastActiveDataProcessor.compare_exchange_strong(lastActive, nullptr);
1367
1368 auto shouldNotWait = (lastActive != nullptr &&
1369 (state.streaming != StreamingState::Idle) && (state.activeSignals.empty())) ||
1371 if (firstLoop) {
1372 shouldNotWait = true;
1373 firstLoop = false;
1374 }
1375 if (lastActive != nullptr) {
1377 }
1378 if (NewStatePending()) {
1379 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "New state pending. Waiting for it to be handled.");
1380 shouldNotWait = true;
1382 }
1383 if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) {
1384 state.transitionHandling = TransitionHandlingState::Requested;
1385 auto& deviceContext = ref.get<DeviceContext>();
1386 // Check if we only have timers
1387 auto& spec = ref.get<DeviceSpec const>();
1388 if (hasOnlyTimers(spec)) {
1390 }
1391
1392 // We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
1393 if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
1394 uv_update_time(state.loop);
1395 O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
1396 uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
1397 }
1398 if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
1399 state.transitionHandling = TransitionHandlingState::Requested;
1400 ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
1401 uv_update_time(state.loop);
1402 O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
1403 deviceContext.exitTransitionTimeout);
1404 uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
1405 bool onlyGenerated = hasOnlyGenerated(spec);
1406 int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
1407 if (mProcessingPolicies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) {
1408 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
1409 } else {
1410 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
1411 "New state requested. Waiting for %d seconds before %{public}s",
1412 timeout,
1413 onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
1414 }
1415 } else {
1416 state.transitionHandling = TransitionHandlingState::Expired;
1417 if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination == TerminationPolicy::QUIT) {
1418 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
1419 } else if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination != TerminationPolicy::QUIT) {
1420 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
1421 } else if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
1422 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
1423 } else {
1424 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
1425 }
1426 }
1427 }
1428 // If we are Idle, we can then consider the transition to be expired.
1429 if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
1430 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
1431 state.transitionHandling = TransitionHandlingState::Expired;
1432 }
1433 if (state.severityStack.empty() == false) {
1434 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1435 state.severityStack.pop_back();
1436 }
1437 // for (auto &info : mDeviceContext.state->inputChannelInfos) {
1438 // shouldNotWait |= info.readPolled;
1439 // }
1440 state.loopReason = DeviceState::NO_REASON;
1441 state.firedTimers.clear();
1442 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_CALLBACKS) != 0) {
1443 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1444 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1445 }
1446 // Run the asynchronous queue just before sleeping again, so that:
1447 // - we can trigger further events from the queue
1448 // - we can guarantee this is the last thing we do in the loop (
1449 // assuming no one else is adding to the queue before this point).
1450 auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1451 O2_SIGNPOST_START(device, lid, "run_loop", "Dropping message from slot %" PRIu64 ". Forwarding as needed.", (uint64_t)slot.index);
1452 ServiceRegistryRef ref{registry};
1453 ref.get<AsyncQueue>();
1454 ref.get<DecongestionService>();
1455 ref.get<DataRelayer>();
1456 // Get the current timeslice for the slot.
1457 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
1459 forwardInputs(registry, slot, dropped, oldestOutputInfo, false, true);
1460 };
1461 auto& relayer = ref.get<DataRelayer>();
1462 relayer.prunePending(onDrop);
1463 auto& queue = ref.get<AsyncQueue>();
1464 auto oldestPossibleTimeslice = relayer.getOldestPossibleOutput();
1465 AsyncQueueHelpers::run(queue, {oldestPossibleTimeslice.timeslice.value});
1466 if (shouldNotWait == false) {
1467 auto& dpContext = ref.get<DataProcessorContext>();
1468 dpContext.preLoopCallbacks(ref);
1469 }
1470 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. %{}s", shouldNotWait ? "Will immediately schedule a new one" : "Waiting for next event.");
1471 uv_run(state.loop, shouldNotWait ? UV_RUN_NOWAIT : UV_RUN_ONCE);
1472 O2_SIGNPOST_START(device, lid, "run_loop", "Run loop started. Loop reason %d.", state.loopReason);
1473 if ((state.loopReason & state.tracingFlags) != 0) {
1474 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1475 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1476 } else if (state.severityStack.empty() == false) {
1477 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1478 state.severityStack.pop_back();
1479 }
1480 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Loop reason mask %x & %x = %x", state.loopReason, state.tracingFlags, state.loopReason & state.tracingFlags);
1481
1482 if ((state.loopReason & DeviceState::LoopReason::OOB_ACTIVITY) != 0) {
1483 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Out of band activity detected. Rescanning everything.");
1484 relayer.rescan();
1485 }
1486
1487 if (!state.pendingOffers.empty()) {
1488 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Pending %" PRIu64 " offers. updating the ComputingQuotaEvaluator.", (uint64_t)state.pendingOffers.size());
1489 ref.get<ComputingQuotaEvaluator>().updateOffers(state.pendingOffers, uv_now(state.loop));
1490 }
1491 }
1492
1493 // Notify on the main thread the new region callbacks, making sure
1494 // no callback is issued if there is something still processing.
1495 // Notice that we still need to perform callbacks also after
1496 // the socket epolled, because otherwise we would end up serving
1497 // the callback after the first data arrives is the system is too
1498 // fast to transition from Init to Run.
1499 {
1500 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1501 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1502 }
1503
1504 assert(mStreams.size() == mHandles.size());
1506 TaskStreamRef streamRef{-1};
1507 for (size_t ti = 0; ti < mStreams.size(); ti++) {
1508 auto& taskInfo = mStreams[ti];
1509 if (taskInfo.running) {
1510 continue;
1511 }
1512 // Stream 0 is for when we run in
1513 streamRef.index = ti;
1514 }
1515 using o2::monitoring::Metric;
1516 using o2::monitoring::Monitoring;
1517 using o2::monitoring::tags::Key;
1518 using o2::monitoring::tags::Value;
1519 // We have an empty stream, let's check if we have enough
1520 // resources for it to run something
1521 if (streamRef.index != -1) {
1522 // Synchronous execution of the callbacks. This will be moved in the
1523 // moved in the on_socket_polled once we have threading in place.
1524 uv_work_t& handle = mHandles[streamRef.index];
1525 TaskStreamInfo& stream = mStreams[streamRef.index];
1526 handle.data = &mStreams[streamRef.index];
1527
1528 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const& stats)> reportExpiredOffer = [&registry = mServiceRegistry](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
1529 ServiceRegistryRef ref{registry};
1530 auto& dpStats = ref.get<DataProcessingStats>();
1531 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
1532 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
1533 dpStats.processCommandQueue();
1534 };
1535 auto ref = ServiceRegistryRef{mServiceRegistry};
1536
1537 // Deciding wether to run or not can be done by passing a request to
1538 // the evaluator. In this case, the request is always satisfied and
1539 // we run on whatever resource is available.
1540 auto& spec = ref.get<DeviceSpec const>();
1541 bool enough = ref.get<ComputingQuotaEvaluator>().selectOffer(streamRef.index, spec.resourcePolicy.request, uv_now(state.loop));
1542
1543 if (enough) {
1544 stream.id = streamRef;
1545 stream.running = true;
1546 stream.registry = &mServiceRegistry;
1547 if (dplEnableMultithreding) [[unlikely]] {
1548 stream.task = &handle;
1549 uv_queue_work(state.loop, stream.task, run_callback, run_completion);
1550 } else {
1551 run_callback(&handle);
1552 run_completion(&handle, 0);
1553 }
1554 } else {
1555 auto ref = ServiceRegistryRef{mServiceRegistry};
1556 ref.get<ComputingQuotaEvaluator>().handleExpired(reportExpiredOffer);
1557 }
1558 }
1559 }
1560
1561 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", state.transitionHandling);
1562 auto& spec = ref.get<DeviceSpec const>();
1564 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
1565 auto& info = state.inputChannelInfos[ci];
1566 info.parts.fParts.clear();
1567 }
1568 state.transitionHandling = TransitionHandlingState::NoTransition;
1569}
1570
1574{
1575 auto& context = ref.get<DataProcessorContext>();
1576 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1577 O2_SIGNPOST_START(device, dpid, "do_prepare", "Starting DataProcessorContext::doPrepare.");
1578
1579 {
1580 ref.get<CallbackService>().call<CallbackService::Id::ClockTick>();
1581 }
1582 // Whether or not we had something to do.
1583
1584 // Initialise the value for context.allDone. It will possibly be updated
1585 // below if any of the channels is not done.
1586 //
1587 // Notice that fake input channels (InputChannelState::Pull) cannot possibly
1588 // expect to receive an EndOfStream signal. Thus we do not wait for these
1589 // to be completed. In the case of data source devices, as they do not have
1590 // real data input channels, they have to signal EndOfStream themselves.
1591 auto& state = ref.get<DeviceState>();
1592 auto& spec = ref.get<DeviceSpec const>();
1593 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.inputChannelInfos.data());
1594 O2_SIGNPOST_START(device, cid, "do_prepare", "Reported channel states.");
1595 context.allDone = std::any_of(state.inputChannelInfos.begin(), state.inputChannelInfos.end(), [cid](const auto& info) {
1596 if (info.channel) {
1597 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "Input channel %{public}s%{public}s has %zu parts left and is in state %d.",
1598 info.channel->GetName().c_str(), (info.id.value == ChannelIndex::INVALID ? " (non DPL)" : ""), info.parts.fParts.size(), (int)info.state);
1599 } else {
1600 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "External channel %d is in state %d.", info.id.value, (int)info.state);
1601 }
1602 return (info.parts.fParts.empty() == true && info.state != InputChannelState::Pull);
1603 });
1604 O2_SIGNPOST_END(device, cid, "do_prepare", "End report.");
1605 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Processing %zu input channels.", spec.inputChannels.size());
1608 static std::vector<int> pollOrder;
1609 pollOrder.resize(state.inputChannelInfos.size());
1610 std::iota(pollOrder.begin(), pollOrder.end(), 0);
1611 std::sort(pollOrder.begin(), pollOrder.end(), [&infos = state.inputChannelInfos](int a, int b) {
1612 return infos[a].oldestForChannel.value < infos[b].oldestForChannel.value;
1613 });
1614
1615 // Nothing to poll...
1616 if (pollOrder.empty()) {
1617 O2_SIGNPOST_END(device, dpid, "do_prepare", "Nothing to poll. Waiting for next iteration.");
1618 return;
1619 }
1620 auto currentOldest = state.inputChannelInfos[pollOrder.front()].oldestForChannel;
1621 auto currentNewest = state.inputChannelInfos[pollOrder.back()].oldestForChannel;
1622 auto delta = currentNewest.value - currentOldest.value;
1623 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Oldest possible timeframe range %" PRIu64 " => %" PRIu64 " delta %" PRIu64,
1624 (int64_t)currentOldest.value, (int64_t)currentNewest.value, (int64_t)delta);
1625 auto& infos = state.inputChannelInfos;
1626
1627 if (context.balancingInputs) {
1628 static int pipelineLength = DefaultsHelpers::pipelineLength();
1629 static uint64_t ahead = getenv("DPL_MAX_CHANNEL_AHEAD") ? std::atoll(getenv("DPL_MAX_CHANNEL_AHEAD")) : std::max(8, std::min(pipelineLength - 48, pipelineLength / 2));
1630 auto newEnd = std::remove_if(pollOrder.begin(), pollOrder.end(), [&infos, limitNew = currentOldest.value + ahead](int a) -> bool {
1631 return infos[a].oldestForChannel.value > limitNew;
1632 });
1633 for (auto it = pollOrder.begin(); it < pollOrder.end(); it++) {
1634 const auto& channelInfo = state.inputChannelInfos[*it];
1635 if (channelInfo.pollerIndex != -1) {
1636 auto& poller = state.activeInputPollers[channelInfo.pollerIndex];
1637 auto& pollerContext = *(PollerContext*)(poller->data);
1638 if (pollerContext.pollerState == PollerContext::PollerState::Connected || pollerContext.pollerState == PollerContext::PollerState::Suspended) {
1639 bool running = pollerContext.pollerState == PollerContext::PollerState::Connected;
1640 bool shouldBeRunning = it < newEnd;
1641 if (running != shouldBeRunning) {
1642 uv_poll_start(poller, shouldBeRunning ? UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED : 0, &on_socket_polled);
1643 pollerContext.pollerState = shouldBeRunning ? PollerContext::PollerState::Connected : PollerContext::PollerState::Suspended;
1644 }
1645 }
1646 }
1647 }
1648 pollOrder.erase(newEnd, pollOrder.end());
1649 }
1650 O2_SIGNPOST_END(device, dpid, "do_prepare", "%zu channels pass the channel inbalance balance check.", pollOrder.size());
1651
1652 for (auto sci : pollOrder) {
1653 auto& info = state.inputChannelInfos[sci];
1654 auto& channelSpec = spec.inputChannels[sci];
1655 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1656 O2_SIGNPOST_START(device, cid, "channels", "Processing channel %s", channelSpec.name.c_str());
1657
1658 if (info.state != InputChannelState::Completed && info.state != InputChannelState::Pull) {
1659 context.allDone = false;
1660 }
1661 if (info.state != InputChannelState::Running) {
1662 // Remember to flush data if we are not running
1663 // and there is some message pending.
1664 if (info.parts.Size()) {
1666 }
1667 O2_SIGNPOST_END(device, cid, "channels", "Flushing channel %s which is in state %d and has %zu parts still pending.",
1668 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1669 continue;
1670 }
1671 if (info.channel == nullptr) {
1672 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is nullptr and has %zu parts still pending.",
1673 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1674 continue;
1675 }
1676 // Only poll DPL channels for now.
1678 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is not a DPL channel and has %zu parts still pending.",
1679 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1680 continue;
1681 }
1682 auto& socket = info.channel->GetSocket();
1683 // If we have pending events from a previous iteration,
1684 // we do receive in any case.
1685 // Otherwise we check if there is any pending event and skip
1686 // this channel in case there is none.
1687 if (info.hasPendingEvents == 0) {
1688 socket.Events(&info.hasPendingEvents);
1689 // If we do not read, we can continue.
1690 if ((info.hasPendingEvents & 1) == 0 && (info.parts.Size() == 0)) {
1691 O2_SIGNPOST_END(device, cid, "channels", "No pending events and no remaining parts to process for channel %{public}s", channelSpec.name.c_str());
1692 continue;
1693 }
1694 }
1695 // We can reset this, because it means we have seen at least 1
1696 // message after the UV_READABLE was raised.
1697 info.readPolled = false;
1698 // Notice that there seems to be a difference between the documentation
1699 // of zeromq and the observed behavior. The fact that ZMQ_POLLIN
1700 // is raised does not mean that a message is immediately available to
1701 // read, just that it will be available soon, so the receive can
1702 // still return -2. To avoid this we keep receiving on the socket until
1703 // we get a message. In order not to overflow the DPL queue we process
1704 // one message at the time and we keep track of wether there were more
1705 // to process.
1706 bool newMessages = false;
1707 while (true) {
1708 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Receiving loop called for channel %{public}s (%d) with oldest possible timeslice %zu",
1709 channelSpec.name.c_str(), info.id.value, info.oldestForChannel.value);
1710 if (info.parts.Size() < 64) {
1711 fair::mq::Parts parts;
1712 info.channel->Receive(parts, 0);
1713 if (parts.Size()) {
1714 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Received %zu parts from channel %{public}s (%d).", parts.Size(), channelSpec.name.c_str(), info.id.value);
1715 }
1716 for (auto&& part : parts) {
1717 info.parts.fParts.emplace_back(std::move(part));
1718 }
1719 newMessages |= true;
1720 }
1721
1722 if (info.parts.Size() >= 0) {
1724 // Receiving data counts as activity now, so that
1725 // We can make sure we process all the pending
1726 // messages without hanging on the uv_run.
1727 break;
1728 }
1729 }
1730 // We check once again for pending events, keeping track if this was the
1731 // case so that we can immediately repeat this loop and avoid remaining
1732 // stuck in uv_run. This is because we will not get notified on the socket
1733 // if more events are pending due to zeromq level triggered approach.
1734 socket.Events(&info.hasPendingEvents);
1735 if (info.hasPendingEvents) {
1736 info.readPolled = false;
1737 // In case there were messages, we consider it as activity
1738 if (newMessages) {
1739 state.lastActiveDataProcessor.store(&context);
1740 }
1741 }
1742 O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).",
1743 channelSpec.name.c_str(), info.id.value);
1744 }
1745}
1746
1748{
1749 auto& context = ref.get<DataProcessorContext>();
1750 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1751 auto& state = ref.get<DeviceState>();
1752 auto& spec = ref.get<DeviceSpec const>();
1753
1754 if (state.streaming == StreamingState::Idle) {
1755 return;
1756 }
1757
1758 context.completed.clear();
1759 context.completed.reserve(16);
1760 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1761 state.lastActiveDataProcessor.store(&context);
1762 }
1763 DanglingContext danglingContext{*context.registry};
1764
1765 context.preDanglingCallbacks(danglingContext);
1766 if (state.lastActiveDataProcessor.load() == nullptr) {
1767 ref.get<CallbackService>().call<CallbackService::Id::Idle>();
1768 }
1769 auto activity = ref.get<DataRelayer>().processDanglingInputs(context.expirationHandlers, *context.registry, true);
1770 if (activity.expiredSlots > 0) {
1771 state.lastActiveDataProcessor = &context;
1772 }
1773
1774 context.completed.clear();
1775 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1776 state.lastActiveDataProcessor = &context;
1777 }
1778
1779 context.postDanglingCallbacks(danglingContext);
1780
1781 // If we got notified that all the sources are done, we call the EndOfStream
1782 // callback and return false. Notice that what happens next is actually
1783 // dependent on the callback, not something which is controlled by the
1784 // framework itself.
1785 if (context.allDone == true && state.streaming == StreamingState::Streaming) {
1787 state.lastActiveDataProcessor = &context;
1788 }
1789
1790 if (state.streaming == StreamingState::EndOfStreaming) {
1791 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "We are in EndOfStreaming. Flushing queues.");
1792 // We keep processing data until we are Idle.
1793 // FIXME: not sure this is the correct way to drain the queues, but
1794 // I guess we will see.
1797 auto& relayer = ref.get<DataRelayer>();
1798
1799 bool shouldProcess = hasOnlyGenerated(spec) == false;
1800
1801 while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) {
1802 relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false);
1803 }
1804
1805 auto& timingInfo = ref.get<TimingInfo>();
1806 // We should keep the data generated at end of stream only for those
1807 // which are not sources.
1808 timingInfo.keepAtEndOfStream = shouldProcess;
1809 // Fill timinginfo with some reasonable values for data sent with endOfStream
1810 timingInfo.timeslice = relayer.getOldestPossibleOutput().timeslice.value;
1811 timingInfo.tfCounter = -1;
1812 timingInfo.firstTForbit = -1;
1813 // timingInfo.runNumber = ; // Not sure where to get this if not already set
1814 timingInfo.creation = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
1815 O2_SIGNPOST_EVENT_EMIT(calibration, dpid, "calibration", "TimingInfo.keepAtEndOfStream %d", timingInfo.keepAtEndOfStream);
1816
1817 EndOfStreamContext eosContext{*context.registry, ref.get<DataAllocator>()};
1818
1819 context.preEOSCallbacks(eosContext);
1820 auto& streamContext = ref.get<StreamContext>();
1821 streamContext.preEOSCallbacks(eosContext);
1822 ref.get<CallbackService>().call<CallbackService::Id::EndOfStream>(eosContext);
1823 streamContext.postEOSCallbacks(eosContext);
1824 context.postEOSCallbacks(eosContext);
1825
1826 for (auto& channel : spec.outputChannels) {
1827 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Sending end of stream to %{public}s.", channel.name.c_str());
1829 }
1830 // This is needed because the transport is deleted before the device.
1831 relayer.clear();
1833 // In case we should process, note the data processor responsible for it
1834 if (shouldProcess) {
1835 state.lastActiveDataProcessor = &context;
1836 }
1837 // On end of stream we shut down all output pollers.
1838 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1839 for (auto& poller : state.activeOutputPollers) {
1840 uv_poll_stop(poller);
1841 }
1842 return;
1843 }
1844
1845 if (state.streaming == StreamingState::Idle) {
1846 // On end of stream we shut down all output pollers.
1847 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1848 for (auto& poller : state.activeOutputPollers) {
1849 uv_poll_stop(poller);
1850 }
1851 }
1852
1853 return;
1854}
1855
1857{
1858 ServiceRegistryRef ref{mServiceRegistry};
1859 ref.get<DataRelayer>().clear();
1860 auto& deviceContext = ref.get<DeviceContext>();
1861 // If the signal handler is there, we should
1862 // hide the registry from it, so that we do not
1863 // end up calling the signal handler on something
1864 // which might not be there anymore.
1865 if (deviceContext.sigusr1Handle) {
1866 deviceContext.sigusr1Handle->data = nullptr;
1867 }
1868 // Makes sure we do not have a working context on
1869 // shutdown.
1870 for (auto& handle : ref.get<DeviceState>().activeSignals) {
1871 handle->data = nullptr;
1872 }
1873}
1874
1877 {
1878 }
1879};
1880
1886{
1887 using InputInfo = DataRelayer::InputInfo;
1889
1890 auto& context = ref.get<DataProcessorContext>();
1891 // This is the same id as the upper level function, so we get the events
1892 // associated with the same interval. We will simply use "handle_data" as
1893 // the category.
1894 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1895
1896 // This is how we validate inputs. I.e. we try to enforce the O2 Data model
1897 // and we do a few stats. We bind parts as a lambda captured variable, rather
1898 // than an input, because we do not want the outer loop actually be exposed
1899 // to the implementation details of the messaging layer.
1900 auto getInputTypes = [&info, &context]() -> std::optional<std::vector<InputInfo>> {
1901 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1902 auto ref = ServiceRegistryRef{*context.registry};
1903 auto& stats = ref.get<DataProcessingStats>();
1904 auto& state = ref.get<DeviceState>();
1905 auto& parts = info.parts;
1906 stats.updateStats({(int)ProcessingStatsId::TOTAL_INPUTS, DataProcessingStats::Op::Set, (int64_t)parts.Size()});
1907
1908 std::vector<InputInfo> results;
1909 // we can reserve the upper limit
1910 results.reserve(parts.Size() / 2);
1911 size_t nTotalPayloads = 0;
1912
1913 auto insertInputInfo = [&results, &nTotalPayloads](size_t position, size_t length, InputType type, ChannelIndex index) {
1914 results.emplace_back(position, length, type, index);
1915 if (type != InputType::Invalid && length > 1) {
1916 nTotalPayloads += length - 1;
1917 }
1918 };
1919
1920 for (size_t pi = 0; pi < parts.Size(); pi += 2) {
1921 auto* headerData = parts.At(pi)->GetData();
1922 auto sih = o2::header::get<SourceInfoHeader*>(headerData);
1923 if (sih) {
1924 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got SourceInfoHeader with state %d", (int)sih->state);
1925 info.state = sih->state;
1926 insertInputInfo(pi, 2, InputType::SourceInfo, info.id);
1927 state.lastActiveDataProcessor = &context;
1928 continue;
1929 }
1930 auto dih = o2::header::get<DomainInfoHeader*>(headerData);
1931 if (dih) {
1932 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got DomainInfoHeader with oldestPossibleTimeslice %d", (int)dih->oldestPossibleTimeslice);
1933 insertInputInfo(pi, 2, InputType::DomainInfo, info.id);
1934 state.lastActiveDataProcessor = &context;
1935 continue;
1936 }
1937 auto dh = o2::header::get<DataHeader*>(headerData);
1938 if (!dh) {
1939 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1940 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header is not a DataHeader?");
1941 continue;
1942 }
1943 if (dh->payloadSize > parts.At(pi + 1)->GetSize()) {
1944 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1945 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader payloadSize mismatch");
1946 continue;
1947 }
1948 auto dph = o2::header::get<DataProcessingHeader*>(headerData);
1949 // We only deal with the tracking of parts if the log is enabled.
1950 // This is because in principle we should track the size of each of
1951 // the parts and sum it up. Not for now.
1952 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, headerData);
1953 O2_SIGNPOST_START(parts, pid, "parts", "Processing DataHeader %{public}-4s/%{public}-16s/%d with splitPayloadParts %d and splitPayloadIndex %d",
1954 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->splitPayloadParts, dh->splitPayloadIndex);
1955 if (!dph) {
1956 insertInputInfo(pi, 2, InputType::Invalid, info.id);
1957 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header stack does not contain DataProcessingHeader");
1958 continue;
1959 }
1960 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
1961 // this is indicating a sequence of payloads following the header
1962 // FIXME: we will probably also set the DataHeader version
1963 insertInputInfo(pi, dh->splitPayloadParts + 1, InputType::Data, info.id);
1964 pi += dh->splitPayloadParts - 1;
1965 } else {
1966 // We can set the type for the next splitPayloadParts
1967 // because we are guaranteed they are all the same.
1968 // If splitPayloadParts = 0, we assume that means there is only one (header, payload)
1969 // pair.
1970 size_t finalSplitPayloadIndex = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
1971 if (finalSplitPayloadIndex > parts.Size()) {
1972 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader::splitPayloadParts invalid");
1973 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1974 continue;
1975 }
1976 insertInputInfo(pi, 2, InputType::Data, info.id);
1977 for (; pi + 2 < finalSplitPayloadIndex; pi += 2) {
1978 insertInputInfo(pi + 2, 2, InputType::Data, info.id);
1979 }
1980 }
1981 }
1982 if (results.size() + nTotalPayloads != parts.Size()) {
1983 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "inconsistent number of inputs extracted. %zu vs parts (%zu)", results.size() + nTotalPayloads, parts.Size());
1984 return std::nullopt;
1985 }
1986 return results;
1987 };
1988
1989 auto reportError = [ref](const char* message) {
1990 auto& stats = ref.get<DataProcessingStats>();
1992 };
1993
1994 auto handleValidMessages = [&info, ref, &reportError](std::vector<InputInfo> const& inputInfos) {
1995 auto& relayer = ref.get<DataRelayer>();
1996 auto& state = ref.get<DeviceState>();
1997 static WaitBackpressurePolicy policy;
1998 auto& parts = info.parts;
1999 // We relay execution to make sure we have a complete set of parts
2000 // available.
2001 bool hasBackpressure = false;
2002 size_t minBackpressureTimeslice = -1;
2003 bool hasData = false;
2004 size_t oldestPossibleTimeslice = -1;
2005 static std::vector<int> ordering;
2006 // Same as inputInfos but with iota.
2007 ordering.resize(inputInfos.size());
2008 std::iota(ordering.begin(), ordering.end(), 0);
2009 // stable sort orderings by type and position
2010 std::stable_sort(ordering.begin(), ordering.end(), [&inputInfos](int const& a, int const& b) {
2011 auto const& ai = inputInfos[a];
2012 auto const& bi = inputInfos[b];
2013 if (ai.type != bi.type) {
2014 return ai.type < bi.type;
2015 }
2016 return ai.position < bi.position;
2017 });
2018 for (size_t ii = 0; ii < inputInfos.size(); ++ii) {
2019 auto const& input = inputInfos[ordering[ii]];
2020 switch (input.type) {
2021 case InputType::Data: {
2022 hasData = true;
2023 auto headerIndex = input.position;
2024 auto nMessages = 0;
2025 auto nPayloadsPerHeader = 0;
2026 if (input.size > 2) {
2027 // header and multiple payload sequence
2028 nMessages = input.size;
2029 nPayloadsPerHeader = nMessages - 1;
2030 } else {
2031 // multiple header-payload pairs
2032 auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
2033 nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2;
2034 nPayloadsPerHeader = 1;
2035 ii += (nMessages / 2) - 1;
2036 }
2037 auto onDrop = [ref](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
2038 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
2039 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "onDrop", "Dropping message from slot %zu. Forwarding as needed. Timeslice %zu",
2040 slot.index, oldestOutputInfo.timeslice.value);
2041 ref.get<AsyncQueue>();
2042 ref.get<DecongestionService>();
2043 ref.get<DataRelayer>();
2044 // Get the current timeslice for the slot.
2045 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
2047 forwardInputs(ref, slot, dropped, oldestOutputInfo, false, true);
2048 };
2049 auto relayed = relayer.relay(parts.At(headerIndex)->GetData(),
2050 &parts.At(headerIndex),
2051 input,
2052 nMessages,
2053 nPayloadsPerHeader,
2054 onDrop);
2055 switch (relayed.type) {
2057 if (info.normalOpsNotified == true && info.backpressureNotified == false) {
2058 LOGP(alarm, "Backpressure on channel {}. Waiting.", info.channel->GetName());
2059 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
2060 monitoring.send(o2::monitoring::Metric{1, fmt::format("backpressure_{}", info.channel->GetName())});
2061 info.backpressureNotified = true;
2062 info.normalOpsNotified = false;
2063 }
2064 policy.backpressure(info);
2065 hasBackpressure = true;
2066 minBackpressureTimeslice = std::min<size_t>(minBackpressureTimeslice, relayed.timeslice.value);
2067 break;
2071 if (info.normalOpsNotified == false && info.backpressureNotified == true) {
2072 LOGP(info, "Back to normal on channel {}.", info.channel->GetName());
2073 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
2074 monitoring.send(o2::monitoring::Metric{0, fmt::format("backpressure_{}", info.channel->GetName())});
2075 info.normalOpsNotified = true;
2076 info.backpressureNotified = false;
2077 }
2078 break;
2079 }
2080 } break;
2081 case InputType::SourceInfo: {
2082 LOGP(detail, "Received SourceInfo");
2083 auto& context = ref.get<DataProcessorContext>();
2084 state.lastActiveDataProcessor = &context;
2085 auto headerIndex = input.position;
2086 auto payloadIndex = input.position + 1;
2087 assert(payloadIndex < parts.Size());
2088 // FIXME: the message with the end of stream cannot contain
2089 // split parts.
2090 parts.At(headerIndex).reset(nullptr);
2091 parts.At(payloadIndex).reset(nullptr);
2092 // for (size_t i = 0; i < dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 - 1 : 1; ++i) {
2093 // parts.At(headerIndex + 1 + i).reset(nullptr);
2094 // }
2095 // pi += dh->splitPayloadParts > 0 ? dh->splitPayloadParts - 1 : 0;
2096
2097 } break;
2098 case InputType::DomainInfo: {
2101 auto& context = ref.get<DataProcessorContext>();
2102 state.lastActiveDataProcessor = &context;
2103 auto headerIndex = input.position;
2104 auto payloadIndex = input.position + 1;
2105 assert(payloadIndex < parts.Size());
2106 // FIXME: the message with the end of stream cannot contain
2107 // split parts.
2108
2109 auto dih = o2::header::get<DomainInfoHeader*>(parts.At(headerIndex)->GetData());
2110 if (hasBackpressure && dih->oldestPossibleTimeslice >= minBackpressureTimeslice) {
2111 break;
2112 }
2113 oldestPossibleTimeslice = std::min(oldestPossibleTimeslice, dih->oldestPossibleTimeslice);
2114 LOGP(debug, "Got DomainInfoHeader, new oldestPossibleTimeslice {} on channel {}", oldestPossibleTimeslice, info.id.value);
2115 parts.At(headerIndex).reset(nullptr);
2116 parts.At(payloadIndex).reset(nullptr);
2117 } break;
2118 case InputType::Invalid: {
2119 reportError("Invalid part found.");
2120 } break;
2121 }
2122 }
2125 if (oldestPossibleTimeslice != (size_t)-1) {
2126 info.oldestForChannel = {oldestPossibleTimeslice};
2127 auto& context = ref.get<DataProcessorContext>();
2128 context.domainInfoUpdatedCallback(*context.registry, oldestPossibleTimeslice, info.id);
2129 ref.get<CallbackService>().call<CallbackService::Id::DomainInfoUpdated>((ServiceRegistryRef)*context.registry, (size_t)oldestPossibleTimeslice, (ChannelIndex)info.id);
2130 state.lastActiveDataProcessor = &context;
2131 }
2132 auto it = std::remove_if(parts.fParts.begin(), parts.fParts.end(), [](auto& msg) -> bool { return msg.get() == nullptr; });
2133 parts.fParts.erase(it, parts.end());
2134 if (parts.fParts.size()) {
2135 LOG(debug) << parts.fParts.size() << " messages backpressured";
2136 }
2137 };
2138
2139 // Second part. This is the actual outer loop we want to obtain, with
2140 // implementation details which can be read. Notice how most of the state
2141 // is actually hidden. For example we do not expose what "input" is. This
2142 // will allow us to keep the same toplevel logic even if the actual meaning
2143 // of input is changed (for example we might move away from multipart
2144 // messages). Notice also that we need to act diffently depending on the
2145 // actual CompletionOp we want to perform. In particular forwarding inputs
2146 // also gets rid of them from the cache.
2147 auto inputTypes = getInputTypes();
2148 if (bool(inputTypes) == false) {
2149 reportError("Parts should come in couples. Dropping it.");
2150 return;
2151 }
2152 handleValidMessages(*inputTypes);
2153 return;
2154}
2155
2156namespace
2157{
2158struct InputLatency {
2159 uint64_t minLatency = std::numeric_limits<uint64_t>::max();
2160 uint64_t maxLatency = std::numeric_limits<uint64_t>::min();
2161};
2162
2163auto calculateInputRecordLatency(InputRecord const& record, uint64_t currentTime) -> InputLatency
2164{
2165 InputLatency result;
2166
2167 for (auto& item : record) {
2168 auto* header = o2::header::get<DataProcessingHeader*>(item.header);
2169 if (header == nullptr) {
2170 continue;
2171 }
2172 int64_t partLatency = (0x7fffffffffffffff & currentTime) - (0x7fffffffffffffff & header->creation);
2173 if (partLatency < 0) {
2174 partLatency = 0;
2175 }
2176 result.minLatency = std::min(result.minLatency, (uint64_t)partLatency);
2177 result.maxLatency = std::max(result.maxLatency, (uint64_t)partLatency);
2178 }
2179 return result;
2180};
2181
2182auto calculateTotalInputRecordSize(InputRecord const& record) -> int
2183{
2184 size_t totalInputSize = 0;
2185 for (auto& item : record) {
2186 auto* header = o2::header::get<DataHeader*>(item.header);
2187 if (header == nullptr) {
2188 continue;
2189 }
2190 totalInputSize += header->payloadSize;
2191 }
2192 return totalInputSize;
2193};
2194
2195template <typename T>
2196void update_maximum(std::atomic<T>& maximum_value, T const& value) noexcept
2197{
2198 T prev_value = maximum_value;
2199 while (prev_value < value &&
2200 !maximum_value.compare_exchange_weak(prev_value, value)) {
2201 }
2202}
2203} // namespace
2204
2205bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::vector<DataRelayer::RecordAction>& completed)
2206{
2207 auto& context = ref.get<DataProcessorContext>();
2208 LOGP(debug, "DataProcessingDevice::tryDispatchComputation");
2209 // This is the actual hidden state for the outer loop. In case we decide we
2210 // want to support multithreaded dispatching of operations, I can simply
2211 // move these to some thread local store and the rest of the lambdas
2212 // should work just fine.
2213 std::vector<MessageSet> currentSetOfInputs;
2214
2215 //
2216 auto getInputSpan = [ref, &currentSetOfInputs](TimesliceSlot slot, bool consume = true) {
2217 auto& relayer = ref.get<DataRelayer>();
2218 if (consume) {
2219 currentSetOfInputs = relayer.consumeAllInputsForTimeslice(slot);
2220 } else {
2221 currentSetOfInputs = relayer.consumeExistingInputsForTimeslice(slot);
2222 }
2223 auto getter = [&currentSetOfInputs](size_t i, size_t partindex) -> DataRef {
2224 if (currentSetOfInputs[i].getNumberOfPairs() > partindex) {
2225 const char* headerptr = nullptr;
2226 const char* payloadptr = nullptr;
2227 size_t payloadSize = 0;
2228 // - each input can have multiple parts
2229 // - "part" denotes a sequence of messages belonging together, the first message of the
2230 // sequence is the header message
2231 // - each part has one or more payload messages
2232 // - InputRecord provides all payloads as header-payload pair
2233 auto const& headerMsg = currentSetOfInputs[i].associatedHeader(partindex);
2234 auto const& payloadMsg = currentSetOfInputs[i].associatedPayload(partindex);
2235 headerptr = static_cast<char const*>(headerMsg->GetData());
2236 payloadptr = payloadMsg ? static_cast<char const*>(payloadMsg->GetData()) : nullptr;
2237 payloadSize = payloadMsg ? payloadMsg->GetSize() : 0;
2238 return DataRef{nullptr, headerptr, payloadptr, payloadSize};
2239 }
2240 return DataRef{};
2241 };
2242 auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
2243 return currentSetOfInputs[i].getNumberOfPairs();
2244 };
2245#if __has_include(<fairmq/shmem/Message.h>)
2246 auto refCountGetter = [&currentSetOfInputs](size_t idx) -> int {
2247 auto& header = static_cast<const fair::mq::shmem::Message&>(*currentSetOfInputs[idx].header(0));
2248 return header.GetRefCount();
2249 };
2250#else
2251 std::function<int(size_t)> refCountGetter = nullptr;
2252#endif
2253 return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.size()};
2254 };
2255
2256 auto markInputsAsDone = [ref](TimesliceSlot slot) -> void {
2257 auto& relayer = ref.get<DataRelayer>();
2259 };
2260
2261 // I need a preparation step which gets the current timeslice id and
2262 // propagates it to the various contextes (i.e. the actual entities which
2263 // create messages) because the messages need to have the timeslice id into
2264 // it.
2265 auto prepareAllocatorForCurrentTimeSlice = [ref](TimesliceSlot i) -> void {
2266 auto& relayer = ref.get<DataRelayer>();
2267 auto& timingInfo = ref.get<TimingInfo>();
2268 auto timeslice = relayer.getTimesliceForSlot(i);
2269
2270 timingInfo.timeslice = timeslice.value;
2271 timingInfo.tfCounter = relayer.getFirstTFCounterForSlot(i);
2272 timingInfo.firstTForbit = relayer.getFirstTFOrbitForSlot(i);
2273 timingInfo.runNumber = relayer.getRunNumberForSlot(i);
2274 timingInfo.creation = relayer.getCreationTimeForSlot(i);
2275 };
2276 auto updateRunInformation = [ref](TimesliceSlot i) -> void {
2277 auto& dataProcessorContext = ref.get<DataProcessorContext>();
2278 auto& relayer = ref.get<DataRelayer>();
2279 auto& timingInfo = ref.get<TimingInfo>();
2280 auto timeslice = relayer.getTimesliceForSlot(i);
2281 // We report wether or not this timing info refers to a new Run.
2282 timingInfo.globalRunNumberChanged = !TimingInfo::timesliceIsTimer(timeslice.value) && dataProcessorContext.lastRunNumberProcessed != timingInfo.runNumber;
2283 // A switch to runNumber=0 should not appear and thus does not set globalRunNumberChanged, unless it is seen in the first processed timeslice
2284 timingInfo.globalRunNumberChanged &= (dataProcessorContext.lastRunNumberProcessed == -1 || timingInfo.runNumber != 0);
2285 // FIXME: for now there is only one stream, however we
2286 // should calculate this correctly once we finally get the
2287 // the StreamContext in.
2288 timingInfo.streamRunNumberChanged = timingInfo.globalRunNumberChanged;
2289 };
2290
2291 // When processing them, timers will have to be cleaned up
2292 // to avoid double counting them.
2293 // This was actually the easiest solution we could find for
2294 // O2-646.
2295 auto cleanTimers = [&currentSetOfInputs](TimesliceSlot slot, InputRecord& record) {
2296 assert(record.size() == currentSetOfInputs.size());
2297 for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) {
2298 // assuming that for timer inputs we do have exactly one PartRef object
2299 // in the MessageSet, multiple PartRef Objects are only possible for either
2300 // split payload messages of wildcard matchers, both for data inputs
2301 DataRef input = record.getByPos(ii);
2302 if (input.spec->lifetime != Lifetime::Timer) {
2303 continue;
2304 }
2305 if (input.header == nullptr) {
2306 continue;
2307 }
2308 // This will hopefully delete the message.
2309 currentSetOfInputs[ii].clear();
2310 }
2311 };
2312
2313 // Function to cleanup record. For the moment we
2314 // simply use it to keep track of input messages
2315 // which are not needed, to display them in the GUI.
2316 auto cleanupRecord = [](InputRecord& record) {
2317 if (O2_LOG_ENABLED(parts) == false) {
2318 return;
2319 }
2320 for (size_t pi = 0, pe = record.size(); pi < pe; ++pi) {
2321 DataRef input = record.getByPos(pi);
2322 if (input.header == nullptr) {
2323 continue;
2324 }
2325 auto sih = o2::header::get<SourceInfoHeader*>(input.header);
2326 if (sih) {
2327 continue;
2328 }
2329
2330 auto dh = o2::header::get<DataHeader*>(input.header);
2331 if (!dh) {
2332 continue;
2333 }
2334 // We use the address of the first header of a split payload
2335 // to identify the interval.
2336 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, dh);
2337 O2_SIGNPOST_END(parts, pid, "parts", "Cleaning up parts associated to %p", dh);
2338
2339 // No split parts, we simply skip the payload
2340 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
2341 // this is indicating a sequence of payloads following the header
2342 // FIXME: we will probably also set the DataHeader version
2343 pi += dh->splitPayloadParts - 1;
2344 } else {
2345 size_t pi = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
2346 }
2347 }
2348 };
2349
2350 ref.get<DataRelayer>().getReadyToProcess(completed);
2351 if (completed.empty() == true) {
2352 LOGP(debug, "No computations available for dispatching.");
2353 return false;
2354 }
2355
  // Publish post-processing statistics for the given action: the per-slot
  // relayer state string ('3' = input was valid/processed, '0' = empty) plus
  // size and latency counters derived from the record.
  auto postUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t tStart, uint64_t tStartMilli) {
    auto& stats = ref.get<DataProcessingStats>();
    auto& states = ref.get<DataProcessingStates>();
    // Make prior writes visible before publishing the state snapshot.
    std::atomic_thread_fence(std::memory_order_release);
    char relayerSlotState[1024];
    // State format is "<pipelineLength> <one digit per input>".
    int written = snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
    char* buffer = relayerSlotState + written;
    for (size_t ai = 0; ai != record.size(); ai++) {
      buffer[ai] = record.isValid(ai) ? '3' : '0';
    }
    buffer[record.size()] = 0;
    states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index),
                        .size = (int)(record.size() + buffer - relayerSlotState),
                        .data = relayerSlotState});
    uint64_t tEnd = uv_hrtime();
    // tEnd and tStart are in nanoseconds according to https://docs.libuv.org/en/v1.x/misc.html#c.uv_hrtime
    int64_t wallTimeMs = (tEnd - tStart) / 1000000;
    // NOTE(review): the stats.updateStats calls that consume wallTimeMs are
    // missing from this listing (dropped lines) — confirm against the full source.
    // Sum up the total wall time, in milliseconds.
    // The time interval is in seconds while tEnd - tStart is in nanoseconds, so we divide by 1000000 to get the fraction in ms/s.
    stats.updateStats({(int)ProcessingStatsId::LAST_PROCESSED_SIZE, DataProcessingStats::Op::Set, calculateTotalInputRecordSize(record)});
    stats.updateStats({(int)ProcessingStatsId::TOTAL_PROCESSED_SIZE, DataProcessingStats::Op::Add, calculateTotalInputRecordSize(record)});
    auto latency = calculateInputRecordLatency(record, tStartMilli);
    stats.updateStats({(int)ProcessingStatsId::LAST_MIN_LATENCY, DataProcessingStats::Op::Set, (int)latency.minLatency});
    stats.updateStats({(int)ProcessingStatsId::LAST_MAX_LATENCY, DataProcessingStats::Op::Set, (int)latency.maxLatency});
    // NOTE(review): count is incremented but never read in the visible lines;
    // its consumer is presumably on a dropped line (2384) — verify.
    static int count = 0;
    count++;
  };
2387
2388 auto preUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t) {
2389 auto& states = ref.get<DataProcessingStates>();
2390 std::atomic_thread_fence(std::memory_order_release);
2391 char relayerSlotState[1024];
2392 snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2393 char* buffer = strchr(relayerSlotState, ' ') + 1;
2394 for (size_t ai = 0; ai != record.size(); ai++) {
2395 buffer[ai] = record.isValid(ai) ? '2' : '0';
2396 }
2397 buffer[record.size()] = 0;
2398 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index), .size = (int)(record.size() + buffer - relayerSlotState), .data = relayerSlotState});
2399 };
2400
2401 // This is the main dispatching loop
2402 auto& state = ref.get<DeviceState>();
2403 auto& spec = ref.get<DeviceSpec const>();
2404
2405 auto& dpContext = ref.get<DataProcessorContext>();
2406 auto& streamContext = ref.get<StreamContext>();
2407 O2_SIGNPOST_ID_GENERATE(sid, device);
2408 O2_SIGNPOST_START(device, sid, "device", "Start processing ready actions");
2409
2410 auto& stats = ref.get<DataProcessingStats>();
2411 auto& relayer = ref.get<DataRelayer>();
2412 using namespace o2::framework;
2413 stats.updateStats({(int)ProcessingStatsId::PENDING_INPUTS, DataProcessingStats::Op::Set, static_cast<int64_t>(relayer.getParallelTimeslices() - completed.size())});
2414 stats.updateStats({(int)ProcessingStatsId::INCOMPLETE_INPUTS, DataProcessingStats::Op::Set, completed.empty() ? 1 : 0});
2415 switch (spec.completionPolicy.order) {
2417 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.timeslice.value < b.timeslice.value; });
2418 break;
2420 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.slot.index < b.slot.index; });
2421 break;
2423 default:
2424 break;
2425 }
2426
2427 for (auto action : completed) {
2428 O2_SIGNPOST_ID_GENERATE(aid, device);
2429 O2_SIGNPOST_START(device, aid, "device", "Processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2431 O2_SIGNPOST_END(device, aid, "device", "Waiting for more data.");
2432 continue;
2433 }
2434
2435 bool shouldConsume = action.op == CompletionPolicy::CompletionOp::Consume ||
2437 prepareAllocatorForCurrentTimeSlice(TimesliceSlot{action.slot});
2441 updateRunInformation(TimesliceSlot{action.slot});
2442 }
2443 InputSpan span = getInputSpan(action.slot, shouldConsume);
2444 auto& spec = ref.get<DeviceSpec const>();
2445 InputRecord record{spec.inputs,
2446 span,
2447 *context.registry};
2448 ProcessingContext processContext{record, ref, ref.get<DataAllocator>()};
2449 {
2450 // Notice this should be thread safe and reentrant
2451 // as it is called from many threads.
2452 streamContext.preProcessingCallbacks(processContext);
2453 dpContext.preProcessingCallbacks(processContext);
2454 }
2456 context.postDispatchingCallbacks(processContext);
2457 if (spec.forwards.empty() == false) {
2458 auto& timesliceIndex = ref.get<TimesliceIndex>();
2459 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false);
2460 O2_SIGNPOST_END(device, aid, "device", "Forwarding inputs consume: %d.", false);
2461 continue;
2462 }
2463 }
 2464 // If there are no optional inputs we canForwardEarly
 2465 // the messages so that parallel processing can happen.
2466 // In this case we pass true to indicate that we want to
2467 // copy the messages to the subsequent data processor.
2468 bool hasForwards = spec.forwards.empty() == false;
2470
2471 if (context.canForwardEarly && hasForwards && consumeSomething) {
2472 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwainding: %{public}s.", fmt::format("{}", action.op).c_str());
2473 auto& timesliceIndex = ref.get<TimesliceIndex>();
2474 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
2475 }
2476 markInputsAsDone(action.slot);
2477
2478 uint64_t tStart = uv_hrtime();
2479 uint64_t tStartMilli = TimingHelpers::getRealtimeSinceEpochStandalone();
2480 preUpdateStats(action, record, tStart);
2481
2482 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
2483
  // Run the user processing callback (stateful or stateless) for this action,
  // bracketed by the service/user pre-processing, finalise-outputs and
  // post-processing callbacks. Invoked either bare or inside a catch-all,
  // depending on the O2_NO_CATCHALL_EXCEPTIONS setting at the call site.
  auto runNoCatch = [&context, ref, &processContext](DataRelayer::RecordAction& action) mutable {
    auto& state = ref.get<DeviceState>();
    auto& spec = ref.get<DeviceSpec const>();
    auto& streamContext = ref.get<StreamContext>();
    auto& dpContext = ref.get<DataProcessorContext>();
    // Whether this action's completion op requires invoking the user callback.
    // NOTE(review): the case labels that return true (presumably the
    // Consume/Process-style ops) were dropped by the listing extraction —
    // confirm against the full source.
    auto shouldProcess = [](DataRelayer::RecordAction& action) -> bool {
      switch (action.op) {
        return true;
        break;
        default:
          return false;
      }
    };
    if (state.quitRequested == false) {
      {
        // Callbacks from services
        dpContext.preProcessingCallbacks(processContext);
        streamContext.preProcessingCallbacks(processContext);
        // NOTE(review): dpContext.preProcessingCallbacks is invoked a second
        // time here (already called two lines above) — looks like a
        // copy-paste duplication; verify whether both calls are intended.
        dpContext.preProcessingCallbacks(processContext);
        // Callbacks from users
        ref.get<CallbackService>().call<CallbackService::Id::PreProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
      }
      O2_SIGNPOST_ID_FROM_POINTER(pcid, device, &processContext);
      if (context.statefulProcess && shouldProcess(action)) {
        // This way, usercode can use the same processing context to identify
        // its signposts and we can map user code to device iterations.
        O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
        (context.statefulProcess)(processContext);
        O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
      } else if (context.statelessProcess && shouldProcess(action)) {
        // NOTE(review): the signpost label says "Stateful process" although
        // this is the *stateless* branch — likely a copy-paste slip in the
        // label string.
        O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
        (context.statelessProcess)(processContext);
        O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
      } else if (context.statelessProcess || context.statefulProcess) {
        // A callback exists but this op discards the data: skip user code.
        O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding.");
      } else {
        // No user callback at all: this device only routes/forwards data.
        O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle");
        // NOTE(review): the statement performing the actual switch to Idle is
        // missing from this listing (dropped line) — confirm in the full source.
      }
      // Remember the run number just processed, so the run-change detection
      // in updateRunInformation works on subsequent timeslices.
      if (shouldProcess(action)) {
        auto& timingInfo = ref.get<TimingInfo>();
        if (timingInfo.globalRunNumberChanged) {
          context.lastRunNumberProcessed = timingInfo.runNumber;
        }
      }

      // Notify the sink we just consumed some timeframe data
      if (context.isSink && action.op == CompletionPolicy::CompletionOp::Consume) {
        O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Sending dpl-summary");
        auto& allocator = ref.get<DataAllocator>();
        allocator.make<int>(OutputRef{"dpl-summary", runtime_hash(spec.name.c_str())}, 1);
      }

      // Extra callback which allows a service to add extra outputs.
      // This is needed e.g. to ensure that injected CCDB outputs are added
      // before an end of stream.
      {
        ref.get<CallbackService>().call<CallbackService::Id::FinaliseOutputs>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
        dpContext.finaliseOutputsCallbacks(processContext);
        streamContext.finaliseOutputsCallbacks(processContext);
      }

      // Post-processing: user callbacks first, then context/stream hooks.
      {
        ref.get<CallbackService>().call<CallbackService::Id::PostProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
        dpContext.postProcessingCallbacks(processContext);
        streamContext.postProcessingCallbacks(processContext);
      }
    }
  };
2557
2558 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_USERCODE) != 0) {
2559 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
2560 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
2561 }
2562 if (noCatch) {
2563 try {
2564 runNoCatch(action);
2565 } catch (o2::framework::RuntimeErrorRef e) {
2566 (context.errorHandling)(e, record);
2567 }
2568 } else {
2569 try {
2570 runNoCatch(action);
2571 } catch (std::exception& ex) {
2575 auto e = runtime_error(ex.what());
2576 (context.errorHandling)(e, record);
2577 } catch (o2::framework::RuntimeErrorRef e) {
2578 (context.errorHandling)(e, record);
2579 }
2580 }
2581 if (state.severityStack.empty() == false) {
2582 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
2583 state.severityStack.pop_back();
2584 }
2585
2586 postUpdateStats(action, record, tStart, tStartMilli);
2587 // We forward inputs only when we consume them. If we simply Process them,
2588 // we keep them for next message arriving.
2590 cleanupRecord(record);
2591 context.postDispatchingCallbacks(processContext);
2592 ref.get<CallbackService>().call<CallbackService::Id::DataConsumed>(o2::framework::ServiceRegistryRef{ref});
2593 }
2594 if ((context.canForwardEarly == false) && hasForwards && consumeSomething) {
2595 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Late forwarding");
2596 auto& timesliceIndex = ref.get<TimesliceIndex>();
2597 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false, action.op == CompletionPolicy::CompletionOp::Consume);
2598 }
2599 context.postForwardingCallbacks(processContext);
2601 cleanTimers(action.slot, record);
2602 }
2603 O2_SIGNPOST_END(device, aid, "device", "Done processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2604 }
2605 O2_SIGNPOST_END(device, sid, "device", "Start processing ready actions");
2606
2607 // We now broadcast the end of stream if it was requested
2608 if (state.streaming == StreamingState::EndOfStreaming) {
2609 LOGP(detail, "Broadcasting end of stream");
2610 for (auto& channel : spec.outputChannels) {
2612 }
2614 }
2615
2616 return true;
2617}
2618
2620{
2621 LOG(error) << msg;
2622 ServiceRegistryRef ref{mServiceRegistry};
2623 auto& stats = ref.get<DataProcessingStats>();
2625}
2626
2627std::unique_ptr<ConfigParamStore> DeviceConfigurationHelpers::getConfiguration(ServiceRegistryRef registry, const char* name, std::vector<ConfigParamSpec> const& options)
2628{
2629
2630 if (registry.active<ConfigurationInterface>()) {
2631 auto& cfg = registry.get<ConfigurationInterface>();
2632 try {
2633 cfg.getRecursive(name);
2634 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
2635 retrievers.emplace_back(std::make_unique<ConfigurationOptionsRetriever>(&cfg, name));
2636 auto configStore = std::make_unique<ConfigParamStore>(options, std::move(retrievers));
2637 configStore->preload();
2638 configStore->activate();
2639 return configStore;
2640 } catch (...) {
2641 // No overrides...
2642 }
2643 }
2644 return {nullptr};
2645}
2646
2647} // namespace o2::framework
benchmark::State & state
struct uv_timer_s uv_timer_t
struct uv_signal_s uv_signal_t
struct uv_async_s uv_async_t
struct uv_poll_s uv_poll_t
struct uv_loop_s uv_loop_t
o2::monitoring::Metric Metric
uint64_t maxLatency
o2::configuration::ConfigurationInterface ConfigurationInterface
constexpr int DEFAULT_MAX_CHANNEL_AHEAD
uint64_t minLatency
int32_t i
std::enable_if_t< std::is_signed< T >::value, bool > hasData(const CalArray< T > &cal)
Definition Painter.cxx:515
uint16_t pid
Definition RawData.h:2
uint32_t res
Definition RawData.h:0
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
Definition Signpost.h:547
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:483
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:499
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:525
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:602
#define O2_LOG_ENABLED(log)
Definition Signpost.h:110
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:500
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:516
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:596
constexpr uint32_t runtime_hash(char const *str)
std::ostringstream debug
o2::monitoring::Monitoring Monitoring
StringRef key
@ DeviceStateChanged
Invoked the device undergoes a state change.
decltype(auto) make(const Output &spec, Args... args)
static void doRun(ServiceRegistryRef)
void fillContext(DataProcessorContext &context, DeviceContext &deviceContext)
static void doPrepare(ServiceRegistryRef)
static bool tryDispatchComputation(ServiceRegistryRef ref, std::vector< DataRelayer::RecordAction > &completed)
static void handleData(ServiceRegistryRef, InputChannelInfo &)
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry &, ProcessingPolicies &policies)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associate to a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associate to a given slot.
A service API to communicate with the driver.
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
size_t size() const
Number of elements in the InputSpan.
Definition InputSpan.h:82
bool active() const
Check if service of type T is currently active.
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLuint entry
Definition glcorearb.h:5735
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLuint GLuint stream
Definition glcorearb.h:1806
GLbitfield GLuint64 timeout
Definition glcorearb.h:1573
GLint ref
Definition glcorearb.h:291
GLuint * states
Definition glcorearb.h:4932
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
ServiceKind
The kind of service we are asking for.
void on_idle_timer(uv_timer_t *handle)
@ DPL
The channel is a normal input channel.
void run_completion(uv_work_t *handle, int status)
bool hasOnlyGenerated(DeviceSpec const &spec)
void on_socket_polled(uv_poll_t *poller, int status, int events)
void on_transition_requested_expired(uv_timer_t *handle)
void run_callback(uv_work_t *handle)
volatile int region_read_global_dummy_variable
void handleRegionCallbacks(ServiceRegistryRef registry, std::vector< fair::mq::RegionInfo > &infos)
Invoke the callbacks for the mPendingRegionInfos.
void on_out_of_band_polled(uv_poll_t *poller, int status, int events)
DeviceSpec const & getRunningDevice(RunningDeviceRef const &running, ServiceRegistryRef const &services)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Streaming
Data is being processed.
@ Idle
End of streaming notified.
void on_communication_requested(uv_async_t *s)
@ Expired
A transition needs to be fullfilled ASAP.
@ NoTransition
No pending transitions.
@ Requested
A transition was notified to be requested.
RuntimeError & error_from_ref(RuntimeErrorRef)
auto switchState(ServiceRegistryRef &ref, StreamingState newState) -> void
void on_awake_main_thread(uv_async_t *handle)
@ Completed
The channel was signaled it will not receive any data.
@ Running
The channel is actively receiving data.
void on_signal_callback(uv_signal_t *handle, int signum)
@ Me
Only quit this data processor.
void on_data_processing_expired(uv_timer_t *handle)
bool hasOnlyTimers(DeviceSpec const &spec)
constexpr const char * channelName(int channel)
Definition Constants.h:318
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
Defining DataPointCompositeObject explicitly as copiable.
static void run(AsyncQueue &queue, TimesliceId oldestPossibleTimeslice)
static void post(AsyncQueue &queue, AsyncTask const &task)
An actuatual task to be executed.
Definition AsyncQueue.h:32
static void demangled_backtrace_symbols(void **backtrace, unsigned int total, int fd)
static constexpr int INVALID
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
int64_t sharedMemory
How much shared memory it can allocate.
Statistics on the offers consumed, expired.
static void sendEndOfStream(ServiceRegistryRef const &ref, OutputChannelSpec const &channel)
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
Helper struct to hold statistics about the data processing happening.
@ CumulativeRate
Set the value to the specified value if it is positive.
@ Add
Update the rate of the metric given the amount since the last time.
std::function< void(o2::framework::RuntimeErrorRef e, InputRecord &record)> errorHandling
AlgorithmSpec::InitErrorCallback initError
void preLoopCallbacks(ServiceRegistryRef)
Invoke callbacks before we enter the event loop.
void postStopCallbacks(ServiceRegistryRef)
Invoke callbacks on stop.
void preProcessingCallbacks(ProcessingContext &)
Invoke callbacks to be executed before every process method invokation.
bool canForwardEarly
Wether or not the associated DataProcessor can forward things early.
void preStartCallbacks(ServiceRegistryRef)
Invoke callbacks to be executed in PreRun(), before the User Start callbacks.
AlgorithmSpec::ProcessCallback statefulProcess
const char * header
Definition DataRef.h:27
const InputSpec * spec
Definition DataRef.h:26
static std::vector< size_t > createDistinctRouteIndex(std::vector< InputRoute > const &)
CompletionPolicy::CompletionOp op
Definition DataRelayer.h:81
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static header::DataOrigin asConcreteOrigin(InputSpec const &spec)
TimesliceIndex::OldestOutputInfo oldestTimeslice
static unsigned int pipelineLength()
get max number of timeslices in the queue
static bool onlineDeploymentMode()
@true if running online
static std::unique_ptr< ConfigParamStore > getConfiguration(ServiceRegistryRef registry, const char *name, std::vector< ConfigParamSpec > const &options)
std::vector< InputRoute > inputs
Definition DeviceSpec.h:62
std::vector< InputChannelSpec > inputChannels
Definition DeviceSpec.h:54
Running state information of a given device.
Definition DeviceState.h:34
std::atomic< int64_t > cleanupCount
Definition DeviceState.h:82
Forward channel information.
Definition ChannelInfo.h:88
ChannelAccountingType channelType
Wether or not it's a DPL internal channel.
Definition ChannelInfo.h:92
std::string name
The name of the channel.
Definition ChannelInfo.h:90
ForwardingPolicy const * policy
Definition ChannelInfo.h:94
fair::mq::Channel * channel
Definition ChannelInfo.h:51
TimesliceId oldestForChannel
Oldest possible timeslice for the given channel.
Definition ChannelInfo.h:65
enum Lifetime lifetime
Definition InputSpec.h:73
enum EarlyForwardPolicy earlyForward
Information about the running workflow.
static constexpr ServiceKind kind
static Salt streamSalt(short streamId, short dataProcessorId)
void lateBindStreamServices(DeviceState &state, fair::mq::ProgOptions &options, ServiceRegistry::Salt salt)
static Salt globalStreamSalt(short streamId)
void * get(ServiceTypeHash typeHash, Salt salt, ServiceKind kind, char const *name=nullptr) const
void finaliseOutputsCallbacks(ProcessingContext &)
Invoke callbacks to be executed after every process method invokation.
void preProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed before every process method invokation.
void preEOSCallbacks(EndOfStreamContext &eosContext)
Invoke callbacks to be executed before every EOS user callback invokation.
void postProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed after every process method invokation.
static int64_t getRealtimeSinceEpochStandalone()
bool keepAtEndOfStream
Wether this kind of data should be flushed during end of stream.
Definition TimingInfo.h:44
static bool timesliceIsTimer(size_t timeslice)
Definition TimingInfo.h:46
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
void backpressure(InputChannelInfo const &)
locked_execution(ServiceRegistryRef &ref_)
the main header struct
Definition DataHeader.h:618
constexpr size_t min
constexpr size_t max
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
vec clear()
const std::string str
uint64_t const void const *restrict const msg
Definition x9.h:153