Project
Loading...
Searching...
No Matches
DataProcessingDevice.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
31#include "Framework/InputSpan.h"
32#if defined(__APPLE__) || defined(NDEBUG)
33#define O2_SIGNPOST_IMPLEMENTATION
34#endif
35#include "Framework/Signpost.h"
48
49#include "DecongestionService.h"
51#include "DataRelayerHelpers.h"
52#include "Headers/DataHeader.h"
54
55#include <Framework/Tracing.h>
56
57#include <fairmq/Parts.h>
58#include <fairmq/Socket.h>
59#include <fairmq/ProgOptions.h>
60#if __has_include(<fairmq/shmem/Message.h>)
61#include <fairmq/shmem/Message.h>
62#endif
63#include <Configuration/ConfigurationInterface.h>
64#include <Configuration/ConfigurationFactory.h>
65#include <Monitoring/Monitoring.h>
66#include <TMessage.h>
67#include <TClonesArray.h>
68
69#include <fmt/ostream.h>
70#include <algorithm>
71#include <vector>
72#include <numeric>
73#include <memory>
74#include <uv.h>
75#include <execinfo.h>
76#include <sstream>
77#include <boost/property_tree/json_parser.hpp>
78
79// Formatter to avoid having to rewrite the ostream operator for the enum
80namespace fmt
81{
82template <>
85} // namespace fmt
86
87// A log to use for general device logging
89// Special log to keep track of the lifetime of the parts
91// Stream which keeps track of the calibration lifetime logic
93// Special log to track the async queue behavior
95// Special log to track the forwarding requests
97
98using namespace o2::framework;
99using ConfigurationInterface = o2::configuration::ConfigurationInterface;
101
102constexpr int DEFAULT_MAX_CHANNEL_AHEAD = 128;
103
104namespace o2::framework
105{
106
107template <>
111
115{
116 auto* state = (DeviceState*)handle->data;
117 state->loopReason |= DeviceState::TIMER_EXPIRED;
118}
119
120bool hasOnlyTimers(DeviceSpec const& spec)
121{
122 return std::all_of(spec.inputs.cbegin(), spec.inputs.cend(), [](InputRoute const& route) -> bool { return route.matcher.lifetime == Lifetime::Timer; });
123}
124
126{
127 return (spec.inputChannels.size() == 1) && (spec.inputs[0].matcher.lifetime == Lifetime::Timer || spec.inputs[0].matcher.lifetime == Lifetime::Enumeration);
128}
129
131{
132 auto* ref = (ServiceRegistryRef*)handle->data;
133 auto& state = ref->get<DeviceState>();
134 state.loopReason |= DeviceState::TIMER_EXPIRED;
135 // Check if this is a source device
136 O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
137 auto& spec = ref->get<DeviceSpec const>();
138 if (hasOnlyGenerated(spec)) {
139 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for source expired. Exiting.");
140 } else {
141 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for %{public}s expired. Exiting.",
142 state.allowedProcessing == DeviceState::CalibrationOnly ? "calibration" : "data & calibration");
143 }
144 state.transitionHandling = TransitionHandlingState::Expired;
145}
146
148{
149 auto& state = ref.get<DeviceState>();
150 auto& context = ref.get<DataProcessorContext>();
151 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
152 O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
153 O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
154 state.streaming = newState;
155 ref.get<ControlService>().notifyStreamingState(state.streaming);
156};
157
159{
160 auto* ref = (ServiceRegistryRef*)handle->data;
161 auto& state = ref->get<DeviceState>();
162 auto& spec = ref->get<DeviceSpec const>();
163 state.loopReason |= DeviceState::TIMER_EXPIRED;
164
165 // Check if this is a source device
166 O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
167
168 if (hasOnlyGenerated(spec)) {
169 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Switching to EndOfStreaming.");
171 } else {
172 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
173 state.allowedProcessing = DeviceState::CalibrationOnly;
174 }
175}
176
178{
179 auto* state = (DeviceState*)s->data;
181}
182
183DeviceSpec const& getRunningDevice(RunningDeviceRef const& running, ServiceRegistryRef const& services)
184{
185 auto& devices = services.get<o2::framework::RunningWorkflowInfo const>().devices;
186 return devices[running.index];
187}
188
194
196 : mRunningDevice{running},
197 mConfigRegistry{nullptr},
198 mServiceRegistry{registry},
199 mProcessingPolicies{policies}
200{
201 GetConfig()->Subscribe<std::string>("dpl", [&registry = mServiceRegistry](const std::string& key, std::string value) {
202 if (key == "cleanup") {
204 auto& deviceState = ref.get<DeviceState>();
205 int64_t cleanupCount = deviceState.cleanupCount.load();
206 int64_t newCleanupCount = std::stoll(value);
207 if (newCleanupCount <= cleanupCount) {
208 return;
209 }
210 deviceState.cleanupCount.store(newCleanupCount);
211 for (auto& info : deviceState.inputChannelInfos) {
212 fair::mq::Parts parts;
213 while (info.channel->Receive(parts, 0)) {
214 LOGP(debug, "Dropping {} parts", parts.Size());
215 if (parts.Size() == 0) {
216 break;
217 }
218 }
219 }
220 }
221 });
222
223 std::function<void(const fair::mq::State)> stateWatcher = [this, &registry = mServiceRegistry](const fair::mq::State state) -> void {
225 auto& deviceState = ref.get<DeviceState>();
226 auto& control = ref.get<ControlService>();
227 auto& callbacks = ref.get<CallbackService>();
228 control.notifyDeviceState(fair::mq::GetStateName(state));
230
231 if (deviceState.nextFairMQState.empty() == false) {
232 auto state = deviceState.nextFairMQState.back();
233 (void)this->ChangeState(state);
234 deviceState.nextFairMQState.pop_back();
235 }
236 };
237
238 // 99 is to execute DPL callbacks last
239 this->SubscribeToStateChange("99-dpl", stateWatcher);
240
241 // One task for now.
242 mStreams.resize(1);
243 mHandles.resize(1);
244
245 ServiceRegistryRef ref{mServiceRegistry};
246 mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t));
247 auto& state = ref.get<DeviceState>();
248 assert(state.loop);
249 int res = uv_async_init(state.loop, mAwakeHandle, on_communication_requested);
250 mAwakeHandle->data = &state;
251 if (res < 0) {
252 LOG(error) << "Unable to initialise subscription";
253 }
254
256 SubscribeToNewTransition("dpl", [wakeHandle = mAwakeHandle](fair::mq::Transition t) {
257 int res = uv_async_send(wakeHandle);
258 if (res < 0) {
259 LOG(error) << "Unable to notify subscription";
260 }
261 LOG(debug) << "State transition requested";
262 });
263}
264
// Callback to execute the processing. Notice how the data is
// a vector of DataProcessorContext so that we can index the correct
// one with the thread id. For the moment we simply use the first one.
// Executed by libuv as a work callback; handle->data carries the
// TaskStreamInfo describing which stream this invocation belongs to.
void run_callback(uv_work_t* handle)
{
  auto* task = (TaskStreamInfo*)handle->data;
  // Salt the registry access with the stream index + 1 — presumably because
  // salt 0 denotes the device-global context — TODO confirm.
  auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
  // We create a new signpost interval for this specific data processor. Same id, same data processor.
  auto& dataProcessorContext = ref.get<DataProcessorContext>();
  O2_SIGNPOST_ID_FROM_POINTER(sid, device, &dataProcessorContext);
  O2_SIGNPOST_START(device, sid, "run_callback", "Starting run callback on stream %d", task->id.index);
  O2_SIGNPOST_END(device, sid, "run_callback", "Done processing data for stream %d", task->id.index);
}
280
281// Once the processing in a thread is done, this is executed on the main thread.
// `status` (the libuv work result) is not inspected here.
void run_completion(uv_work_t* handle, int status)
{
  auto* task = (TaskStreamInfo*)handle->data;
  // Notice that the completion, while running on the main thread, still
  // has a salt which is associated to the actual stream which was doing the computation
  auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
  auto& state = ref.get<DeviceState>();
  auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();

  using o2::monitoring::Metric;
  using o2::monitoring::Monitoring;
  using o2::monitoring::tags::Key;
  using o2::monitoring::tags::Value;

  // NOTE(review): these reporters are function-local statics, so they capture
  // the `ref` of the *first* invocation only — confirm every invocation shares
  // the same underlying registry.
  // Accumulate consumed shared memory into the stats service.
  static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats&)> reportConsumedOffer = [ref](ComputingQuotaOffer const& accumulatedConsumed, ComputingQuotaStats& stats) {
    auto& dpStats = ref.get<DataProcessingStats>();
    stats.totalConsumedBytes += accumulatedConsumed.sharedMemory;

    dpStats.updateStats({static_cast<short>(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedBytes});
    dpStats.processCommandQueue();
    assert(stats.totalConsumedBytes == dpStats.metrics[(short)ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED]);
  };

  // Publish the expired-offer counters to the stats service.
  static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const&)> reportExpiredOffer = [ref](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
    auto& dpStats = ref.get<DataProcessingStats>();
    dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
    dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
    dpStats.processCommandQueue();
  };

  // Consume the offers used by this stream, then retire whatever has expired
  // and release the stream's quota bookkeeping.
  for (auto& consumer : state.offerConsumers) {
    quotaEvaluator.consume(task->id.index, consumer, reportConsumedOffer);
  }
  state.offerConsumers.clear();
  quotaEvaluator.handleExpired(reportExpiredOffer);
  quotaEvaluator.dispose(task->id.index);
  task->running = false;
}
320
321// Context for polling
323 enum struct PollerState : char { Stopped,
325 Connected,
326 Suspended };
327 char const* name = nullptr;
328 uv_loop_t* loop = nullptr;
330 DeviceState* state = nullptr;
331 fair::mq::Socket* socket = nullptr;
333 int fd = -1;
334 bool read = true;
336};
337
// libuv poll callback for the data socket of an input channel.
// poller->data is the PollerContext describing the channel being watched.
// NOTE(review): `status` is never checked here, unlike in
// on_out_of_band_polled — confirm an error status cannot reach this callback.
// NOTE(review): libuv reports `events` as a bitmask; this switch only matches
// single-flag values, so a combined notification would match no case — confirm
// that is acceptable for these sockets.
void on_socket_polled(uv_poll_t* poller, int status, int events)
{
  auto* context = (PollerContext*)poller->data;
  assert(context);
  O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
  // Record why the event loop woke up, for the benefit of the main run loop.
  context->state->loopReason |= DeviceState::DATA_SOCKET_POLLED;
  switch (events) {
    case UV_READABLE: {
      O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
      context->state->loopReason |= DeviceState::DATA_INCOMING;
    } break;
    case UV_WRITABLE: {
      O2_SIGNPOST_END(device, sid, "socket_state", "Socket connected for channel %{public}s", context->name);
      if (context->read) {
        O2_SIGNPOST_START(device, sid, "socket_state", "Socket connected for read in context %{public}s", context->name);
        // Once connected, a reading socket only cares about incoming data,
        // disconnects and prioritized events.
        uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
        context->state->loopReason |= DeviceState::DATA_CONNECTED;
      } else {
        O2_SIGNPOST_START(device, sid, "socket_state", "Socket connected for write for channel %{public}s", context->name);
        context->state->loopReason |= DeviceState::DATA_OUTGOING;
        // If the socket is writable, fairmq will handle the rest, so we can stop polling and
        // just wait for the disconnect.
        uv_poll_start(poller, UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
      }
      context->pollerState = PollerContext::PollerState::Connected;
    } break;
    case UV_DISCONNECT: {
      O2_SIGNPOST_END(device, sid, "socket_state", "Socket disconnected in context %{public}s", context->name);
    } break;
    case UV_PRIORITIZED: {
      O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Socket prioritized for context %{public}s", context->name);
    } break;
  }
  // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
}
373
// libuv poll callback for out-of-band (non-data) channels.
// poller->data is the PollerContext describing the channel being watched.
// NOTE(review): `events` is a libuv bitmask but is matched with a switch over
// single-flag values — combined notifications would match no case; confirm
// that is acceptable here.
void on_out_of_band_polled(uv_poll_t* poller, int status, int events)
{
  O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
  auto* context = (PollerContext*)poller->data;
  context->state->loopReason |= DeviceState::OOB_ACTIVITY;
  if (status < 0) {
    // LOGP(fatal, ...) presumably terminates, making the uv_poll_start below
    // unreachable — TODO confirm the fatal severity semantics.
    LOGP(fatal, "Error while polling {}: {}", context->name, status);
    uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
  }
  switch (events) {
    case UV_READABLE: {
      O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
      context->state->loopReason |= DeviceState::DATA_INCOMING;
      assert(context->channelInfo);
      // Mark that data was seen by the poller so the run loop can pick it up.
      context->channelInfo->readPolled = true;
    } break;
    case UV_WRITABLE: {
      O2_SIGNPOST_END(device, sid, "socket_state", "OOB socket connected for channel %{public}s", context->name);
      if (context->read) {
        O2_SIGNPOST_START(device, sid, "socket_state", "OOB socket connected for read in context %{public}s", context->name);
        // After connection, a reading OOB socket watches for data, disconnects
        // and prioritized events.
        uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_out_of_band_polled);
      } else {
        O2_SIGNPOST_START(device, sid, "socket_state", "OOB socket connected for write for channel %{public}s", context->name);
        context->state->loopReason |= DeviceState::DATA_OUTGOING;
      }
    } break;
    case UV_DISCONNECT: {
      O2_SIGNPOST_END(device, sid, "socket_state", "OOB socket disconnected in context %{public}s", context->name);
      // On disconnect, go back to waiting for writability (i.e. reconnection).
      uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
    } break;
    case UV_PRIORITIZED: {
      O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "OOB socket prioritized for context %{public}s", context->name);
    } break;
  }
  // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
}
410
419{
420 auto ref = ServiceRegistryRef{mServiceRegistry};
421 auto& context = ref.get<DataProcessorContext>();
422 auto& spec = getRunningDevice(mRunningDevice, ref);
423
424 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
425 O2_SIGNPOST_START(device, cid, "Init", "Entering Init callback.");
426 context.statelessProcess = spec.algorithm.onProcess;
427 context.statefulProcess = nullptr;
428 context.error = spec.algorithm.onError;
429 context.initError = spec.algorithm.onInitError;
430
431 auto configStore = DeviceConfigurationHelpers::getConfiguration(mServiceRegistry, spec.name.c_str(), spec.options);
432 if (configStore == nullptr) {
433 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
434 retrievers.emplace_back(std::make_unique<FairOptionsRetriever>(GetConfig()));
435 configStore = std::make_unique<ConfigParamStore>(spec.options, std::move(retrievers));
436 configStore->preload();
437 configStore->activate();
438 }
439
440 using boost::property_tree::ptree;
441
443 for (auto& entry : configStore->store()) {
444 std::stringstream ss;
445 std::string str;
446 if (entry.second.empty() == false) {
447 boost::property_tree::json_parser::write_json(ss, entry.second, false);
448 str = ss.str();
449 str.pop_back(); // remove EoL
450 } else {
451 str = entry.second.get_value<std::string>();
452 }
453 std::string configString = fmt::format("[CONFIG] {}={} 1 {}", entry.first, str, configStore->provenance(entry.first.c_str())).c_str();
454 mServiceRegistry.get<DriverClient>(ServiceRegistry::globalDeviceSalt()).tell(configString.c_str());
455 }
456
457 mConfigRegistry = std::make_unique<ConfigParamRegistry>(std::move(configStore));
458
459 // Setup the error handlers for init
460 if (context.initError) {
461 context.initErrorHandling = [&errorCallback = context.initError,
462 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
466 auto& context = ref.get<DataProcessorContext>();
467 auto& err = error_from_ref(e);
468 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
469 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Invoking errorCallback.", err.what);
470 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
471 auto& stats = ref.get<DataProcessingStats>();
473 InitErrorContext errorContext{ref, e};
474 errorCallback(errorContext);
475 };
476 } else {
477 context.initErrorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
478 auto& err = error_from_ref(e);
482 auto& context = ref.get<DataProcessorContext>();
483 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
484 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Exiting with 1.", err.what);
485 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
486 auto& stats = ref.get<DataProcessingStats>();
488 exit(1);
489 };
490 }
491
492 context.expirationHandlers.clear();
493 context.init = spec.algorithm.onInit;
494 if (context.init) {
495 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
496 InitContext initContext{*mConfigRegistry, mServiceRegistry};
497
498 if (noCatch) {
499 try {
500 context.statefulProcess = context.init(initContext);
502 if (context.initErrorHandling) {
503 (context.initErrorHandling)(e);
504 }
505 }
506 } else {
507 try {
508 context.statefulProcess = context.init(initContext);
509 } catch (std::exception& ex) {
513 auto e = runtime_error(ex.what());
514 (context.initErrorHandling)(e);
516 (context.initErrorHandling)(e);
517 }
518 }
519 }
520 auto& state = ref.get<DeviceState>();
521 state.inputChannelInfos.resize(spec.inputChannels.size());
525 int validChannelId = 0;
526 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
527 auto& name = spec.inputChannels[ci].name;
528 if (name.find(spec.channelPrefix + "from_internal-dpl-clock") == 0) {
529 state.inputChannelInfos[ci].state = InputChannelState::Pull;
530 state.inputChannelInfos[ci].id = {ChannelIndex::INVALID};
531 validChannelId++;
532 } else {
533 state.inputChannelInfos[ci].id = {validChannelId++};
534 }
535 }
536
537 // Invoke the callback policy for this device.
538 if (spec.callbacksPolicy.policy != nullptr) {
539 InitContext initContext{*mConfigRegistry, mServiceRegistry};
540 spec.callbacksPolicy.policy(mServiceRegistry.get<CallbackService>(ServiceRegistry::globalDeviceSalt()), initContext);
541 }
542
543 // Services which are stream should be initialised now
544 auto* options = GetConfig();
545 for (size_t si = 0; si < mStreams.size(); ++si) {
547 mServiceRegistry.lateBindStreamServices(state, *options, streamSalt);
548 }
549 O2_SIGNPOST_END(device, cid, "Init", "Exiting Init callback.");
550}
551
// Signal handler (registered via libuv) which, on receipt of a signal,
// posts a fresh shared-memory offer to the ComputingQuotaEvaluator —
// unless shared memory is already on offer.
void on_signal_callback(uv_signal_t* handle, int signum)
{
  O2_SIGNPOST_ID_FROM_POINTER(sid, device, handle);
  O2_SIGNPOST_START(device, sid, "signal_state", "Signal %d received.", signum);

  // handle->data is the ServiceRegistry, if the device is fully set up.
  auto* registry = (ServiceRegistry*)handle->data;
  if (!registry) {
    O2_SIGNPOST_END(device, sid, "signal_state", "No registry active. Ignoring signal.");
    return;
  }
  ServiceRegistryRef ref{*registry};
  auto& state = ref.get<DeviceState>();
  auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
  auto& stats = ref.get<DataProcessingStats>();
  // Bail out early if any valid offer already provides shared memory.
  size_t ri = 0;
  while (ri != quotaEvaluator.mOffers.size()) {
    auto& offer = quotaEvaluator.mOffers[ri];
    // We were already offered some sharedMemory, so we
    // do not consider the offer.
    // FIXME: in principle this should account for memory
    // available and being offered, however we
    // want to get out of the woods for now.
    if (offer.valid && offer.sharedMemory != 0) {
      O2_SIGNPOST_END(device, sid, "signal_state", "Memory already offered.");
      return;
    }
    ri++;
  }
  // Find the first empty offer and have 1GB of shared memory there
  for (auto& offer : quotaEvaluator.mOffers) {
    if (offer.valid == false) {
      offer.cpu = 0;
      offer.memory = 0;
      offer.sharedMemory = 1000000000;
      offer.valid = true;
      offer.user = -1; // -1: not attributed to a specific user/task yet — TODO confirm
      break;
    }
  }
  O2_SIGNPOST_END(device, sid, "signal_state", "Done processing signals.");
}
595
596static auto toBeForwardedHeader = [](void* header) -> bool {
597 // If is now possible that the record is not complete when
598 // we forward it, because of a custom completion policy.
599 // this means that we need to skip the empty entries in the
600 // record for being forwarded.
601 if (header == nullptr) {
602 return false;
603 }
604 auto sih = o2::header::get<SourceInfoHeader*>(header);
605 if (sih) {
606 return false;
607 }
608
609 auto dih = o2::header::get<DomainInfoHeader*>(header);
610 if (dih) {
611 return false;
612 }
613
614 auto dh = o2::header::get<DataHeader*>(header);
615 if (!dh) {
616 return false;
617 }
618 auto dph = o2::header::get<DataProcessingHeader*>(header);
619 if (!dph) {
620 return false;
621 }
622 return true;
623};
624
625static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
626 FairMQDeviceProxy& proxy,
627 std::unique_ptr<fair::mq::Message>& header,
628 std::unique_ptr<fair::mq::Message>& payload,
629 size_t total,
630 bool consume) {
631 if (header.get() == nullptr) {
632 // Missing an header is not an error anymore.
633 // it simply means that we did not receive the
634 // given input, but we were asked to
635 // consume existing, so we skip it.
636 return false;
637 }
638 if (payload.get() == nullptr && consume == true) {
639 // If the payload is not there, it means we already
640 // processed it with ConsumeExisiting. Therefore we
641 // need to do something only if this is the last consume.
642 header.reset(nullptr);
643 return false;
644 }
645
646 auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
647 if (fdph == nullptr) {
648 LOG(error) << "Data is missing DataProcessingHeader";
649 return false;
650 }
651 auto fdh = o2::header::get<DataHeader*>(header->GetData());
652 if (fdh == nullptr) {
653 LOG(error) << "Data is missing DataHeader";
654 return false;
655 }
656
657 // We need to find the forward route only for the first
658 // part of a split payload. All the others will use the same.
659 // but always check if we have a sequence of multiple payloads
660 if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
661 proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
662 }
663 return cachedForwardingChoices.empty() == false;
664};
665
666struct DecongestionContext {
669};
670
// Async-queue task which announces the oldest possible timeslice to all
// downstream DPL forward channels, once it is known.
auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
  auto& oldestTimeslice = task.user<DecongestionContext>().oldestTimeslice;
  auto& ref = task.user<DecongestionContext>().ref;

  auto& decongestion = ref.get<DecongestionService>();
  auto& proxy = ref.get<FairMQDeviceProxy>();
  // Debounce: skip if this (or a later) timeslice was already announced.
  if (oldestTimeslice.timeslice.value <= decongestion.lastTimeslice) {
    LOG(debug) << "Not sending already sent oldest possible timeslice " << oldestTimeslice.timeslice.value;
    return;
  }
  for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
    auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
    auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
    // NOTE(review): this generated signpost id shadows the `aid` parameter of
    // the lambda — confirm the shadowing is intended.
    O2_SIGNPOST_ID_GENERATE(aid, async_queue);
    // TODO: this we could cache in the proxy at the bind moment.
    // Only DPL-accounted channels understand the oldest-timeframe protocol.
    if (info.channelType != ChannelAccountingType::DPL) {
      O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Skipping channel %{public}s because it's not a DPL channel",
                             info.name.c_str());

      continue;
    }
    if (DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestTimeslice.timeslice.value)) {
      O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Forwarding to channel %{public}s oldest possible timeslice %zu, prio 20",
                             info.name.c_str(), oldestTimeslice.timeslice.value);
    }
  }
};
698
// This is how we do the forwarding, i.e. we push
// the inputs which are shared between this device and others
// to the next one in the daisy chain.
// FIXME: do it in a smarter way than O(N^2)
//
// @param registry           service registry of this device
// @param slot               timeslice slot whose inputs are being forwarded
// @param currentSetOfInputs message sets (header/payload pairs) for the slot
// @param oldestTimeslice    oldest output info to propagate downstream
// @param copy               forward copies (true) or move the messages out (false)
// @param consume            whether this is the final (consuming) forwarding pass
static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
                               TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
  auto& proxy = registry.get<FairMQDeviceProxy>();
  // we collect all messages per forward in a map and send them together
  std::vector<fair::mq::Parts> forwardedParts;
  forwardedParts.resize(proxy.getNumForwards());
  std::vector<ChannelIndex> cachedForwardingChoices{};
  O2_SIGNPOST_ID_GENERATE(sid, forwarding);
  O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
                    slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");

  for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
    auto& messageSet = currentSetOfInputs[ii];
    // In case the messageSet is empty, there is nothing to be done.
    if (messageSet.size() == 0) {
      continue;
    }
    // The first header of the set decides whether the whole set is forwardable.
    if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
      continue;
    }
    cachedForwardingChoices.clear();

    for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
      // NOTE(review): this declaration shadows the outer `messageSet`
      // (same object, so harmless) — consider removing one of the two.
      auto& messageSet = currentSetOfInputs[ii];
      auto& header = messageSet.header(pi);
      auto& payload = messageSet.payload(pi);
      auto total = messageSet.getNumberOfPayloads(pi);

      if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
        continue;
      }

      // In case of more than one forward route, we need to copy the message.
      // This will eventually use the same memory if running with the same backend.
      if (cachedForwardingChoices.size() > 1) {
        copy = true;
      }
      auto* dh = o2::header::get<DataHeader*>(header->GetData());
      auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());

      if (copy) {
        // Copy mode: clone header and every payload into each matching channel.
        for (auto& cachedForwardingChoice : cachedForwardingChoices) {
          auto&& newHeader = header->GetTransport()->CreateMessage();
          O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
                                 fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
          newHeader->Copy(*header);
          forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));

          for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
            auto&& newPayload = header->GetTransport()->CreateMessage();
            newPayload->Copy(*messageSet.payload(pi, payloadIndex));
            forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
          }
        }
      } else {
        // Move mode: hand the original messages over to the single route.
        O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
                               fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
        forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
        for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
          forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
        }
      }
    }
  }
  // NOTE(review): forwardedParts.size() is the number of forward channels,
  // not the number of messages — consider rewording the log message.
  O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %zu messages", forwardedParts.size());
  for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
    if (forwardedParts[fi].Size() == 0) {
      continue;
    }
    ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi});
    auto& parts = forwardedParts[fi];
    if (info.policy == nullptr) {
      O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi);
      continue;
    }
    O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
    info.policy->forward(parts, ChannelIndex{fi}, registry);
  }

  // Finally, schedule the propagation of the oldest possible timeslice on the
  // async queue; the task itself debounces already-sent timeslices.
  auto& asyncQueue = registry.get<AsyncQueue>();
  auto& decongestion = registry.get<DecongestionService>();
  O2_SIGNPOST_ID_GENERATE(aid, async_queue);
  O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
  AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate}
                                        .user<DecongestionContext>({.ref = registry, .oldestTimeslice = oldestTimeslice}));
  O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
};
790
791extern volatile int region_read_global_dummy_variable;
793
// Drain the pending shared-memory region notifications in `infos` and invoke
// the RegionInfoCallback for each of them.
void handleRegionCallbacks(ServiceRegistryRef registry, std::vector<fair::mq::RegionInfo>& infos)
{
  if (infos.empty() == false) {
    std::vector<fair::mq::RegionInfo> toBeNotified;
    toBeNotified.swap(infos); // avoid any MT issue.
    // Debug aid: when DPL_DEBUG_MAP_ALL_SHM_REGIONS is set, read one word per
    // 4096-byte step (presumably the page size — TODO confirm) of each region
    // so that its pages actually get mapped.
    static bool dummyRead = getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS") && atoi(getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS"));
    for (auto const& info : toBeNotified) {
      if (dummyRead) {
        for (size_t i = 0; i < info.size / sizeof(region_read_global_dummy_variable); i += 4096 / sizeof(region_read_global_dummy_variable)) {
          region_read_global_dummy_variable = ((int*)info.ptr)[i];
        }
      }
      registry.get<CallbackService>().call<CallbackService::Id::RegionInfoCallback>(info);
    }
  }
}
811
812namespace
813{
815{
816 auto* state = (DeviceState*)handle->data;
818}
819} // namespace
820
// Set up the libuv pollers which drive this device's event loop:
//  - a read poller per DPL input channel, when the device has a process
//    callback (stateful or stateless);
//  - write pollers on the remaining channels when there is nothing else
//    (no inputs, timers or signals) that could wake us up, so that
//    enumeration-only devices can still make progress;
//  - for "fake" devices with no process callback at all, no poller: the
//    device asks to quit and a periodic 2s timer drives the remaining work.
821void DataProcessingDevice::initPollers()
822{
823  auto ref = ServiceRegistryRef{mServiceRegistry};
824  auto& deviceContext = ref.get<DeviceContext>();
825  auto& context = ref.get<DataProcessorContext>();
826  auto& spec = ref.get<DeviceSpec const>();
827  auto& state = ref.get<DeviceState>();
828  // We add a timer only in case a channel poller is not there.
829  if ((context.statefulProcess != nullptr) || (context.statelessProcess != nullptr)) {
830    for (auto& [channelName, channel] : GetChannels()) {
      // Map this FairMQ channel to its InputChannelInfo by name and cache
      // the FairMQ channel pointer on it.
      // NOTE(review): channelInfo stays uninitialized when no input channel
      // spec matches channelName; the assert further down only guards debug
      // builds — confirm every polled channel has a matching spec.
831      InputChannelInfo* channelInfo;
832      for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
833        auto& channelSpec = spec.inputChannels[ci];
834        channelInfo = &state.inputChannelInfos[ci];
835        if (channelSpec.name != channelName) {
836          continue;
837        }
838        channelInfo->channel = &this->GetChannel(channelName, 0);
839        break;
840      }
      // Skip internal DPL channels, except the AOD / CCDB / injected ones,
      // which do carry data this device must receive.
841      if ((channelName.rfind("from_internal-dpl", 0) == 0) &&
842          (channelName.rfind("from_internal-dpl-aod", 0) != 0) &&
843          (channelName.rfind("from_internal-dpl-ccdb-backend", 0) != 0) &&
844          (channelName.rfind("from_internal-dpl-injected", 0)) != 0) {
845        LOGP(detail, "{} is an internal channel. Skipping as no input will come from there.", channelName);
846        continue;
847      }
848      // We only watch receiving sockets.
849      if (channelName.rfind("from_" + spec.name + "_", 0) == 0) {
850        LOGP(detail, "{} is to send data. Not polling.", channelName);
851        continue;
852      }
853
854      if (channelName.rfind("from_", 0) != 0) {
855        LOGP(detail, "{} is not a DPL socket. Not polling.", channelName);
856        continue;
857      }
858
859      // We assume there is always a ZeroMQ socket behind.
860      int zmq_fd = 0;
861      size_t zmq_fd_len = sizeof(zmq_fd);
862      // FIXME: I should probably save those somewhere... ;-)
863      auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
864      channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
865      if (zmq_fd == 0) {
866        LOG(error) << "Cannot get file descriptor for channel." << channelName;
867        continue;
868      }
869      LOGP(detail, "Polling socket for {}", channelName);
      // Per-poller context handed to the uv callbacks via poller->data.
      // Allocated with malloc and never freed (see FIXME above).
870      auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
871      pCtx->name = strdup(channelName.c_str());
872      pCtx->loop = state.loop;
873      pCtx->device = this;
874      pCtx->state = &state;
875      pCtx->fd = zmq_fd;
876      assert(channelInfo != nullptr);
877      pCtx->channelInfo = channelInfo;
878      pCtx->socket = &channel[0].GetSocket();
879      pCtx->read = true;
880      poller->data = pCtx;
881      uv_poll_init(state.loop, poller, zmq_fd);
      // NOTE(review): this branch looks unreachable — any channel whose name
      // does not start with "from_" already hit the `continue` above, so the
      // out-of-band list can never be filled here. Confirm against upstream.
882      if (channelName.rfind("from_", 0) != 0) {
883        LOGP(detail, "{} is an out of band channel.", channelName);
884        state.activeOutOfBandPollers.push_back(poller);
885      } else {
886        channelInfo->pollerIndex = state.activeInputPollers.size();
887        state.activeInputPollers.push_back(poller);
888      }
889    }
890    // In case we do not have any input channel and we do not have
891    // any timers or signal watchers we still wake up whenever we can send data to downstream
892    // devices to allow for enumerations.
893    if (state.activeInputPollers.empty() &&
894        state.activeOutOfBandPollers.empty() &&
895        state.activeTimers.empty() &&
896        state.activeSignals.empty()) {
897      // FIXME: this is to make sure we do not reset the output timer
898      // for readout proxies or similar. In principle this should go once
899      // we move to OutOfBand InputSpec.
900      if (state.inputChannelInfos.empty()) {
901        LOGP(detail, "No input channels. Setting exit transition timeout to 0.");
902        deviceContext.exitTransitionTimeout = 0;
903      }
      // Poll the *output* sockets for writability instead, so the loop
      // wakes up whenever downstream can accept data.
904      for (auto& [channelName, channel] : GetChannels()) {
905        if (channelName.rfind(spec.channelPrefix + "from_internal-dpl", 0) == 0) {
906          LOGP(detail, "{} is an internal channel. Not polling.", channelName);
907          continue;
908        }
909        if (channelName.rfind(spec.channelPrefix + "from_" + spec.name + "_", 0) == 0) {
910          LOGP(detail, "{} is an out of band channel. Not polling for output.", channelName);
911          continue;
912        }
913        // We assume there is always a ZeroMQ socket behind.
914        int zmq_fd = 0;
915        size_t zmq_fd_len = sizeof(zmq_fd);
916        // FIXME: I should probably save those somewhere... ;-)
917        auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
918        channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
919        if (zmq_fd == 0) {
920          LOGP(error, "Cannot get file descriptor for channel {}", channelName);
921          continue;
922        }
923        LOG(detail) << "Polling socket for " << channel[0].GetName();
924        // FIXME: leak
925        auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
926        pCtx->name = strdup(channelName.c_str());
927        pCtx->loop = state.loop;
928        pCtx->device = this;
929        pCtx->state = &state;
930        pCtx->fd = zmq_fd;
        // Write-side poller: pCtx->channelInfo is deliberately not set here.
931        pCtx->read = false;
932        poller->data = pCtx;
933        uv_poll_init(state.loop, poller, zmq_fd);
934        state.activeOutputPollers.push_back(poller);
935      }
936    }
937  } else {
938    LOGP(detail, "This is a fake device so we exit after the first iteration.");
939    deviceContext.exitTransitionTimeout = 0;
940    // This is a fake device, so we can request to exit immediately
941    ServiceRegistryRef ref{mServiceRegistry};
942    ref.get<ControlService>().readyToQuit(QuitRequest::Me);
943    // A two second timer to stop internal devices which do not want to
944    auto* timer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
945    uv_timer_init(state.loop, timer);
946    timer->data = &state;
947    uv_update_time(state.loop);
948    uv_timer_start(timer, on_idle_timer, 2000, 2000);
949    state.activeTimers.push_back(timer);
950  }
951}
952
953void DataProcessingDevice::startPollers()
954{
955 auto ref = ServiceRegistryRef{mServiceRegistry};
956 auto& deviceContext = ref.get<DeviceContext>();
957 auto& state = ref.get<DeviceState>();
958
959 for (auto* poller : state.activeInputPollers) {
960 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
961 O2_SIGNPOST_START(device, sid, "socket_state", "Input socket waiting for connection.");
962 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
963 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
964 }
965 for (auto& poller : state.activeOutOfBandPollers) {
966 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
967 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
968 }
969 for (auto* poller : state.activeOutputPollers) {
970 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
971 O2_SIGNPOST_START(device, sid, "socket_state", "Output socket waiting for connection.");
972 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
973 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
974 }
975
976 deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
977 deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
978 uv_timer_init(state.loop, deviceContext.gracePeriodTimer);
979
980 deviceContext.dataProcessingGracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
981 deviceContext.dataProcessingGracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
982 uv_timer_init(state.loop, deviceContext.dataProcessingGracePeriodTimer);
983}
984
985void DataProcessingDevice::stopPollers()
986{
987 auto ref = ServiceRegistryRef{mServiceRegistry};
988 auto& deviceContext = ref.get<DeviceContext>();
989 auto& state = ref.get<DeviceState>();
990 LOGP(detail, "Stopping {} input pollers", state.activeInputPollers.size());
991 for (auto* poller : state.activeInputPollers) {
992 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
993 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
994 uv_poll_stop(poller);
995 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
996 }
997 LOGP(detail, "Stopping {} out of band pollers", state.activeOutOfBandPollers.size());
998 for (auto* poller : state.activeOutOfBandPollers) {
999 uv_poll_stop(poller);
1000 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
1001 }
1002 LOGP(detail, "Stopping {} output pollers", state.activeOutOfBandPollers.size());
1003 for (auto* poller : state.activeOutputPollers) {
1004 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
1005 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
1006 uv_poll_stop(poller);
1007 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
1008 }
1009
1010 uv_timer_stop(deviceContext.gracePeriodTimer);
1011 delete (ServiceRegistryRef*)deviceContext.gracePeriodTimer->data;
1012 free(deviceContext.gracePeriodTimer);
1013 deviceContext.gracePeriodTimer = nullptr;
1014
1015 uv_timer_stop(deviceContext.dataProcessingGracePeriodTimer);
1016 delete (ServiceRegistryRef*)deviceContext.dataProcessingGracePeriodTimer->data;
1017 free(deviceContext.dataProcessingGracePeriodTimer);
1018 deviceContext.dataProcessingGracePeriodTimer = nullptr;
1019}
1020
// Body of the InitTask transition (signature line is not present in this
// listing; the signposts below name it "InitTask"). Sets up expiration
// handlers for configurable input routes, subscribes to shared-memory
// region events, installs the SIGUSR1 watcher, creates the pollers and
// finally blocks until all expected region registration callbacks arrived.
// NOTE(review): this listing is missing a few original lines (e.g. 1098,
// 1126-1129); the upstream file contains additional statements there.
1022{
1023  auto ref = ServiceRegistryRef{mServiceRegistry};
1024  auto& deviceContext = ref.get<DeviceContext>();
1025  auto& context = ref.get<DataProcessorContext>();
1026
1027  O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1028  O2_SIGNPOST_START(device, cid, "InitTask", "Entering InitTask callback.");
1029  auto& spec = getRunningDevice(mRunningDevice, mServiceRegistry);
  // One ExpirationHandler per distinct input route which carries a
  // configurator; `i` tracks the route index including skipped routes.
1030  auto distinct = DataRelayerHelpers::createDistinctRouteIndex(spec.inputs);
1031  auto& state = ref.get<DeviceState>();
1032  int i = 0;
1033  for (auto& di : distinct) {
1034    auto& route = spec.inputs[di];
1035    if (route.configurator.has_value() == false) {
1036      i++;
1037      continue;
1038    }
1039    ExpirationHandler handler{
1040      .name = route.configurator->name,
1041      .routeIndex = RouteIndex{i++},
1042      .lifetime = route.matcher.lifetime,
1043      .creator = route.configurator->creatorConfigurator(state, mServiceRegistry, *mConfigRegistry),
1044      .checker = route.configurator->danglingConfigurator(state, *mConfigRegistry),
1045      .handler = route.configurator->expirationConfigurator(state, *mConfigRegistry)};
1046    context.expirationHandlers.emplace_back(std::move(handler));
1047  }
1048
  // Async handle other threads use to wake up the main event loop.
1049  if (state.awakeMainThread == nullptr) {
1050    state.awakeMainThread = (uv_async_t*)malloc(sizeof(uv_async_t));
1051    state.awakeMainThread->data = &state;
1052    uv_async_init(state.loop, state.awakeMainThread, on_awake_main_thread);
1053  }
1054
1055  deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue<std::string>("expected-region-callbacks"));
1056  deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue<std::string>("exit-transition-timeout"));
1057  deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue<std::string>("data-processing-timeout"));
1058
  // Region events may arrive on a transport thread: they are queued under
  // mRegionInfoMutex and handled on the main loop, which we wake up below.
1059  for (auto& channel : GetChannels()) {
1060    channel.second.at(0).Transport()->SubscribeToRegionEvents([&context = deviceContext,
1061                                                               &registry = mServiceRegistry,
1062                                                               &pendingRegionInfos = mPendingRegionInfos,
1063                                                               &regionInfoMutex = mRegionInfoMutex](fair::mq::RegionInfo info) {
1064      std::lock_guard<std::mutex> lock(regionInfoMutex);
1065      LOG(detail) << ">>> Region info event" << info.event;
1066      LOG(detail) << "id: " << info.id;
1067      LOG(detail) << "ptr: " << info.ptr;
1068      LOG(detail) << "size: " << info.size;
1069      LOG(detail) << "flags: " << info.flags;
1070      // Now we check for pending events with the mutex,
1071      // so the lines below are atomic.
1072      pendingRegionInfos.push_back(info);
1073      context.expectedRegionCallbacks -= 1;
1074      // We always want to handle these on the main loop,
1075      // so we awake it.
1076      ServiceRegistryRef ref{registry};
1077      uv_async_send(ref.get<DeviceState>().awakeMainThread);
1078    });
1079  }
1080
1081  // Add a signal manager for SIGUSR1 so that we can force
1082  // an event from the outside, making sure that the event loop can
1083  // be unblocked (e.g. by a quitting DPL driver) even when there
1084  // is no data pending to be processed.
1085  if (deviceContext.sigusr1Handle == nullptr) {
1086    deviceContext.sigusr1Handle = (uv_signal_t*)malloc(sizeof(uv_signal_t));
1087    deviceContext.sigusr1Handle->data = &mServiceRegistry;
1088    uv_signal_init(state.loop, deviceContext.sigusr1Handle);
1089    uv_signal_start(deviceContext.sigusr1Handle, on_signal_callback, SIGUSR1);
1090  }
1091  // If there is any signal, we want to make sure they are active
1092  for (auto& handle : state.activeSignals) {
1093    handle->data = &state;
1094  }
1095  // When we start, we must make sure that we do listen to the signal
1096  deviceContext.sigusr1Handle->data = &mServiceRegistry;
1097
1099  DataProcessingDevice::initPollers();
1100
1101  // Whenever we InitTask, we consider as if the previous iteration
1102  // was successful, so that even if there is no timer or receiving
1103  // channel, we can still start an enumeration.
  // The sentinel (DataProcessorContext*)-1 marks "previous iteration done".
1104  DataProcessorContext* initialContext = nullptr;
1105  bool idle = state.lastActiveDataProcessor.compare_exchange_strong(initialContext, (DataProcessorContext*)-1);
1106  if (!idle) {
1107    LOG(error) << "DataProcessor " << state.lastActiveDataProcessor.load()->spec->name << " was unexpectedly active";
1108  }
1109
1110  // We should be ready to run here. Therefore we copy all the
1111  // required parts in the DataProcessorContext. Eventually we should
1112  // do so on a per thread basis, with fine grained locks.
1113  // FIXME: this should not use ServiceRegistry::threadSalt, but
1114  // more a ServiceRegistry::globalDataProcessorSalt(N) where
1115  // N is the number of the multiplexed data processor.
1116  // We will get there.
1117  this->fillContext(mServiceRegistry.get<DataProcessorContext>(ServiceRegistry::globalDeviceSalt()), deviceContext);
1118
1119  O2_SIGNPOST_END(device, cid, "InitTask", "Exiting InitTask callback waiting for the remaining region callbacks.");
1120
  // True while region callbacks are still queued or still expected.
1121  auto hasPendingEvents = [&mutex = mRegionInfoMutex, &pendingRegionInfos = mPendingRegionInfos](DeviceContext& deviceContext) {
1122    std::lock_guard<std::mutex> lock(mutex);
1123    return (pendingRegionInfos.empty() == false) || deviceContext.expectedRegionCallbacks > 0;
1124  };
1125  O2_SIGNPOST_START(device, cid, "InitTask", "Waiting for registation events.");
  // Spin the event loop one iteration at a time until all expected
  // region registrations were received and processed.
1130  while (hasPendingEvents(deviceContext)) {
1131    // Wait for the callback to signal its done, so that we do not busy wait.
1132    uv_run(state.loop, UV_RUN_ONCE);
1133    // Handle callbacks if any
1134    {
1135      O2_SIGNPOST_EVENT_EMIT(device, cid, "InitTask", "Memory registration event received.");
1136      std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1137      handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1138    }
1139  }
1140  O2_SIGNPOST_END(device, cid, "InitTask", "Done waiting for registration events.");
1141}
1142
// Body of fillContext (presumably — the signature line is not present in
// this listing; InitTask calls this->fillContext(...) with a
// DataProcessorContext and a DeviceContext). Configures the
// DataProcessorContext: sink/rate-limit flags, input balancing, the error
// handling lambda and the early-forwarding decision.
// NOTE(review): several original lines are missing from this listing
// (e.g. 1169-1170, 1174-1176, 1191-1193, 1198, 1200) — the upstream file
// has additional statements, including the switch case label around 1200.
1144{
1145  context.isSink = false;
1146  // If nothing is a sink, the rate limiting simply does not trigger.
1147  bool enableRateLimiting = std::stoi(fConfig->GetValue<std::string>("timeframes-rate-limit"));
1148
1149  auto ref = ServiceRegistryRef{mServiceRegistry};
1150  auto& spec = ref.get<DeviceSpec const>();
1151
1152  // The policy is now allowed to state the default.
1153  context.balancingInputs = spec.completionPolicy.balanceChannels;
1154  // This is needed because the internal injected dummy sink should not
1155  // try to balance inputs unless the rate limiting is requested.
1156  if (enableRateLimiting == false && spec.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) {
1157    context.balancingInputs = false;
1158  }
  // With rate limiting on, a device producing the "dpl-summary" output
  // is flagged as the sink.
1159  if (enableRateLimiting) {
1160    for (auto& spec : spec.outputs) {
1161      if (spec.matcher.binding.value == "dpl-summary") {
1162        context.isSink = true;
1163        break;
1164      }
1165    }
1166  }
1167
1168  context.registry = &mServiceRegistry;
  // When the workflow supplied an error callback, exceptions from the
  // processing loop are logged (with backtrace) and handed to it;
  // otherwise the configured error policy decides: rethrow or skip to the
  // next timeframe.
1171  if (context.error != nullptr) {
1172    context.errorHandling = [&errorCallback = context.error,
1173                             &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1177      auto& err = error_from_ref(e);
1178      auto& context = ref.get<DataProcessorContext>();
1179      O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1180      O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Invoking callback.", err.what);
1181      BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1182      auto& stats = ref.get<DataProcessingStats>();
1184      ErrorContext errorContext{record, ref, e};
1185      errorCallback(errorContext);
1186    };
1187  } else {
1188    context.errorHandling = [&errorPolicy = mProcessingPolicies.error,
1189                             &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1190      auto& err = error_from_ref(e);
1194      auto& context = ref.get<DataProcessorContext>();
1195      O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1196      BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1197      auto& stats = ref.get<DataProcessingStats>();
1199      switch (errorPolicy) {
1201          O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Rethrowing.", err.what);
1202          throw e;
1203        default:
1204          O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Skipping to next timeframe.", err.what);
1205          break;
1206      }
1207    };
1208  }
1209
  // Decide whether inputs may be forwarded downstream before processing
  // completes. Disabled when any forwarded route is AOD (without shmem
  // support), RAWDATA under the NORAW policy, or Optional; force-enabled
  // when only Condition lifetime inputs are forwarded.
1210  auto decideEarlyForward = [&context, &spec, this]() -> bool {
1213    bool canForwardEarly = (spec.forwards.empty() == false) && mProcessingPolicies.earlyForward != EarlyForwardPolicy::NEVER;
1214    bool onlyConditions = true;
1215    bool overriddenEarlyForward = false;
1216    for (auto& forwarded : spec.forwards) {
1217      if (forwarded.matcher.lifetime != Lifetime::Condition) {
1218        onlyConditions = false;
1219      }
1220#if !__has_include(<fairmq/shmem/Message.h>)
1221      if (strncmp(DataSpecUtils::asConcreteOrigin(forwarded.matcher).str, "AOD", 3) == 0) {
1222        context.canForwardEarly = false;
1223        overriddenEarlyForward = true;
1224        LOG(detail) << "Cannot forward early because of AOD input: " << DataSpecUtils::describe(forwarded.matcher);
1225        break;
1226      }
1227#endif
1228      if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && mProcessingPolicies.earlyForward == EarlyForwardPolicy::NORAW) {
1229        context.canForwardEarly = false;
1230        overriddenEarlyForward = true;
1231        LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher);
1232        break;
1233      }
1234      if (forwarded.matcher.lifetime == Lifetime::Optional) {
1235        context.canForwardEarly = false;
1236        overriddenEarlyForward = true;
1237        LOG(detail) << "Cannot forward early because of Optional input: " << DataSpecUtils::describe(forwarded.matcher);
1238        break;
1239      }
1240    }
1241    if (!overriddenEarlyForward && onlyConditions) {
1242      context.canForwardEarly = true;
1243      LOG(detail) << "Enabling early forwarding because only conditions to be forwarded";
1244    }
1245    return canForwardEarly;
1246  };
1247  context.canForwardEarly = decideEarlyForward();
1248}
1249
// Body of the PreRun transition (signature line not present in this
// listing; the signposts name it "PreRun"). Resets the per-run state,
// re-arms the input channels, invokes the user pre-start callbacks
// (logging and rethrowing on failure), starts the pollers and raises the
// DPL "device_state" metric to 1.
// NOTE(review): original line 1258 is missing from this listing.
1251{
1252  auto ref = ServiceRegistryRef{mServiceRegistry};
1253  auto& state = ref.get<DeviceState>();
1254
1255  O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.loop);
1256  O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
1257  state.quitRequested = false;
1259  state.allowedProcessing = DeviceState::Any;
  // Re-arm every input channel for the new run; Pull channels are fake
  // inputs and keep their state.
1260  for (auto& info : state.inputChannelInfos) {
1261    if (info.state != InputChannelState::Pull) {
1262      info.state = InputChannelState::Running;
1263    }
1264  }
1265
1266  // Catch callbacks which fail before we start.
1267  // Notice that when running multiple dataprocessors
1268  // we should probably allow expendable ones to fail.
1269  try {
1270    auto& dpContext = ref.get<DataProcessorContext>();
1271    dpContext.preStartCallbacks(ref);
    // Stream salts are 1-based, hence the i + 1.
1272    for (size_t i = 0; i < mStreams.size(); ++i) {
1273      auto streamRef = ServiceRegistryRef{mServiceRegistry, ServiceRegistry::globalStreamSalt(i + 1)};
1274      auto& context = streamRef.get<StreamContext>();
1275      context.preStartStreamCallbacks(streamRef);
1276    }
1277  } catch (std::exception& e) {
1278    O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type std::exception caught in PreRun: %{public}s. Rethrowing.", e.what());
1279    O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1280    throw;
1281  } catch (o2::framework::RuntimeErrorRef& e) {
1282    auto& err = error_from_ref(e);
1283    O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type o2::framework::RuntimeErrorRef caught in PreRun: %{public}s. Rethrowing.", err.what);
1284    O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1285    throw;
1286  } catch (...) {
1287    O2_SIGNPOST_END(device, cid, "PreRun", "Unknown exception being thrown. Rethrowing.");
1288    throw;
1289  }
1290
1291  ref.get<CallbackService>().call<CallbackService::Id::Start>();
1292  startPollers();
1293
1294  // Raise to 1 when we are ready to start processing
1295  using o2::monitoring::Metric;
1296  using o2::monitoring::Monitoring;
1297  using o2::monitoring::tags::Key;
1298  using o2::monitoring::tags::Value;
1299
1300  auto& monitoring = ref.get<Monitoring>();
1301  monitoring.send(Metric{(uint64_t)1, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1302  O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun callback.");
1303}
1304
1306{
1307 ServiceRegistryRef ref{mServiceRegistry};
1308 // Raise to 1 when we are ready to start processing
1309 using o2::monitoring::Metric;
1310 using o2::monitoring::Monitoring;
1311 using o2::monitoring::tags::Key;
1312 using o2::monitoring::tags::Value;
1313
1314 auto& monitoring = ref.get<Monitoring>();
1315 monitoring.send(Metric{(uint64_t)0, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1316
1317 stopPollers();
1318 ref.get<CallbackService>().call<CallbackService::Id::Stop>();
1319 auto& dpContext = ref.get<DataProcessorContext>();
1320 dpContext.postStopCallbacks(ref);
1321}
1322
1324{
1325 ServiceRegistryRef ref{mServiceRegistry};
1326 ref.get<CallbackService>().call<CallbackService::Id::Reset>();
1327}
1328
// Body of the main device loop (signature line not present in this
// listing; the signposts name the spans "device_state" / "run_loop").
// Each iteration: apply deferred FairMQ state changes, serve pending
// shared-memory region callbacks, decide whether to sleep in uv_run,
// handle state-transition requests and their grace timers, then dispatch
// work to a free task stream if the quota evaluator grants resources.
// NOTE(review): this listing is missing several original lines (e.g.
// 1333, 1368, 1374, 1379, 1387, 1456, 1503, 1561); the upstream file has
// additional statements at those spots (1368 in particular carries the
// right-hand side of the shouldNotWait expression).
1330{
1331  ServiceRegistryRef ref{mServiceRegistry};
1332  auto& state = ref.get<DeviceState>();
1334  bool firstLoop = true;
1335  O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
1336  O2_SIGNPOST_START(device, lid, "device_state", "First iteration of the device loop");
1337
  // With DPL_THREADPOOL_SIZE set, work items go through uv_queue_work on a
  // single-threaded libuv pool instead of running inline.
1338  bool dplEnableMultithreding = getenv("DPL_THREADPOOL_SIZE") != nullptr;
1339  if (dplEnableMultithreding) {
1340    setenv("UV_THREADPOOL_SIZE", "1", 1);
1341  }
1342
1343  while (state.transitionHandling != TransitionHandlingState::Expired) {
    // Apply any FairMQ state change queued by other parts of the framework.
1344    if (state.nextFairMQState.empty() == false) {
1345      (void)this->ChangeState(state.nextFairMQState.back());
1346      state.nextFairMQState.pop_back();
1347    }
1348    // Notify on the main thread the new region callbacks, making sure
1349    // no callback is issued if there is something still processing.
1350    {
1351      std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1352      handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1353    }
1354    // This will block for the correct delay (or until we get data
1355    // on a socket). We also do not block on the first iteration
1356    // so that devices which do not have a timer can still start an
1357    // enumeration.
1358    {
1359      ServiceRegistryRef ref{mServiceRegistry};
1360      ref.get<DriverClient>().flushPending(mServiceRegistry);
1361      DataProcessorContext* lastActive = state.lastActiveDataProcessor.load();
1362      // Reset to zero unless some other DataPorcessorContext completed in the meanwhile.
1363      // In such case we will take care of it at next iteration.
1364      state.lastActiveDataProcessor.compare_exchange_strong(lastActive, nullptr);
1365
      // shouldNotWait == true makes the uv_run below non-blocking
      // (UV_RUN_NOWAIT) so another iteration is scheduled immediately.
1366      auto shouldNotWait = (lastActive != nullptr &&
1367                            (state.streaming != StreamingState::Idle) && (state.activeSignals.empty())) ||
1369      if (firstLoop) {
1370        shouldNotWait = true;
1371        firstLoop = false;
1372      }
1373      if (lastActive != nullptr) {
1375      }
1376      if (NewStatePending()) {
1377        O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "New state pending. Waiting for it to be handled.");
1378        shouldNotWait = true;
1380      }
      // First observation of a pending state transition: arm the grace
      // timers and decide how long we keep draining before going READY.
1381      if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) {
1382        state.transitionHandling = TransitionHandlingState::Requested;
1383        auto& deviceContext = ref.get<DeviceContext>();
1384        // Check if we only have timers
1385        auto& spec = ref.get<DeviceSpec const>();
1386        if (hasOnlyTimers(spec)) {
1388        }
1389
1390        // We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
1391        if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
1392          uv_update_time(state.loop);
1393          O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
1394          uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
1395        }
1396        if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
1397          state.transitionHandling = TransitionHandlingState::Requested;
1398          ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
1399          uv_update_time(state.loop);
1400          O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
1401                                 deviceContext.exitTransitionTimeout);
1402          uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
1403          bool onlyGenerated = hasOnlyGenerated(spec);
1404          int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
1405          if (mProcessingPolicies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) {
1406            O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
1407          } else {
1408            O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
1409                                        "New state requested. Waiting for %d seconds before %{public}s",
1410                                        timeout,
1411                                        onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
1412          }
1413        } else {
        // No timeout configured or already idle: expire immediately.
1414          state.transitionHandling = TransitionHandlingState::Expired;
1415          if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination == TerminationPolicy::QUIT) {
1416            O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
1417          } else if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination != TerminationPolicy::QUIT) {
1418            O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
1419          } else if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
1420            O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
1421          } else {
1422            O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
1423          }
1424        }
1425      }
1426      // If we are Idle, we can then consider the transition to be expired.
1427      if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
1428        O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
1429        state.transitionHandling = TransitionHandlingState::Expired;
1430      }
      // severityStack is used to temporarily bump the console severity to
      // trace for a single loop iteration; pop the saved level back here.
1431      if (state.severityStack.empty() == false) {
1432        fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1433        state.severityStack.pop_back();
1434      }
1435      // for (auto &info : mDeviceContext.state->inputChannelInfos) {
1436      //   shouldNotWait |= info.readPolled;
1437      // }
1438      state.loopReason = DeviceState::NO_REASON;
1439      state.firedTimers.clear();
1440      if ((state.tracingFlags & DeviceState::LoopReason::TRACE_CALLBACKS) != 0) {
1441        state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1442        fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1443      }
1444      // Run the asynchronous queue just before sleeping again, so that:
1445      // - we can trigger further events from the queue
1446      // - we can guarantee this is the last thing we do in the loop (
1447      //   assuming no one else is adding to the queue before this point).
      // Callback used by the relayer when it has to prune a timeslice slot:
      // forward the dropped parts downstream rather than losing them.
1448      auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1449        O2_SIGNPOST_START(device, lid, "run_loop", "Dropping message from slot %" PRIu64 ". Forwarding as needed.", (uint64_t)slot.index);
1450        ServiceRegistryRef ref{registry};
1451        ref.get<AsyncQueue>();
1452        ref.get<DecongestionService>();
1453        ref.get<DataRelayer>();
1454        // Get the current timeslice for the slot.
1455        auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
1457        forwardInputs(registry, slot, dropped, oldestOutputInfo, false, true);
1458      };
1459      auto& relayer = ref.get<DataRelayer>();
1460      relayer.prunePending(onDrop);
1461      auto& queue = ref.get<AsyncQueue>();
1462      auto oldestPossibleTimeslice = relayer.getOldestPossibleOutput();
1463      AsyncQueueHelpers::run(queue, {oldestPossibleTimeslice.timeslice.value});
1464      if (shouldNotWait == false) {
1465        auto& dpContext = ref.get<DataProcessorContext>();
1466        dpContext.preLoopCallbacks(ref);
1467      }
      // This is the sleep point of the whole device: block here (ONCE)
      // unless something above decided we must iterate immediately.
1468      O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. %{}s", shouldNotWait ? "Will immediately schedule a new one" : "Waiting for next event.");
1469      uv_run(state.loop, shouldNotWait ? UV_RUN_NOWAIT : UV_RUN_ONCE);
1470      O2_SIGNPOST_START(device, lid, "run_loop", "Run loop started. Loop reason %d.", state.loopReason);
1471      if ((state.loopReason & state.tracingFlags) != 0) {
1472        state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1473        fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1474      } else if (state.severityStack.empty() == false) {
1475        fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1476        state.severityStack.pop_back();
1477      }
1478      O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Loop reason mask %x & %x = %x", state.loopReason, state.tracingFlags, state.loopReason & state.tracingFlags);
1479
1480      if ((state.loopReason & DeviceState::LoopReason::OOB_ACTIVITY) != 0) {
1481        O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Out of band activity detected. Rescanning everything.");
1482        relayer.rescan();
1483      }
1484
1485      if (!state.pendingOffers.empty()) {
1486        O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Pending %" PRIu64 " offers. updating the ComputingQuotaEvaluator.", (uint64_t)state.pendingOffers.size());
1487        ref.get<ComputingQuotaEvaluator>().updateOffers(state.pendingOffers, uv_now(state.loop));
1488      }
1489    }
1490
1491    // Notify on the main thread the new region callbacks, making sure
1492    // no callback is issued if there is something still processing.
1493    // Notice that we still need to perform callbacks also after
1494    // the socket epolled, because otherwise we would end up serving
1495    // the callback after the first data arrives is the system is too
1496    // fast to transition from Init to Run.
1497    {
1498      std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1499      handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1500    }
1501
    // Pick a task stream which is not currently running; -1 means none free.
1502    assert(mStreams.size() == mHandles.size());
1504    TaskStreamRef streamRef{-1};
1505    for (size_t ti = 0; ti < mStreams.size(); ti++) {
1506      auto& taskInfo = mStreams[ti];
1507      if (taskInfo.running) {
1508        continue;
1509      }
1510      // Stream 0 is for when we run in
1511      streamRef.index = ti;
1512    }
1513    using o2::monitoring::Metric;
1514    using o2::monitoring::Monitoring;
1515    using o2::monitoring::tags::Key;
1516    using o2::monitoring::tags::Value;
1517    // We have an empty stream, let's check if we have enough
1518    // resources for it to run something
1519    if (streamRef.index != -1) {
1520      // Synchronous execution of the callbacks. This will be moved in the
1521      // moved in the on_socket_polled once we have threading in place.
1522      uv_work_t& handle = mHandles[streamRef.index];
1523      TaskStreamInfo& stream = mStreams[streamRef.index];
1524      handle.data = &mStreams[streamRef.index];
1525
      // Publishes expired-offer statistics whenever the quota evaluator
      // reclaims resources we did not use.
1526      static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const& stats)> reportExpiredOffer = [&registry = mServiceRegistry](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
1527        ServiceRegistryRef ref{registry};
1528        auto& dpStats = ref.get<DataProcessingStats>();
1529        dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
1530        dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
1531        dpStats.processCommandQueue();
1532      };
1533      auto ref = ServiceRegistryRef{mServiceRegistry};
1534
1535      // Deciding wether to run or not can be done by passing a request to
1536      // the evaluator. In this case, the request is always satisfied and
1537      // we run on whatever resource is available.
1538      auto& spec = ref.get<DeviceSpec const>();
1539      bool enough = ref.get<ComputingQuotaEvaluator>().selectOffer(streamRef.index, spec.resourcePolicy.request, uv_now(state.loop));
1540
1541      if (enough) {
1542        stream.id = streamRef;
1543        stream.running = true;
1544        stream.registry = &mServiceRegistry;
        // Either hand the work item to the libuv thread pool or run it
        // inline on the main thread (the default).
1545        if (dplEnableMultithreding) [[unlikely]] {
1546          stream.task = &handle;
1547          uv_queue_work(state.loop, stream.task, run_callback, run_completion);
1548        } else {
1549          run_callback(&handle);
1550          run_completion(&handle, 0);
1551        }
1552      } else {
1553        auto ref = ServiceRegistryRef{mServiceRegistry};
1554        ref.get<ComputingQuotaEvaluator>().handleExpired(reportExpiredOffer);
1555      }
1556    }
1557  }
1558
  // The transition expired: drop whatever parts are still parked on the
  // input channels and reset the transition state for the next run.
1559  O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", state.transitionHandling);
1560  auto& spec = ref.get<DeviceSpec const>();
1562  for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
1563    auto& info = state.inputChannelInfos[ci];
1564    info.parts.fParts.clear();
1565  }
1566  state.transitionHandling = TransitionHandlingState::NoTransition;
1567}
1568
1572{
1573 auto& context = ref.get<DataProcessorContext>();
1574 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1575 O2_SIGNPOST_START(device, dpid, "do_prepare", "Starting DataProcessorContext::doPrepare.");
1576
1577 {
1578 ref.get<CallbackService>().call<CallbackService::Id::ClockTick>();
1579 }
1580 // Whether or not we had something to do.
1581
1582 // Initialise the value for context.allDone. It will possibly be updated
1583 // below if any of the channels is not done.
1584 //
1585 // Notice that fake input channels (InputChannelState::Pull) cannot possibly
1586 // expect to receive an EndOfStream signal. Thus we do not wait for these
1587 // to be completed. In the case of data source devices, as they do not have
1588 // real data input channels, they have to signal EndOfStream themselves.
1589 auto& state = ref.get<DeviceState>();
1590 auto& spec = ref.get<DeviceSpec const>();
1591 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.inputChannelInfos.data());
1592 O2_SIGNPOST_START(device, cid, "do_prepare", "Reported channel states.");
1593 context.allDone = std::any_of(state.inputChannelInfos.begin(), state.inputChannelInfos.end(), [cid](const auto& info) {
1594 if (info.channel) {
1595 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "Input channel %{public}s%{public}s has %zu parts left and is in state %d.",
1596 info.channel->GetName().c_str(), (info.id.value == ChannelIndex::INVALID ? " (non DPL)" : ""), info.parts.fParts.size(), (int)info.state);
1597 } else {
1598 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "External channel %d is in state %d.", info.id.value, (int)info.state);
1599 }
1600 return (info.parts.fParts.empty() == true && info.state != InputChannelState::Pull);
1601 });
1602 O2_SIGNPOST_END(device, cid, "do_prepare", "End report.");
1603 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Processing %zu input channels.", spec.inputChannels.size());
1606 static std::vector<int> pollOrder;
1607 pollOrder.resize(state.inputChannelInfos.size());
1608 std::iota(pollOrder.begin(), pollOrder.end(), 0);
1609 std::sort(pollOrder.begin(), pollOrder.end(), [&infos = state.inputChannelInfos](int a, int b) {
1610 return infos[a].oldestForChannel.value < infos[b].oldestForChannel.value;
1611 });
1612
1613 // Nothing to poll...
1614 if (pollOrder.empty()) {
1615 O2_SIGNPOST_END(device, dpid, "do_prepare", "Nothing to poll. Waiting for next iteration.");
1616 return;
1617 }
1618 auto currentOldest = state.inputChannelInfos[pollOrder.front()].oldestForChannel;
1619 auto currentNewest = state.inputChannelInfos[pollOrder.back()].oldestForChannel;
1620 auto delta = currentNewest.value - currentOldest.value;
1621 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Oldest possible timeframe range %" PRIu64 " => %" PRIu64 " delta %" PRIu64,
1622 (int64_t)currentOldest.value, (int64_t)currentNewest.value, (int64_t)delta);
1623 auto& infos = state.inputChannelInfos;
1624
1625 if (context.balancingInputs) {
1626 static int pipelineLength = DefaultsHelpers::pipelineLength();
1627 static uint64_t ahead = getenv("DPL_MAX_CHANNEL_AHEAD") ? std::atoll(getenv("DPL_MAX_CHANNEL_AHEAD")) : std::max(8, std::min(pipelineLength - 48, pipelineLength / 2));
1628 auto newEnd = std::remove_if(pollOrder.begin(), pollOrder.end(), [&infos, limitNew = currentOldest.value + ahead](int a) -> bool {
1629 return infos[a].oldestForChannel.value > limitNew;
1630 });
1631 for (auto it = pollOrder.begin(); it < pollOrder.end(); it++) {
1632 const auto& channelInfo = state.inputChannelInfos[*it];
1633 if (channelInfo.pollerIndex != -1) {
1634 auto& poller = state.activeInputPollers[channelInfo.pollerIndex];
1635 auto& pollerContext = *(PollerContext*)(poller->data);
1636 if (pollerContext.pollerState == PollerContext::PollerState::Connected || pollerContext.pollerState == PollerContext::PollerState::Suspended) {
1637 bool running = pollerContext.pollerState == PollerContext::PollerState::Connected;
1638 bool shouldBeRunning = it < newEnd;
1639 if (running != shouldBeRunning) {
1640 uv_poll_start(poller, shouldBeRunning ? UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED : 0, &on_socket_polled);
1641 pollerContext.pollerState = shouldBeRunning ? PollerContext::PollerState::Connected : PollerContext::PollerState::Suspended;
1642 }
1643 }
1644 }
1645 }
1646 pollOrder.erase(newEnd, pollOrder.end());
1647 }
1648 O2_SIGNPOST_END(device, dpid, "do_prepare", "%zu channels pass the channel inbalance balance check.", pollOrder.size());
1649
1650 for (auto sci : pollOrder) {
1651 auto& info = state.inputChannelInfos[sci];
1652 auto& channelSpec = spec.inputChannels[sci];
1653 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1654 O2_SIGNPOST_START(device, cid, "channels", "Processing channel %s", channelSpec.name.c_str());
1655
1656 if (info.state != InputChannelState::Completed && info.state != InputChannelState::Pull) {
1657 context.allDone = false;
1658 }
1659 if (info.state != InputChannelState::Running) {
1660 // Remember to flush data if we are not running
1661 // and there is some message pending.
1662 if (info.parts.Size()) {
1664 }
1665 O2_SIGNPOST_END(device, cid, "channels", "Flushing channel %s which is in state %d and has %zu parts still pending.",
1666 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1667 continue;
1668 }
1669 if (info.channel == nullptr) {
1670 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is nullptr and has %zu parts still pending.",
1671 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1672 continue;
1673 }
1674 // Only poll DPL channels for now.
1676 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is not a DPL channel and has %zu parts still pending.",
1677 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1678 continue;
1679 }
1680 auto& socket = info.channel->GetSocket();
1681 // If we have pending events from a previous iteration,
1682 // we do receive in any case.
1683 // Otherwise we check if there is any pending event and skip
1684 // this channel in case there is none.
1685 if (info.hasPendingEvents == 0) {
1686 socket.Events(&info.hasPendingEvents);
1687 // If we do not read, we can continue.
1688 if ((info.hasPendingEvents & 1) == 0 && (info.parts.Size() == 0)) {
1689 O2_SIGNPOST_END(device, cid, "channels", "No pending events and no remaining parts to process for channel %{public}s", channelSpec.name.c_str());
1690 continue;
1691 }
1692 }
1693 // We can reset this, because it means we have seen at least 1
1694 // message after the UV_READABLE was raised.
1695 info.readPolled = false;
1696 // Notice that there seems to be a difference between the documentation
1697 // of zeromq and the observed behavior. The fact that ZMQ_POLLIN
1698 // is raised does not mean that a message is immediately available to
1699 // read, just that it will be available soon, so the receive can
1700 // still return -2. To avoid this we keep receiving on the socket until
1701 // we get a message. In order not to overflow the DPL queue we process
1702 // one message at the time and we keep track of wether there were more
1703 // to process.
1704 bool newMessages = false;
1705 while (true) {
1706 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Receiving loop called for channel %{public}s (%d) with oldest possible timeslice %zu",
1707 channelSpec.name.c_str(), info.id.value, info.oldestForChannel.value);
1708 if (info.parts.Size() < 64) {
1709 fair::mq::Parts parts;
1710 info.channel->Receive(parts, 0);
1711 if (parts.Size()) {
1712 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Received %zu parts from channel %{public}s (%d).", parts.Size(), channelSpec.name.c_str(), info.id.value);
1713 }
1714 for (auto&& part : parts) {
1715 info.parts.fParts.emplace_back(std::move(part));
1716 }
1717 newMessages |= true;
1718 }
1719
1720 if (info.parts.Size() >= 0) {
1722 // Receiving data counts as activity now, so that
1723 // We can make sure we process all the pending
1724 // messages without hanging on the uv_run.
1725 break;
1726 }
1727 }
1728 // We check once again for pending events, keeping track if this was the
1729 // case so that we can immediately repeat this loop and avoid remaining
1730 // stuck in uv_run. This is because we will not get notified on the socket
1731 // if more events are pending due to zeromq level triggered approach.
1732 socket.Events(&info.hasPendingEvents);
1733 if (info.hasPendingEvents) {
1734 info.readPolled = false;
1735 // In case there were messages, we consider it as activity
1736 if (newMessages) {
1737 state.lastActiveDataProcessor.store(&context);
1738 }
1739 }
1740 O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).",
1741 channelSpec.name.c_str(), info.id.value);
1742 }
1743}
1744
1746{
1747 auto& context = ref.get<DataProcessorContext>();
1748 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1749 auto& state = ref.get<DeviceState>();
1750 auto& spec = ref.get<DeviceSpec const>();
1751
1752 if (state.streaming == StreamingState::Idle) {
1753 return;
1754 }
1755
1756 context.completed.clear();
1757 context.completed.reserve(16);
1758 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1759 state.lastActiveDataProcessor.store(&context);
1760 }
1761 DanglingContext danglingContext{*context.registry};
1762
1763 context.preDanglingCallbacks(danglingContext);
1764 if (state.lastActiveDataProcessor.load() == nullptr) {
1765 ref.get<CallbackService>().call<CallbackService::Id::Idle>();
1766 }
1767 auto activity = ref.get<DataRelayer>().processDanglingInputs(context.expirationHandlers, *context.registry, true);
1768 if (activity.expiredSlots > 0) {
1769 state.lastActiveDataProcessor = &context;
1770 }
1771
1772 context.completed.clear();
1773 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1774 state.lastActiveDataProcessor = &context;
1775 }
1776
1777 context.postDanglingCallbacks(danglingContext);
1778
1779 // If we got notified that all the sources are done, we call the EndOfStream
1780 // callback and return false. Notice that what happens next is actually
1781 // dependent on the callback, not something which is controlled by the
1782 // framework itself.
1783 if (context.allDone == true && state.streaming == StreamingState::Streaming) {
1785 state.lastActiveDataProcessor = &context;
1786 }
1787
1788 if (state.streaming == StreamingState::EndOfStreaming) {
1789 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "We are in EndOfStreaming. Flushing queues.");
1790 // We keep processing data until we are Idle.
1791 // FIXME: not sure this is the correct way to drain the queues, but
1792 // I guess we will see.
1795 auto& relayer = ref.get<DataRelayer>();
1796
1797 bool shouldProcess = hasOnlyGenerated(spec) == false;
1798
1799 while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) {
1800 relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false);
1801 }
1802
1803 auto& timingInfo = ref.get<TimingInfo>();
1804 // We should keep the data generated at end of stream only for those
1805 // which are not sources.
1806 timingInfo.keepAtEndOfStream = shouldProcess;
1807 // Fill timinginfo with some reasonable values for data sent with endOfStream
1808 timingInfo.timeslice = relayer.getOldestPossibleOutput().timeslice.value;
1809 timingInfo.tfCounter = -1;
1810 timingInfo.firstTForbit = -1;
1811 // timingInfo.runNumber = ; // Not sure where to get this if not already set
1812 timingInfo.creation = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
1813 O2_SIGNPOST_EVENT_EMIT(calibration, dpid, "calibration", "TimingInfo.keepAtEndOfStream %d", timingInfo.keepAtEndOfStream);
1814
1815 EndOfStreamContext eosContext{*context.registry, ref.get<DataAllocator>()};
1816
1817 context.preEOSCallbacks(eosContext);
1818 auto& streamContext = ref.get<StreamContext>();
1819 streamContext.preEOSCallbacks(eosContext);
1820 ref.get<CallbackService>().call<CallbackService::Id::EndOfStream>(eosContext);
1821 streamContext.postEOSCallbacks(eosContext);
1822 context.postEOSCallbacks(eosContext);
1823
1824 for (auto& channel : spec.outputChannels) {
1825 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Sending end of stream to %{public}s.", channel.name.c_str());
1827 }
1828 // This is needed because the transport is deleted before the device.
1829 relayer.clear();
1831 // In case we should process, note the data processor responsible for it
1832 if (shouldProcess) {
1833 state.lastActiveDataProcessor = &context;
1834 }
1835 // On end of stream we shut down all output pollers.
1836 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1837 for (auto& poller : state.activeOutputPollers) {
1838 uv_poll_stop(poller);
1839 }
1840 return;
1841 }
1842
1843 if (state.streaming == StreamingState::Idle) {
1844 // On end of stream we shut down all output pollers.
1845 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1846 for (auto& poller : state.activeOutputPollers) {
1847 uv_poll_stop(poller);
1848 }
1849 }
1850
1851 return;
1852}
1853
1855{
1856 ServiceRegistryRef ref{mServiceRegistry};
1857 ref.get<DataRelayer>().clear();
1858 auto& deviceContext = ref.get<DeviceContext>();
1859 // If the signal handler is there, we should
1860 // hide the registry from it, so that we do not
1861 // end up calling the signal handler on something
1862 // which might not be there anymore.
1863 if (deviceContext.sigusr1Handle) {
1864 deviceContext.sigusr1Handle->data = nullptr;
1865 }
1866 // Makes sure we do not have a working context on
1867 // shutdown.
1868 for (auto& handle : ref.get<DeviceState>().activeSignals) {
1869 handle->data = nullptr;
1870 }
1871}
1872
1875 {
1876 }
1877};
1878
1884{
1885 using InputInfo = DataRelayer::InputInfo;
1887
1888 auto& context = ref.get<DataProcessorContext>();
1889 // This is the same id as the upper level function, so we get the events
1890 // associated with the same interval. We will simply use "handle_data" as
1891 // the category.
1892 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1893
1894 // This is how we validate inputs. I.e. we try to enforce the O2 Data model
1895 // and we do a few stats. We bind parts as a lambda captured variable, rather
1896 // than an input, because we do not want the outer loop actually be exposed
1897 // to the implementation details of the messaging layer.
1898 auto getInputTypes = [&info, &context]() -> std::optional<std::vector<InputInfo>> {
1899 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1900 auto ref = ServiceRegistryRef{*context.registry};
1901 auto& stats = ref.get<DataProcessingStats>();
1902 auto& state = ref.get<DeviceState>();
1903 auto& parts = info.parts;
1904 stats.updateStats({(int)ProcessingStatsId::TOTAL_INPUTS, DataProcessingStats::Op::Set, (int64_t)parts.Size()});
1905
1906 std::vector<InputInfo> results;
1907 // we can reserve the upper limit
1908 results.reserve(parts.Size() / 2);
1909 size_t nTotalPayloads = 0;
1910
1911 auto insertInputInfo = [&results, &nTotalPayloads](size_t position, size_t length, InputType type, ChannelIndex index) {
1912 results.emplace_back(position, length, type, index);
1913 if (type != InputType::Invalid && length > 1) {
1914 nTotalPayloads += length - 1;
1915 }
1916 };
1917
1918 for (size_t pi = 0; pi < parts.Size(); pi += 2) {
1919 auto* headerData = parts.At(pi)->GetData();
1920 auto sih = o2::header::get<SourceInfoHeader*>(headerData);
1921 if (sih) {
1922 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got SourceInfoHeader with state %d", (int)sih->state);
1923 info.state = sih->state;
1924 insertInputInfo(pi, 2, InputType::SourceInfo, info.id);
1925 state.lastActiveDataProcessor = &context;
1926 continue;
1927 }
1928 auto dih = o2::header::get<DomainInfoHeader*>(headerData);
1929 if (dih) {
1930 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got DomainInfoHeader with oldestPossibleTimeslice %d", (int)dih->oldestPossibleTimeslice);
1931 insertInputInfo(pi, 2, InputType::DomainInfo, info.id);
1932 state.lastActiveDataProcessor = &context;
1933 continue;
1934 }
1935 auto dh = o2::header::get<DataHeader*>(headerData);
1936 if (!dh) {
1937 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1938 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header is not a DataHeader?");
1939 continue;
1940 }
1941 if (dh->payloadSize > parts.At(pi + 1)->GetSize()) {
1942 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1943 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader payloadSize mismatch");
1944 continue;
1945 }
1946 auto dph = o2::header::get<DataProcessingHeader*>(headerData);
1947 // We only deal with the tracking of parts if the log is enabled.
1948 // This is because in principle we should track the size of each of
1949 // the parts and sum it up. Not for now.
1950 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, headerData);
1951 O2_SIGNPOST_START(parts, pid, "parts", "Processing DataHeader %{public}-4s/%{public}-16s/%d with splitPayloadParts %d and splitPayloadIndex %d",
1952 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->splitPayloadParts, dh->splitPayloadIndex);
1953 if (!dph) {
1954 insertInputInfo(pi, 2, InputType::Invalid, info.id);
1955 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header stack does not contain DataProcessingHeader");
1956 continue;
1957 }
1958 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
1959 // this is indicating a sequence of payloads following the header
1960 // FIXME: we will probably also set the DataHeader version
1961 insertInputInfo(pi, dh->splitPayloadParts + 1, InputType::Data, info.id);
1962 pi += dh->splitPayloadParts - 1;
1963 } else {
1964 // We can set the type for the next splitPayloadParts
1965 // because we are guaranteed they are all the same.
1966 // If splitPayloadParts = 0, we assume that means there is only one (header, payload)
1967 // pair.
1968 size_t finalSplitPayloadIndex = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
1969 if (finalSplitPayloadIndex > parts.Size()) {
1970 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader::splitPayloadParts invalid");
1971 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1972 continue;
1973 }
1974 insertInputInfo(pi, 2, InputType::Data, info.id);
1975 for (; pi + 2 < finalSplitPayloadIndex; pi += 2) {
1976 insertInputInfo(pi + 2, 2, InputType::Data, info.id);
1977 }
1978 }
1979 }
1980 if (results.size() + nTotalPayloads != parts.Size()) {
1981 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "inconsistent number of inputs extracted. %zu vs parts (%zu)", results.size() + nTotalPayloads, parts.Size());
1982 return std::nullopt;
1983 }
1984 return results;
1985 };
1986
1987 auto reportError = [ref](const char* message) {
1988 auto& stats = ref.get<DataProcessingStats>();
1990 };
1991
1992 auto handleValidMessages = [&info, ref, &reportError](std::vector<InputInfo> const& inputInfos) {
1993 auto& relayer = ref.get<DataRelayer>();
1994 auto& state = ref.get<DeviceState>();
1995 static WaitBackpressurePolicy policy;
1996 auto& parts = info.parts;
1997 // We relay execution to make sure we have a complete set of parts
1998 // available.
1999 bool hasBackpressure = false;
2000 size_t minBackpressureTimeslice = -1;
2001 bool hasData = false;
2002 size_t oldestPossibleTimeslice = -1;
2003 static std::vector<int> ordering;
2004 // Same as inputInfos but with iota.
2005 ordering.resize(inputInfos.size());
2006 std::iota(ordering.begin(), ordering.end(), 0);
2007 // stable sort orderings by type and position
2008 std::stable_sort(ordering.begin(), ordering.end(), [&inputInfos](int const& a, int const& b) {
2009 auto const& ai = inputInfos[a];
2010 auto const& bi = inputInfos[b];
2011 if (ai.type != bi.type) {
2012 return ai.type < bi.type;
2013 }
2014 return ai.position < bi.position;
2015 });
2016 for (size_t ii = 0; ii < inputInfos.size(); ++ii) {
2017 auto const& input = inputInfos[ordering[ii]];
2018 switch (input.type) {
2019 case InputType::Data: {
2020 hasData = true;
2021 auto headerIndex = input.position;
2022 auto nMessages = 0;
2023 auto nPayloadsPerHeader = 0;
2024 if (input.size > 2) {
2025 // header and multiple payload sequence
2026 nMessages = input.size;
2027 nPayloadsPerHeader = nMessages - 1;
2028 } else {
2029 // multiple header-payload pairs
2030 auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
2031 nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2;
2032 nPayloadsPerHeader = 1;
2033 ii += (nMessages / 2) - 1;
2034 }
2035 auto onDrop = [ref](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
2036 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
2037 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "onDrop", "Dropping message from slot %zu. Forwarding as needed. Timeslice %zu",
2038 slot.index, oldestOutputInfo.timeslice.value);
2039 ref.get<AsyncQueue>();
2040 ref.get<DecongestionService>();
2041 ref.get<DataRelayer>();
2042 // Get the current timeslice for the slot.
2043 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
2045 forwardInputs(ref, slot, dropped, oldestOutputInfo, false, true);
2046 };
2047 auto relayed = relayer.relay(parts.At(headerIndex)->GetData(),
2048 &parts.At(headerIndex),
2049 input,
2050 nMessages,
2051 nPayloadsPerHeader,
2052 onDrop);
2053 switch (relayed.type) {
2055 if (info.normalOpsNotified == true && info.backpressureNotified == false) {
2056 LOGP(alarm, "Backpressure on channel {}. Waiting.", info.channel->GetName());
2057 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
2058 monitoring.send(o2::monitoring::Metric{1, fmt::format("backpressure_{}", info.channel->GetName())});
2059 info.backpressureNotified = true;
2060 info.normalOpsNotified = false;
2061 }
2062 policy.backpressure(info);
2063 hasBackpressure = true;
2064 minBackpressureTimeslice = std::min<size_t>(minBackpressureTimeslice, relayed.timeslice.value);
2065 break;
2069 if (info.normalOpsNotified == false && info.backpressureNotified == true) {
2070 LOGP(info, "Back to normal on channel {}.", info.channel->GetName());
2071 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
2072 monitoring.send(o2::monitoring::Metric{0, fmt::format("backpressure_{}", info.channel->GetName())});
2073 info.normalOpsNotified = true;
2074 info.backpressureNotified = false;
2075 }
2076 break;
2077 }
2078 } break;
2079 case InputType::SourceInfo: {
2080 LOGP(detail, "Received SourceInfo");
2081 auto& context = ref.get<DataProcessorContext>();
2082 state.lastActiveDataProcessor = &context;
2083 auto headerIndex = input.position;
2084 auto payloadIndex = input.position + 1;
2085 assert(payloadIndex < parts.Size());
2086 // FIXME: the message with the end of stream cannot contain
2087 // split parts.
2088 parts.At(headerIndex).reset(nullptr);
2089 parts.At(payloadIndex).reset(nullptr);
2090 // for (size_t i = 0; i < dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 - 1 : 1; ++i) {
2091 // parts.At(headerIndex + 1 + i).reset(nullptr);
2092 // }
2093 // pi += dh->splitPayloadParts > 0 ? dh->splitPayloadParts - 1 : 0;
2094
2095 } break;
2096 case InputType::DomainInfo: {
2099 auto& context = ref.get<DataProcessorContext>();
2100 state.lastActiveDataProcessor = &context;
2101 auto headerIndex = input.position;
2102 auto payloadIndex = input.position + 1;
2103 assert(payloadIndex < parts.Size());
2104 // FIXME: the message with the end of stream cannot contain
2105 // split parts.
2106
2107 auto dih = o2::header::get<DomainInfoHeader*>(parts.At(headerIndex)->GetData());
2108 if (hasBackpressure && dih->oldestPossibleTimeslice >= minBackpressureTimeslice) {
2109 break;
2110 }
2111 oldestPossibleTimeslice = std::min(oldestPossibleTimeslice, dih->oldestPossibleTimeslice);
2112 LOGP(debug, "Got DomainInfoHeader, new oldestPossibleTimeslice {} on channel {}", oldestPossibleTimeslice, info.id.value);
2113 parts.At(headerIndex).reset(nullptr);
2114 parts.At(payloadIndex).reset(nullptr);
2115 } break;
2116 case InputType::Invalid: {
2117 reportError("Invalid part found.");
2118 } break;
2119 }
2120 }
2123 if (oldestPossibleTimeslice != (size_t)-1) {
2124 info.oldestForChannel = {oldestPossibleTimeslice};
2125 auto& context = ref.get<DataProcessorContext>();
2126 context.domainInfoUpdatedCallback(*context.registry, oldestPossibleTimeslice, info.id);
2127 ref.get<CallbackService>().call<CallbackService::Id::DomainInfoUpdated>((ServiceRegistryRef)*context.registry, (size_t)oldestPossibleTimeslice, (ChannelIndex)info.id);
2128 state.lastActiveDataProcessor = &context;
2129 }
2130 auto it = std::remove_if(parts.fParts.begin(), parts.fParts.end(), [](auto& msg) -> bool { return msg.get() == nullptr; });
2131 parts.fParts.erase(it, parts.end());
2132 if (parts.fParts.size()) {
2133 LOG(debug) << parts.fParts.size() << " messages backpressured";
2134 }
2135 };
2136
2137 // Second part. This is the actual outer loop we want to obtain, with
2138 // implementation details which can be read. Notice how most of the state
2139 // is actually hidden. For example we do not expose what "input" is. This
2140 // will allow us to keep the same toplevel logic even if the actual meaning
2141 // of input is changed (for example we might move away from multipart
2142 // messages). Notice also that we need to act diffently depending on the
2143 // actual CompletionOp we want to perform. In particular forwarding inputs
2144 // also gets rid of them from the cache.
2145 auto inputTypes = getInputTypes();
2146 if (bool(inputTypes) == false) {
2147 reportError("Parts should come in couples. Dropping it.");
2148 return;
2149 }
2150 handleValidMessages(*inputTypes);
2151 return;
2152}
2153
2154namespace
2155{
// Min/max latency (in the same time unit as DataProcessingHeader::creation)
// observed across the parts of one input record. Seeded so that the first
// sample always wins: min starts at the largest value, max at the smallest.
2156 struct InputLatency {
2157 uint64_t minLatency = std::numeric_limits<uint64_t>::max();
2158 uint64_t maxLatency = std::numeric_limits<uint64_t>::min();
2159};
2160
2161auto calculateInputRecordLatency(InputRecord const& record, uint64_t currentTime) -> InputLatency
2162{
2163 InputLatency result;
2164
2165 for (auto& item : record) {
2166 auto* header = o2::header::get<DataProcessingHeader*>(item.header);
2167 if (header == nullptr) {
2168 continue;
2169 }
2170 int64_t partLatency = (0x7fffffffffffffff & currentTime) - (0x7fffffffffffffff & header->creation);
2171 if (partLatency < 0) {
2172 partLatency = 0;
2173 }
2174 result.minLatency = std::min(result.minLatency, (uint64_t)partLatency);
2175 result.maxLatency = std::max(result.maxLatency, (uint64_t)partLatency);
2176 }
2177 return result;
2178};
2179
2180auto calculateTotalInputRecordSize(InputRecord const& record) -> int
2181{
2182 size_t totalInputSize = 0;
2183 for (auto& item : record) {
2184 auto* header = o2::header::get<DataHeader*>(item.header);
2185 if (header == nullptr) {
2186 continue;
2187 }
2188 totalInputSize += header->payloadSize;
2189 }
2190 return totalInputSize;
2191};
2192
/// Atomically raise @a maximum_value to @a value when @a value is larger.
/// Lock-free CAS loop: on failure compare_exchange_weak reloads the current
/// stored value into `observed`, so the loop retries until either the store
/// succeeds or the stored value is already >= @a value.
template <typename T>
void update_maximum(std::atomic<T>& maximum_value, T const& value) noexcept
{
  for (T observed = maximum_value.load(); observed < value;) {
    if (maximum_value.compare_exchange_weak(observed, value)) {
      break;
    }
  }
}
2201} // namespace
2202
2203bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::vector<DataRelayer::RecordAction>& completed)
2204{
2205 auto& context = ref.get<DataProcessorContext>();
2206 LOGP(debug, "DataProcessingDevice::tryDispatchComputation");
2207 // This is the actual hidden state for the outer loop. In case we decide we
2208 // want to support multithreaded dispatching of operations, I can simply
2209 // move these to some thread local store and the rest of the lambdas
2210 // should work just fine.
2211 std::vector<MessageSet> currentSetOfInputs;
2212
2213 //
2214 auto getInputSpan = [ref, &currentSetOfInputs](TimesliceSlot slot, bool consume = true) {
2215 auto& relayer = ref.get<DataRelayer>();
2216 if (consume) {
2217 currentSetOfInputs = relayer.consumeAllInputsForTimeslice(slot);
2218 } else {
2219 currentSetOfInputs = relayer.consumeExistingInputsForTimeslice(slot);
2220 }
2221 auto getter = [&currentSetOfInputs](size_t i, size_t partindex) -> DataRef {
2222 if (currentSetOfInputs[i].getNumberOfPairs() > partindex) {
2223 const char* headerptr = nullptr;
2224 const char* payloadptr = nullptr;
2225 size_t payloadSize = 0;
2226 // - each input can have multiple parts
2227 // - "part" denotes a sequence of messages belonging together, the first message of the
2228 // sequence is the header message
2229 // - each part has one or more payload messages
2230 // - InputRecord provides all payloads as header-payload pair
2231 auto const& headerMsg = currentSetOfInputs[i].associatedHeader(partindex);
2232 auto const& payloadMsg = currentSetOfInputs[i].associatedPayload(partindex);
2233 headerptr = static_cast<char const*>(headerMsg->GetData());
2234 payloadptr = payloadMsg ? static_cast<char const*>(payloadMsg->GetData()) : nullptr;
2235 payloadSize = payloadMsg ? payloadMsg->GetSize() : 0;
2236 return DataRef{nullptr, headerptr, payloadptr, payloadSize};
2237 }
2238 return DataRef{};
2239 };
2240 auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
2241 return currentSetOfInputs[i].getNumberOfPairs();
2242 };
2243#if __has_include(<fairmq/shmem/Message.h>)
2244 auto refCountGetter = [&currentSetOfInputs](size_t idx) -> int {
2245 auto& header = static_cast<const fair::mq::shmem::Message&>(*currentSetOfInputs[idx].header(0));
2246 return header.GetRefCount();
2247 };
2248#else
2249 std::function<int(size_t)> refCountGetter = nullptr;
2250#endif
2251 return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.size()};
2252 };
2253
2254 auto markInputsAsDone = [ref](TimesliceSlot slot) -> void {
2255 auto& relayer = ref.get<DataRelayer>();
2257 };
2258
2259 // I need a preparation step which gets the current timeslice id and
2260 // propagates it to the various contextes (i.e. the actual entities which
2261 // create messages) because the messages need to have the timeslice id into
2262 // it.
2263 auto prepareAllocatorForCurrentTimeSlice = [ref](TimesliceSlot i) -> void {
2264 auto& relayer = ref.get<DataRelayer>();
2265 auto& timingInfo = ref.get<TimingInfo>();
2266 auto timeslice = relayer.getTimesliceForSlot(i);
2267
2268 timingInfo.timeslice = timeslice.value;
2269 timingInfo.tfCounter = relayer.getFirstTFCounterForSlot(i);
2270 timingInfo.firstTForbit = relayer.getFirstTFOrbitForSlot(i);
2271 timingInfo.runNumber = relayer.getRunNumberForSlot(i);
2272 timingInfo.creation = relayer.getCreationTimeForSlot(i);
2273 };
2274 auto updateRunInformation = [ref](TimesliceSlot i) -> void {
2275 auto& dataProcessorContext = ref.get<DataProcessorContext>();
2276 auto& relayer = ref.get<DataRelayer>();
2277 auto& timingInfo = ref.get<TimingInfo>();
2278 auto timeslice = relayer.getTimesliceForSlot(i);
2279 // We report wether or not this timing info refers to a new Run.
2280 timingInfo.globalRunNumberChanged = !TimingInfo::timesliceIsTimer(timeslice.value) && dataProcessorContext.lastRunNumberProcessed != timingInfo.runNumber;
2281 // A switch to runNumber=0 should not appear and thus does not set globalRunNumberChanged, unless it is seen in the first processed timeslice
2282 timingInfo.globalRunNumberChanged &= (dataProcessorContext.lastRunNumberProcessed == -1 || timingInfo.runNumber != 0);
2283 // FIXME: for now there is only one stream, however we
2284 // should calculate this correctly once we finally get the
2285 // the StreamContext in.
2286 timingInfo.streamRunNumberChanged = timingInfo.globalRunNumberChanged;
2287 };
2288
2289 // When processing them, timers will have to be cleaned up
2290 // to avoid double counting them.
2291 // This was actually the easiest solution we could find for
2292 // O2-646.
2293 auto cleanTimers = [&currentSetOfInputs](TimesliceSlot slot, InputRecord& record) {
2294 assert(record.size() == currentSetOfInputs.size());
2295 for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) {
2296 // assuming that for timer inputs we do have exactly one PartRef object
2297 // in the MessageSet, multiple PartRef Objects are only possible for either
2298 // split payload messages of wildcard matchers, both for data inputs
2299 DataRef input = record.getByPos(ii);
2300 if (input.spec->lifetime != Lifetime::Timer) {
2301 continue;
2302 }
2303 if (input.header == nullptr) {
2304 continue;
2305 }
2306 // This will hopefully delete the message.
2307 currentSetOfInputs[ii].clear();
2308 }
2309 };
2310
2311 // Function to cleanup record. For the moment we
2312 // simply use it to keep track of input messages
2313 // which are not needed, to display them in the GUI.
2314 auto cleanupRecord = [](InputRecord& record) {
2315 if (O2_LOG_ENABLED(parts) == false) {
2316 return;
2317 }
2318 for (size_t pi = 0, pe = record.size(); pi < pe; ++pi) {
2319 DataRef input = record.getByPos(pi);
2320 if (input.header == nullptr) {
2321 continue;
2322 }
2323 auto sih = o2::header::get<SourceInfoHeader*>(input.header);
2324 if (sih) {
2325 continue;
2326 }
2327
2328 auto dh = o2::header::get<DataHeader*>(input.header);
2329 if (!dh) {
2330 continue;
2331 }
2332 // We use the address of the first header of a split payload
2333 // to identify the interval.
2334 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, dh);
2335 O2_SIGNPOST_END(parts, pid, "parts", "Cleaning up parts associated to %p", dh);
2336
2337 // No split parts, we simply skip the payload
2338 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
2339 // this is indicating a sequence of payloads following the header
2340 // FIXME: we will probably also set the DataHeader version
2341 pi += dh->splitPayloadParts - 1;
2342 } else {
2343 size_t pi = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
2344 }
2345 }
2346 };
2347
2348 ref.get<DataRelayer>().getReadyToProcess(completed);
2349 if (completed.empty() == true) {
2350 LOGP(debug, "No computations available for dispatching.");
2351 return false;
2352 }
2353
2354 auto postUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t tStart, uint64_t tStartMilli) {
2355 auto& stats = ref.get<DataProcessingStats>();
2356 auto& states = ref.get<DataProcessingStates>();
2357 std::atomic_thread_fence(std::memory_order_release);
2358 char relayerSlotState[1024];
2359 int written = snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2360 char* buffer = relayerSlotState + written;
2361 for (size_t ai = 0; ai != record.size(); ai++) {
2362 buffer[ai] = record.isValid(ai) ? '3' : '0';
2363 }
2364 buffer[record.size()] = 0;
2365 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index),
2366 .size = (int)(record.size() + buffer - relayerSlotState),
2367 .data = relayerSlotState});
2368 uint64_t tEnd = uv_hrtime();
2369 // tEnd and tStart are in nanoseconds according to https://docs.libuv.org/en/v1.x/misc.html#c.uv_hrtime
2370 int64_t wallTimeMs = (tEnd - tStart) / 1000000;
2372 // Sum up the total wall time, in milliseconds.
2374 // The time interval is in seconds while tEnd - tStart is in nanoseconds, so we divide by 1000000 to get the fraction in ms/s.
2376 stats.updateStats({(int)ProcessingStatsId::LAST_PROCESSED_SIZE, DataProcessingStats::Op::Set, calculateTotalInputRecordSize(record)});
2377 stats.updateStats({(int)ProcessingStatsId::TOTAL_PROCESSED_SIZE, DataProcessingStats::Op::Add, calculateTotalInputRecordSize(record)});
2378 auto latency = calculateInputRecordLatency(record, tStartMilli);
2379 stats.updateStats({(int)ProcessingStatsId::LAST_MIN_LATENCY, DataProcessingStats::Op::Set, (int)latency.minLatency});
2380 stats.updateStats({(int)ProcessingStatsId::LAST_MAX_LATENCY, DataProcessingStats::Op::Set, (int)latency.maxLatency});
2381 static int count = 0;
2383 count++;
2384 };
2385
2386 auto preUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t) {
2387 auto& states = ref.get<DataProcessingStates>();
2388 std::atomic_thread_fence(std::memory_order_release);
2389 char relayerSlotState[1024];
2390 snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2391 char* buffer = strchr(relayerSlotState, ' ') + 1;
2392 for (size_t ai = 0; ai != record.size(); ai++) {
2393 buffer[ai] = record.isValid(ai) ? '2' : '0';
2394 }
2395 buffer[record.size()] = 0;
2396 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index), .size = (int)(record.size() + buffer - relayerSlotState), .data = relayerSlotState});
2397 };
2398
2399 // This is the main dispatching loop
2400 auto& state = ref.get<DeviceState>();
2401 auto& spec = ref.get<DeviceSpec const>();
2402
2403 auto& dpContext = ref.get<DataProcessorContext>();
2404 auto& streamContext = ref.get<StreamContext>();
2405 O2_SIGNPOST_ID_GENERATE(sid, device);
2406 O2_SIGNPOST_START(device, sid, "device", "Start processing ready actions");
2407
2408 auto& stats = ref.get<DataProcessingStats>();
2409 auto& relayer = ref.get<DataRelayer>();
2410 using namespace o2::framework;
2411 stats.updateStats({(int)ProcessingStatsId::PENDING_INPUTS, DataProcessingStats::Op::Set, static_cast<int64_t>(relayer.getParallelTimeslices() - completed.size())});
2412 stats.updateStats({(int)ProcessingStatsId::INCOMPLETE_INPUTS, DataProcessingStats::Op::Set, completed.empty() ? 1 : 0});
2413 switch (spec.completionPolicy.order) {
2415 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.timeslice.value < b.timeslice.value; });
2416 break;
2418 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.slot.index < b.slot.index; });
2419 break;
2421 default:
2422 break;
2423 }
2424
2425 for (auto action : completed) {
2426 O2_SIGNPOST_ID_GENERATE(aid, device);
2427 O2_SIGNPOST_START(device, aid, "device", "Processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2429 O2_SIGNPOST_END(device, aid, "device", "Waiting for more data.");
2430 continue;
2431 }
2432
2433 bool shouldConsume = action.op == CompletionPolicy::CompletionOp::Consume ||
2435 prepareAllocatorForCurrentTimeSlice(TimesliceSlot{action.slot});
2439 updateRunInformation(TimesliceSlot{action.slot});
2440 }
2441 InputSpan span = getInputSpan(action.slot, shouldConsume);
2442 auto& spec = ref.get<DeviceSpec const>();
2443 InputRecord record{spec.inputs,
2444 span,
2445 *context.registry};
2446 ProcessingContext processContext{record, ref, ref.get<DataAllocator>()};
2447 {
2448 // Notice this should be thread safe and reentrant
2449 // as it is called from many threads.
2450 streamContext.preProcessingCallbacks(processContext);
2451 dpContext.preProcessingCallbacks(processContext);
2452 }
2454 context.postDispatchingCallbacks(processContext);
2455 if (spec.forwards.empty() == false) {
2456 auto& timesliceIndex = ref.get<TimesliceIndex>();
2457 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false);
2458 O2_SIGNPOST_END(device, aid, "device", "Forwarding inputs consume: %d.", false);
2459 continue;
2460 }
2461 }
2462 // If there is no optional inputs we canForwardEarly
2463 // the messages to that parallel processing can happen.
2464 // In this case we pass true to indicate that we want to
2465 // copy the messages to the subsequent data processor.
2466 bool hasForwards = spec.forwards.empty() == false;
2468
2469 if (context.canForwardEarly && hasForwards && consumeSomething) {
2470 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwainding: %{public}s.", fmt::format("{}", action.op).c_str());
2471 auto& timesliceIndex = ref.get<TimesliceIndex>();
2472 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
2473 }
2474 markInputsAsDone(action.slot);
2475
2476 uint64_t tStart = uv_hrtime();
2477 uint64_t tStartMilli = TimingHelpers::getRealtimeSinceEpochStandalone();
2478 preUpdateStats(action, record, tStart);
2479
2480 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
2481
2482 auto runNoCatch = [&context, ref, &processContext](DataRelayer::RecordAction& action) mutable {
2483 auto& state = ref.get<DeviceState>();
2484 auto& spec = ref.get<DeviceSpec const>();
2485 auto& streamContext = ref.get<StreamContext>();
2486 auto& dpContext = ref.get<DataProcessorContext>();
2487 auto shouldProcess = [](DataRelayer::RecordAction& action) -> bool {
2488 switch (action.op) {
2493 return true;
2494 break;
2495 default:
2496 return false;
2497 }
2498 };
2499 if (state.quitRequested == false) {
2500 {
2501 // Callbacks from services
2502 dpContext.preProcessingCallbacks(processContext);
2503 streamContext.preProcessingCallbacks(processContext);
2504 dpContext.preProcessingCallbacks(processContext);
2505 // Callbacks from users
2506 ref.get<CallbackService>().call<CallbackService::Id::PreProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2507 }
2508 O2_SIGNPOST_ID_FROM_POINTER(pcid, device, &processContext);
2509 if (context.statefulProcess && shouldProcess(action)) {
2510 // This way, usercode can use the the same processing context to identify
2511 // its signposts and we can map user code to device iterations.
2512 O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
2513 (context.statefulProcess)(processContext);
2514 O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
2515 } else if (context.statelessProcess && shouldProcess(action)) {
2516 O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
2517 (context.statelessProcess)(processContext);
2518 O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
2519 } else if (context.statelessProcess || context.statefulProcess) {
2520 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding.");
2521 } else {
2522 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle");
2524 }
2525 if (shouldProcess(action)) {
2526 auto& timingInfo = ref.get<TimingInfo>();
2527 if (timingInfo.globalRunNumberChanged) {
2528 context.lastRunNumberProcessed = timingInfo.runNumber;
2529 }
2530 }
2531
2532 // Notify the sink we just consumed some timeframe data
2533 if (context.isSink && action.op == CompletionPolicy::CompletionOp::Consume) {
2534 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Sending dpl-summary");
2535 auto& allocator = ref.get<DataAllocator>();
2536 allocator.make<int>(OutputRef{"dpl-summary", runtime_hash(spec.name.c_str())}, 1);
2537 }
2538
2539 // Extra callback which allows a service to add extra outputs.
2540 // This is needed e.g. to ensure that injected CCDB outputs are added
2541 // before an end of stream.
2542 {
2543 ref.get<CallbackService>().call<CallbackService::Id::FinaliseOutputs>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2544 dpContext.finaliseOutputsCallbacks(processContext);
2545 streamContext.finaliseOutputsCallbacks(processContext);
2546 }
2547
2548 {
2549 ref.get<CallbackService>().call<CallbackService::Id::PostProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2550 dpContext.postProcessingCallbacks(processContext);
2551 streamContext.postProcessingCallbacks(processContext);
2552 }
2553 }
2554 };
2555
2556 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_USERCODE) != 0) {
2557 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
2558 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
2559 }
2560 if (noCatch) {
2561 try {
2562 runNoCatch(action);
2563 } catch (o2::framework::RuntimeErrorRef e) {
2564 (context.errorHandling)(e, record);
2565 }
2566 } else {
2567 try {
2568 runNoCatch(action);
2569 } catch (std::exception& ex) {
2573 auto e = runtime_error(ex.what());
2574 (context.errorHandling)(e, record);
2575 } catch (o2::framework::RuntimeErrorRef e) {
2576 (context.errorHandling)(e, record);
2577 }
2578 }
2579 if (state.severityStack.empty() == false) {
2580 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
2581 state.severityStack.pop_back();
2582 }
2583
2584 postUpdateStats(action, record, tStart, tStartMilli);
2585 // We forward inputs only when we consume them. If we simply Process them,
2586 // we keep them for next message arriving.
2588 cleanupRecord(record);
2589 context.postDispatchingCallbacks(processContext);
2590 ref.get<CallbackService>().call<CallbackService::Id::DataConsumed>(o2::framework::ServiceRegistryRef{ref});
2591 }
2592 if ((context.canForwardEarly == false) && hasForwards && consumeSomething) {
2593 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Late forwarding");
2594 auto& timesliceIndex = ref.get<TimesliceIndex>();
2595 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false, action.op == CompletionPolicy::CompletionOp::Consume);
2596 }
2597 context.postForwardingCallbacks(processContext);
2599 cleanTimers(action.slot, record);
2600 }
2601 O2_SIGNPOST_END(device, aid, "device", "Done processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2602 }
2603 O2_SIGNPOST_END(device, sid, "device", "Start processing ready actions");
2604
2605 // We now broadcast the end of stream if it was requested
2606 if (state.streaming == StreamingState::EndOfStreaming) {
2607 LOGP(detail, "Broadcasting end of stream");
2608 for (auto& channel : spec.outputChannels) {
2610 }
2612 }
2613
2614 return true;
2615}
2616
2618{
2619 LOG(error) << msg;
2620 ServiceRegistryRef ref{mServiceRegistry};
2621 auto& stats = ref.get<DataProcessingStats>();
2623}
2624
2625std::unique_ptr<ConfigParamStore> DeviceConfigurationHelpers::getConfiguration(ServiceRegistryRef registry, const char* name, std::vector<ConfigParamSpec> const& options)
2626{
2627
2628 if (registry.active<ConfigurationInterface>()) {
2629 auto& cfg = registry.get<ConfigurationInterface>();
2630 try {
2631 cfg.getRecursive(name);
2632 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
2633 retrievers.emplace_back(std::make_unique<ConfigurationOptionsRetriever>(&cfg, name));
2634 auto configStore = std::make_unique<ConfigParamStore>(options, std::move(retrievers));
2635 configStore->preload();
2636 configStore->activate();
2637 return configStore;
2638 } catch (...) {
2639 // No overrides...
2640 }
2641 }
2642 return {nullptr};
2643}
2644
2645} // namespace o2::framework
benchmark::State & state
struct uv_timer_s uv_timer_t
struct uv_signal_s uv_signal_t
struct uv_async_s uv_async_t
struct uv_poll_s uv_poll_t
struct uv_loop_s uv_loop_t
o2::monitoring::Metric Metric
uint64_t maxLatency
o2::configuration::ConfigurationInterface ConfigurationInterface
constexpr int DEFAULT_MAX_CHANNEL_AHEAD
uint64_t minLatency
int32_t i
std::enable_if_t< std::is_signed< T >::value, bool > hasData(const CalArray< T > &cal)
Definition Painter.cxx:515
uint16_t pid
Definition RawData.h:2
uint32_t res
Definition RawData.h:0
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
Definition Signpost.h:536
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:483
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:499
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:525
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:571
#define O2_LOG_ENABLED(log)
Definition Signpost.h:110
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:500
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:516
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:565
constexpr uint32_t runtime_hash(char const *str)
std::ostringstream debug
o2::monitoring::Monitoring Monitoring
StringRef key
@ DeviceStateChanged
Invoked the device undergoes a state change.
decltype(auto) make(const Output &spec, Args... args)
static void doRun(ServiceRegistryRef)
void fillContext(DataProcessorContext &context, DeviceContext &deviceContext)
static void doPrepare(ServiceRegistryRef)
static bool tryDispatchComputation(ServiceRegistryRef ref, std::vector< DataRelayer::RecordAction > &completed)
static void handleData(ServiceRegistryRef, InputChannelInfo &)
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry &, ProcessingPolicies &policies)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associate to a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associate to a given slot.
A service API to communicate with the driver.
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
size_t size() const
Number of elements in the InputSpan.
Definition InputSpan.h:82
bool active() const
Check if service of type T is currently active.
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLuint entry
Definition glcorearb.h:5735
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLuint GLuint stream
Definition glcorearb.h:1806
GLbitfield GLuint64 timeout
Definition glcorearb.h:1573
GLint ref
Definition glcorearb.h:291
GLuint * states
Definition glcorearb.h:4932
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
ServiceKind
The kind of service we are asking for.
void on_idle_timer(uv_timer_t *handle)
@ DPL
The channel is a normal input channel.
void run_completion(uv_work_t *handle, int status)
bool hasOnlyGenerated(DeviceSpec const &spec)
void on_socket_polled(uv_poll_t *poller, int status, int events)
void on_transition_requested_expired(uv_timer_t *handle)
void run_callback(uv_work_t *handle)
volatile int region_read_global_dummy_variable
void handleRegionCallbacks(ServiceRegistryRef registry, std::vector< fair::mq::RegionInfo > &infos)
Invoke the callbacks for the mPendingRegionInfos.
void on_out_of_band_polled(uv_poll_t *poller, int status, int events)
DeviceSpec const & getRunningDevice(RunningDeviceRef const &running, ServiceRegistryRef const &services)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Streaming
Data is being processed.
@ Idle
End of streaming notified.
void on_communication_requested(uv_async_t *s)
@ Expired
A transition needs to be fullfilled ASAP.
@ NoTransition
No pending transitions.
@ Requested
A transition was notified to be requested.
RuntimeError & error_from_ref(RuntimeErrorRef)
auto switchState(ServiceRegistryRef &ref, StreamingState newState) -> void
void on_awake_main_thread(uv_async_t *handle)
@ Completed
The channel was signaled it will not receive any data.
@ Running
The channel is actively receiving data.
void on_signal_callback(uv_signal_t *handle, int signum)
@ Me
Only quit this data processor.
void on_data_processing_expired(uv_timer_t *handle)
bool hasOnlyTimers(DeviceSpec const &spec)
constexpr const char * channelName(int channel)
Definition Constants.h:318
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
Defining DataPointCompositeObject explicitly as copiable.
static void run(AsyncQueue &queue, TimesliceId oldestPossibleTimeslice)
static void post(AsyncQueue &queue, AsyncTask const &task)
An actuatual task to be executed.
Definition AsyncQueue.h:32
static void demangled_backtrace_symbols(void **backtrace, unsigned int total, int fd)
static constexpr int INVALID
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
int64_t sharedMemory
How much shared memory it can allocate.
Statistics on the offers consumed, expired.
static void sendEndOfStream(ServiceRegistryRef const &ref, OutputChannelSpec const &channel)
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
Helper struct to hold statistics about the data processing happening.
@ CumulativeRate
Set the value to the specified value if it is positive.
@ Add
Update the rate of the metric given the amount since the last time.
std::function< void(o2::framework::RuntimeErrorRef e, InputRecord &record)> errorHandling
AlgorithmSpec::InitErrorCallback initError
void preLoopCallbacks(ServiceRegistryRef)
Invoke callbacks before we enter the event loop.
void postStopCallbacks(ServiceRegistryRef)
Invoke callbacks on stop.
void preProcessingCallbacks(ProcessingContext &)
Invoke callbacks to be executed before every process method invokation.
bool canForwardEarly
Wether or not the associated DataProcessor can forward things early.
void preStartCallbacks(ServiceRegistryRef)
Invoke callbacks to be executed in PreRun(), before the User Start callbacks.
AlgorithmSpec::ProcessCallback statefulProcess
const char * header
Definition DataRef.h:27
const InputSpec * spec
Definition DataRef.h:26
static std::vector< size_t > createDistinctRouteIndex(std::vector< InputRoute > const &)
CompletionPolicy::CompletionOp op
Definition DataRelayer.h:81
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static header::DataOrigin asConcreteOrigin(InputSpec const &spec)
TimesliceIndex::OldestOutputInfo oldestTimeslice
static unsigned int pipelineLength()
get max number of timeslices in the queue
static bool onlineDeploymentMode()
@true if running online
static std::unique_ptr< ConfigParamStore > getConfiguration(ServiceRegistryRef registry, const char *name, std::vector< ConfigParamSpec > const &options)
std::vector< InputRoute > inputs
Definition DeviceSpec.h:62
std::vector< InputChannelSpec > inputChannels
Definition DeviceSpec.h:54
Running state information of a given device.
Definition DeviceState.h:34
std::atomic< int64_t > cleanupCount
Definition DeviceState.h:82
Forward channel information.
Definition ChannelInfo.h:88
ChannelAccountingType channelType
Wether or not it's a DPL internal channel.
Definition ChannelInfo.h:92
std::string name
The name of the channel.
Definition ChannelInfo.h:90
ForwardingPolicy const * policy
Definition ChannelInfo.h:94
fair::mq::Channel * channel
Definition ChannelInfo.h:51
TimesliceId oldestForChannel
Oldest possible timeslice for the given channel.
Definition ChannelInfo.h:65
enum Lifetime lifetime
Definition InputSpec.h:73
enum EarlyForwardPolicy earlyForward
Information about the running workflow.
static constexpr ServiceKind kind
static Salt streamSalt(short streamId, short dataProcessorId)
void lateBindStreamServices(DeviceState &state, fair::mq::ProgOptions &options, ServiceRegistry::Salt salt)
static Salt globalStreamSalt(short streamId)
void * get(ServiceTypeHash typeHash, Salt salt, ServiceKind kind, char const *name=nullptr) const
void finaliseOutputsCallbacks(ProcessingContext &)
Invoke callbacks to be executed after every process method invokation.
void preProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed before every process method invokation.
void preEOSCallbacks(EndOfStreamContext &eosContext)
Invoke callbacks to be executed before every EOS user callback invokation.
void postProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed after every process method invokation.
static int64_t getRealtimeSinceEpochStandalone()
bool keepAtEndOfStream
Wether this kind of data should be flushed during end of stream.
Definition TimingInfo.h:44
static bool timesliceIsTimer(size_t timeslice)
Definition TimingInfo.h:46
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
void backpressure(InputChannelInfo const &)
locked_execution(ServiceRegistryRef &ref_)
the main header struct
Definition DataHeader.h:618
constexpr size_t min
constexpr size_t max
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
vec clear()
const std::string str
uint64_t const void const *restrict const msg
Definition x9.h:153