Project
Loading...
Searching...
No Matches
DataProcessingDevice.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
31#include "Framework/InputSpan.h"
32#if defined(__APPLE__) || defined(NDEBUG)
33#define O2_SIGNPOST_IMPLEMENTATION
34#endif
35#include "Framework/Signpost.h"
48
49#include "DecongestionService.h"
51#include "DataRelayerHelpers.h"
52#include "Headers/DataHeader.h"
54
55#include <Framework/Tracing.h>
56
57#include <fairmq/Parts.h>
58#include <fairmq/Socket.h>
59#include <fairmq/ProgOptions.h>
60#include <Configuration/ConfigurationInterface.h>
61#include <Configuration/ConfigurationFactory.h>
62#include <Monitoring/Monitoring.h>
63#include <TMessage.h>
64#include <TClonesArray.h>
65
66#include <fmt/ostream.h>
67#include <algorithm>
68#include <vector>
69#include <numeric>
70#include <memory>
71#include <uv.h>
72#include <execinfo.h>
73#include <sstream>
74#include <boost/property_tree/json_parser.hpp>
75
76// Formatter to avoid having to rewrite the ostream operator for the enum
77namespace fmt
78{
79template <>
82} // namespace fmt
83
84// A log to use for general device logging
86// Special log to keep track of the lifetime of the parts
88// Stream which keeps track of the calibration lifetime logic
90// Special log to track the async queue behavior
92// Special log to track the forwarding requests
94
95using namespace o2::framework;
96using ConfigurationInterface = o2::configuration::ConfigurationInterface;
98
99constexpr int DEFAULT_MAX_CHANNEL_AHEAD = 128;
100
101namespace o2::framework
102{
103
104template <>
108
112{
113 auto* state = (DeviceState*)handle->data;
114 state->loopReason |= DeviceState::TIMER_EXPIRED;
115}
116
117bool hasOnlyTimers(DeviceSpec const& spec)
118{
119 return std::all_of(spec.inputs.cbegin(), spec.inputs.cend(), [](InputRoute const& route) -> bool { return route.matcher.lifetime == Lifetime::Timer; });
120}
121
123{
124 return (spec.inputChannels.size() == 1) && (spec.inputs[0].matcher.lifetime == Lifetime::Timer || spec.inputs[0].matcher.lifetime == Lifetime::Enumeration);
125}
126
128{
129 auto* ref = (ServiceRegistryRef*)handle->data;
130 auto& state = ref->get<DeviceState>();
131 state.loopReason |= DeviceState::TIMER_EXPIRED;
132 // Check if this is a source device
133 O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
134 auto& spec = ref->get<DeviceSpec const>();
135 if (hasOnlyGenerated(spec)) {
136 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for source expired. Exiting.");
137 } else {
138 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for %{public}s expired. Exiting.",
139 state.allowedProcessing == DeviceState::CalibrationOnly ? "calibration" : "data & calibration");
140 }
141 state.transitionHandling = TransitionHandlingState::Expired;
142}
143
145{
146 auto* ref = (ServiceRegistryRef*)handle->data;
147 auto& state = ref->get<DeviceState>();
148 state.loopReason |= DeviceState::TIMER_EXPIRED;
149
150 // Check if this is a source device
151 O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
152
153 // Source devices should never end up in this callback, since the exitTransitionTimeout should
154 // be reset to the dataProcessingTimeout and the timers coalesced.
155 assert(hasOnlyGenerated(ref->get<DeviceSpec const>()) == false);
156 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
157 state.allowedProcessing = DeviceState::CalibrationOnly;
158}
159
161{
162 auto* state = (DeviceState*)s->data;
164}
165
166DeviceSpec const& getRunningDevice(RunningDeviceRef const& running, ServiceRegistryRef const& services)
167{
168 auto& devices = services.get<o2::framework::RunningWorkflowInfo const>().devices;
169 return devices[running.index];
170}
171
177
179 : mRunningDevice{running},
180 mConfigRegistry{nullptr},
181 mServiceRegistry{registry},
182 mProcessingPolicies{policies}
183{
184 GetConfig()->Subscribe<std::string>("dpl", [&registry = mServiceRegistry](const std::string& key, std::string value) {
185 if (key == "cleanup") {
187 auto& deviceState = ref.get<DeviceState>();
188 int64_t cleanupCount = deviceState.cleanupCount.load();
189 int64_t newCleanupCount = std::stoll(value);
190 if (newCleanupCount <= cleanupCount) {
191 return;
192 }
193 deviceState.cleanupCount.store(newCleanupCount);
194 for (auto& info : deviceState.inputChannelInfos) {
195 fair::mq::Parts parts;
196 while (info.channel->Receive(parts, 0)) {
197 LOGP(debug, "Dropping {} parts", parts.Size());
198 if (parts.Size() == 0) {
199 break;
200 }
201 }
202 }
203 }
204 });
205
206 std::function<void(const fair::mq::State)> stateWatcher = [this, &registry = mServiceRegistry](const fair::mq::State state) -> void {
208 auto& deviceState = ref.get<DeviceState>();
209 auto& control = ref.get<ControlService>();
210 auto& callbacks = ref.get<CallbackService>();
211 control.notifyDeviceState(fair::mq::GetStateName(state));
213
214 if (deviceState.nextFairMQState.empty() == false) {
215 auto state = deviceState.nextFairMQState.back();
216 (void)this->ChangeState(state);
217 deviceState.nextFairMQState.pop_back();
218 }
219 };
220
221 // 99 is to execute DPL callbacks last
222 this->SubscribeToStateChange("99-dpl", stateWatcher);
223
224 // One task for now.
225 mStreams.resize(1);
226 mHandles.resize(1);
227
228 ServiceRegistryRef ref{mServiceRegistry};
229 mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t));
230 auto& state = ref.get<DeviceState>();
231 assert(state.loop);
232 int res = uv_async_init(state.loop, mAwakeHandle, on_communication_requested);
233 mAwakeHandle->data = &state;
234 if (res < 0) {
235 LOG(error) << "Unable to initialise subscription";
236 }
237
239 SubscribeToNewTransition("dpl", [wakeHandle = mAwakeHandle](fair::mq::Transition t) {
240 int res = uv_async_send(wakeHandle);
241 if (res < 0) {
242 LOG(error) << "Unable to notify subscription";
243 }
244 LOG(debug) << "State transition requested";
245 });
246}
247
248// Callback to execute the processing. Notice how the data
249// is a vector of DataProcessorContext so that we can index the correct
250// one with the thread id. For the moment we simply use the first one.
// Work callback handed to libuv: `handle->data` carries the TaskStreamInfo of the
// stream to run. The visible part of the body only brackets the work with a
// "run_callback" signpost interval keyed on the stream's DataProcessorContext.
// NOTE(review): the statements between the START and END signposts are not visible
// in this listing — confirm against the original file before relying on this comment.
251void run_callback(uv_work_t* handle)
252{
253 auto* task = (TaskStreamInfo*)handle->data;
// Stream-scoped view of the registry: the salt is derived from the stream index + 1.
254 auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
255 // We create a new signpost interval for this specific data processor. Same id, same data processor.
256 auto& dataProcessorContext = ref.get<DataProcessorContext>();
// The signpost id is derived from the context address, so it is stable for a given context.
257 O2_SIGNPOST_ID_FROM_POINTER(sid, device, &dataProcessorContext);
258 O2_SIGNPOST_START(device, sid, "run_callback", "Starting run callback on stream %d", task->id.index);
261 O2_SIGNPOST_END(device, sid, "run_callback", "Done processing data for stream %d", task->id.index);
262}
263
264// Once the processing in a thread is done, this is executed on the main thread.
265void run_completion(uv_work_t* handle, int status)
266{
267 auto* task = (TaskStreamInfo*)handle->data;
268 // Notice that the completion, while running on the main thread, still
269 // has a salt which is associated to the actual stream which was doing the computation
270 auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
271 auto& state = ref.get<DeviceState>();
272 auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
273
274 using o2::monitoring::Metric;
275 using o2::monitoring::Monitoring;
276 using o2::monitoring::tags::Key;
277 using o2::monitoring::tags::Value;
278
279 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats&)> reportConsumedOffer = [ref](ComputingQuotaOffer const& accumulatedConsumed, ComputingQuotaStats& stats) {
280 auto& dpStats = ref.get<DataProcessingStats>();
281 stats.totalConsumedBytes += accumulatedConsumed.sharedMemory;
282
283 dpStats.updateStats({static_cast<short>(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedBytes});
284 dpStats.processCommandQueue();
285 assert(stats.totalConsumedBytes == dpStats.metrics[(short)ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED]);
286 };
287
288 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const&)> reportExpiredOffer = [ref](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
289 auto& dpStats = ref.get<DataProcessingStats>();
290 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
291 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
292 dpStats.processCommandQueue();
293 };
294
295 for (auto& consumer : state.offerConsumers) {
296 quotaEvaluator.consume(task->id.index, consumer, reportConsumedOffer);
297 }
298 state.offerConsumers.clear();
299 quotaEvaluator.handleExpired(reportExpiredOffer);
300 quotaEvaluator.dispose(task->id.index);
301 task->running = false;
302}
303
304// Context for polling
306 enum struct PollerState : char { Stopped,
308 Connected,
309 Suspended };
310 char const* name = nullptr;
311 uv_loop_t* loop = nullptr;
313 DeviceState* state = nullptr;
314 fair::mq::Socket* socket = nullptr;
316 int fd = -1;
317 bool read = true;
319};
320
321void on_socket_polled(uv_poll_t* poller, int status, int events)
322{
323 auto* context = (PollerContext*)poller->data;
324 assert(context);
325 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
326 context->state->loopReason |= DeviceState::DATA_SOCKET_POLLED;
327 switch (events) {
328 case UV_READABLE: {
329 O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
330 context->state->loopReason |= DeviceState::DATA_INCOMING;
331 } break;
332 case UV_WRITABLE: {
333 O2_SIGNPOST_END(device, sid, "socket_state", "Socket connected for channel %{public}s", context->name);
334 if (context->read) {
335 O2_SIGNPOST_START(device, sid, "socket_state", "Socket connected for read in context %{public}s", context->name);
336 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
337 context->state->loopReason |= DeviceState::DATA_CONNECTED;
338 } else {
339 O2_SIGNPOST_START(device, sid, "socket_state", "Socket connected for write for channel %{public}s", context->name);
340 context->state->loopReason |= DeviceState::DATA_OUTGOING;
341 // If the socket is writable, fairmq will handle the rest, so we can stop polling and
342 // just wait for the disconnect.
343 uv_poll_start(poller, UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
344 }
345 context->pollerState = PollerContext::PollerState::Connected;
346 } break;
347 case UV_DISCONNECT: {
348 O2_SIGNPOST_END(device, sid, "socket_state", "Socket disconnected in context %{public}s", context->name);
349 } break;
350 case UV_PRIORITIZED: {
351 O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Socket prioritized for context %{public}s", context->name);
352 } break;
353 }
354 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
355}
356
357void on_out_of_band_polled(uv_poll_t* poller, int status, int events)
358{
359 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
360 auto* context = (PollerContext*)poller->data;
361 context->state->loopReason |= DeviceState::OOB_ACTIVITY;
362 if (status < 0) {
363 LOGP(fatal, "Error while polling {}: {}", context->name, status);
364 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
365 }
366 switch (events) {
367 case UV_READABLE: {
368 O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
369 context->state->loopReason |= DeviceState::DATA_INCOMING;
370 assert(context->channelInfo);
371 context->channelInfo->readPolled = true;
372 } break;
373 case UV_WRITABLE: {
374 O2_SIGNPOST_END(device, sid, "socket_state", "OOB socket connected for channel %{public}s", context->name);
375 if (context->read) {
376 O2_SIGNPOST_START(device, sid, "socket_state", "OOB socket connected for read in context %{public}s", context->name);
377 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_out_of_band_polled);
378 } else {
379 O2_SIGNPOST_START(device, sid, "socket_state", "OOB socket connected for write for channel %{public}s", context->name);
380 context->state->loopReason |= DeviceState::DATA_OUTGOING;
381 }
382 } break;
383 case UV_DISCONNECT: {
384 O2_SIGNPOST_END(device, sid, "socket_state", "OOB socket disconnected in context %{public}s", context->name);
385 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
386 } break;
387 case UV_PRIORITIZED: {
388 O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "OOB socket prioritized for context %{public}s", context->name);
389 } break;
390 }
391 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
392}
393
402{
403 auto ref = ServiceRegistryRef{mServiceRegistry};
404 auto& context = ref.get<DataProcessorContext>();
405 auto& spec = getRunningDevice(mRunningDevice, ref);
406
407 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
408 O2_SIGNPOST_START(device, cid, "Init", "Entering Init callback.");
409 context.statelessProcess = spec.algorithm.onProcess;
410 context.statefulProcess = nullptr;
411 context.error = spec.algorithm.onError;
412 context.initError = spec.algorithm.onInitError;
413
414 auto configStore = DeviceConfigurationHelpers::getConfiguration(mServiceRegistry, spec.name.c_str(), spec.options);
415 if (configStore == nullptr) {
416 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
417 retrievers.emplace_back(std::make_unique<FairOptionsRetriever>(GetConfig()));
418 configStore = std::make_unique<ConfigParamStore>(spec.options, std::move(retrievers));
419 configStore->preload();
420 configStore->activate();
421 }
422
423 using boost::property_tree::ptree;
424
426 for (auto& entry : configStore->store()) {
427 std::stringstream ss;
428 std::string str;
429 if (entry.second.empty() == false) {
430 boost::property_tree::json_parser::write_json(ss, entry.second, false);
431 str = ss.str();
432 str.pop_back(); // remove EoL
433 } else {
434 str = entry.second.get_value<std::string>();
435 }
436 std::string configString = fmt::format("[CONFIG] {}={} 1 {}", entry.first, str, configStore->provenance(entry.first.c_str())).c_str();
437 mServiceRegistry.get<DriverClient>(ServiceRegistry::globalDeviceSalt()).tell(configString.c_str());
438 }
439
440 mConfigRegistry = std::make_unique<ConfigParamRegistry>(std::move(configStore));
441
442 // Setup the error handlers for init
443 if (context.initError) {
444 context.initErrorHandling = [&errorCallback = context.initError,
445 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
449 auto& context = ref.get<DataProcessorContext>();
450 auto& err = error_from_ref(e);
451 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
452 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Invoking errorCallback.", err.what);
453 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
454 auto& stats = ref.get<DataProcessingStats>();
456 InitErrorContext errorContext{ref, e};
457 errorCallback(errorContext);
458 };
459 } else {
460 context.initErrorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
461 auto& err = error_from_ref(e);
465 auto& context = ref.get<DataProcessorContext>();
466 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
467 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Exiting with 1.", err.what);
468 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
469 auto& stats = ref.get<DataProcessingStats>();
471 exit(1);
472 };
473 }
474
475 context.expirationHandlers.clear();
476 context.init = spec.algorithm.onInit;
477 if (context.init) {
478 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
479 InitContext initContext{*mConfigRegistry, mServiceRegistry};
480
481 if (noCatch) {
482 try {
483 context.statefulProcess = context.init(initContext);
485 if (context.initErrorHandling) {
486 (context.initErrorHandling)(e);
487 }
488 }
489 } else {
490 try {
491 context.statefulProcess = context.init(initContext);
492 } catch (std::exception& ex) {
496 auto e = runtime_error(ex.what());
497 (context.initErrorHandling)(e);
499 (context.initErrorHandling)(e);
500 }
501 }
502 }
503 auto& state = ref.get<DeviceState>();
504 state.inputChannelInfos.resize(spec.inputChannels.size());
508 int validChannelId = 0;
509 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
510 auto& name = spec.inputChannels[ci].name;
511 if (name.find(spec.channelPrefix + "from_internal-dpl-clock") == 0) {
512 state.inputChannelInfos[ci].state = InputChannelState::Pull;
513 state.inputChannelInfos[ci].id = {ChannelIndex::INVALID};
514 validChannelId++;
515 } else {
516 state.inputChannelInfos[ci].id = {validChannelId++};
517 }
518 }
519
520 // Invoke the callback policy for this device.
521 if (spec.callbacksPolicy.policy != nullptr) {
522 InitContext initContext{*mConfigRegistry, mServiceRegistry};
523 spec.callbacksPolicy.policy(mServiceRegistry.get<CallbackService>(ServiceRegistry::globalDeviceSalt()), initContext);
524 }
525
526 // Services which are stream should be initialised now
527 auto* options = GetConfig();
528 for (size_t si = 0; si < mStreams.size(); ++si) {
530 mServiceRegistry.lateBindStreamServices(state, *options, streamSalt);
531 }
532 O2_SIGNPOST_END(device, cid, "Init", "Exiting Init callback.");
533}
534
// Signal watcher callback: when a signal arrives, turn the first invalid entry of
// ComputingQuotaEvaluator::mOffers into a valid offer of 1000000000 bytes of shared
// memory, unless some valid offer already carries shared memory.
// `handle->data` is expected to point to the ServiceRegistry; when it is null the
// signal is ignored.
// NOTE(review): two lines of the original body are not visible in this listing, and
// `state` / `stats` are not used by the visible statements — confirm against the
// original file.
535void on_signal_callback(uv_signal_t* handle, int signum)
536{
537 O2_SIGNPOST_ID_FROM_POINTER(sid, device, handle);
538 O2_SIGNPOST_START(device, sid, "signal_state", "Signal %d received.", signum);
539
540 auto* registry = (ServiceRegistry*)handle->data;
541 if (!registry) {
542 O2_SIGNPOST_END(device, sid, "signal_state", "No registry active. Ignoring signal.");
543 return;
544 }
545 ServiceRegistryRef ref{*registry};
546 auto& state = ref.get<DeviceState>();
547 auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
548 auto& stats = ref.get<DataProcessingStats>();
// First pass: bail out as soon as one valid offer with shared memory is found.
550 size_t ri = 0;
551 while (ri != quotaEvaluator.mOffers.size()) {
552 auto& offer = quotaEvaluator.mOffers[ri];
553 // We were already offered some sharedMemory, so we
554 // do not consider the offer.
555 // FIXME: in principle this should account for memory
556 // available and being offered, however we
557 // want to get out of the woods for now.
558 if (offer.valid && offer.sharedMemory != 0) {
559 O2_SIGNPOST_END(device, sid, "signal_state", "Memory already offered.");
560 return;
561 }
562 ri++;
563 }
564 // Find the first empty offer and have 1GB of shared memory there
// If every slot is already valid, the loop below changes nothing.
565 for (auto& offer : quotaEvaluator.mOffers) {
566 if (offer.valid == false) {
567 offer.cpu = 0;
568 offer.memory = 0;
569 offer.sharedMemory = 1000000000;
570 offer.valid = true;
// presumably -1 means the offer is not yet assigned to any user — TODO confirm
571 offer.user = -1;
572 break;
573 }
574 }
576 O2_SIGNPOST_END(device, sid, "signal_state", "Done processing signals.");
577}
578
579static auto toBeForwardedHeader = [](void* header) -> bool {
580 // If is now possible that the record is not complete when
581 // we forward it, because of a custom completion policy.
582 // this means that we need to skip the empty entries in the
583 // record for being forwarded.
584 if (header == nullptr) {
585 return false;
586 }
587 auto sih = o2::header::get<SourceInfoHeader*>(header);
588 if (sih) {
589 return false;
590 }
591
592 auto dih = o2::header::get<DomainInfoHeader*>(header);
593 if (dih) {
594 return false;
595 }
596
597 auto dh = o2::header::get<DataHeader*>(header);
598 if (!dh) {
599 return false;
600 }
601 auto dph = o2::header::get<DataProcessingHeader*>(header);
602 if (!dph) {
603 return false;
604 }
605 return true;
606};
607
608static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
609 FairMQDeviceProxy& proxy,
610 std::unique_ptr<fair::mq::Message>& header,
611 std::unique_ptr<fair::mq::Message>& payload,
612 size_t total,
613 bool consume) {
614 if (header.get() == nullptr) {
615 // Missing an header is not an error anymore.
616 // it simply means that we did not receive the
617 // given input, but we were asked to
618 // consume existing, so we skip it.
619 return false;
620 }
621 if (payload.get() == nullptr && consume == true) {
622 // If the payload is not there, it means we already
623 // processed it with ConsumeExisiting. Therefore we
624 // need to do something only if this is the last consume.
625 header.reset(nullptr);
626 return false;
627 }
628
629 auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
630 if (fdph == nullptr) {
631 LOG(error) << "Data is missing DataProcessingHeader";
632 return false;
633 }
634 auto fdh = o2::header::get<DataHeader*>(header->GetData());
635 if (fdh == nullptr) {
636 LOG(error) << "Data is missing DataHeader";
637 return false;
638 }
639
640 // We need to find the forward route only for the first
641 // part of a split payload. All the others will use the same.
642 // but always check if we have a sequence of multiple payloads
643 if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
644 proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
645 }
646 return cachedForwardingChoices.empty() == false;
647};
648
// User payload attached to the AsyncTask which forwardInputs posts, and read back in
// decongestionCallbackLate through task.user<DecongestionContext>().
// NOTE(review): the member declarations are not visible in this listing; the code
// using this struct accesses `ref` and `oldestTimeslice` — confirm against the
// original file.
649struct DecongestionContext {
652};
653
654auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
655 auto& oldestTimeslice = task.user<DecongestionContext>().oldestTimeslice;
656 auto& ref = task.user<DecongestionContext>().ref;
657
658 auto& decongestion = ref.get<DecongestionService>();
659 auto& proxy = ref.get<FairMQDeviceProxy>();
660 if (oldestTimeslice.timeslice.value <= decongestion.lastTimeslice) {
661 LOG(debug) << "Not sending already sent oldest possible timeslice " << oldestTimeslice.timeslice.value;
662 return;
663 }
664 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
665 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
666 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
667 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
668 // TODO: this we could cache in the proxy at the bind moment.
669 if (info.channelType != ChannelAccountingType::DPL) {
670 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Skipping channel %{public}s because it's not a DPL channel",
671 info.name.c_str());
672
673 continue;
674 }
675 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestTimeslice.timeslice.value)) {
676 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Forwarding to channel %{public}s oldest possible timeslice %zu, prio 20",
677 info.name.c_str(), oldestTimeslice.timeslice.value);
678 }
679 }
680};
681
682// This is how we do the forwarding, i.e. we push
683// the inputs which are shared between this device and others
684// to the next one in the daisy chain.
685// FIXME: do it in a smarter way than O(N^2)
686static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
687 TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
688 auto& proxy = registry.get<FairMQDeviceProxy>();
689 // we collect all messages per forward in a map and send them together
690 std::vector<fair::mq::Parts> forwardedParts;
691 forwardedParts.resize(proxy.getNumForwards());
692 std::vector<ChannelIndex> cachedForwardingChoices{};
693 O2_SIGNPOST_ID_GENERATE(sid, forwarding);
694 O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
695 slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
696
697 for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
698 auto& messageSet = currentSetOfInputs[ii];
699 // In case the messageSet is empty, there is nothing to be done.
700 if (messageSet.size() == 0) {
701 continue;
702 }
703 if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
704 continue;
705 }
706 cachedForwardingChoices.clear();
707
708 for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
709 auto& messageSet = currentSetOfInputs[ii];
710 auto& header = messageSet.header(pi);
711 auto& payload = messageSet.payload(pi);
712 auto total = messageSet.getNumberOfPayloads(pi);
713
714 if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
715 continue;
716 }
717
718 // In case of more than one forward route, we need to copy the message.
719 // This will eventually use the same mamory if running with the same backend.
720 if (cachedForwardingChoices.size() > 1) {
721 copy = true;
722 }
723 auto* dh = o2::header::get<DataHeader*>(header->GetData());
724 auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
725
726 if (copy) {
727 for (auto& cachedForwardingChoice : cachedForwardingChoices) {
728 auto&& newHeader = header->GetTransport()->CreateMessage();
729 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
730 fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
731 newHeader->Copy(*header);
732 forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
733
734 for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
735 auto&& newPayload = header->GetTransport()->CreateMessage();
736 newPayload->Copy(*messageSet.payload(pi, payloadIndex));
737 forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
738 }
739 }
740 } else {
741 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
742 fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
743 forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
744 for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
745 forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
746 }
747 }
748 }
749 }
750 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %zu messages", forwardedParts.size());
751 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
752 if (forwardedParts[fi].Size() == 0) {
753 continue;
754 }
755 ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi});
756 auto& parts = forwardedParts[fi];
757 if (info.policy == nullptr) {
758 O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi);
759 continue;
760 }
761 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
762 info.policy->forward(parts, ChannelIndex{fi}, registry);
763 }
764
765 auto& asyncQueue = registry.get<AsyncQueue>();
766 auto& decongestion = registry.get<DecongestionService>();
767 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
768 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
769 AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate}
770 .user<DecongestionContext>({.ref = registry, .oldestTimeslice = oldestTimeslice}));
771 O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
772};
773
774extern volatile int region_read_global_dummy_variable;
776
778void handleRegionCallbacks(ServiceRegistryRef registry, std::vector<fair::mq::RegionInfo>& infos)
779{
780 if (infos.empty() == false) {
781 std::vector<fair::mq::RegionInfo> toBeNotified;
782 toBeNotified.swap(infos); // avoid any MT issue.
783 static bool dummyRead = getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS") && atoi(getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS"));
784 for (auto const& info : toBeNotified) {
785 if (dummyRead) {
786 for (size_t i = 0; i < info.size / sizeof(region_read_global_dummy_variable); i += 4096 / sizeof(region_read_global_dummy_variable)) {
787 region_read_global_dummy_variable = ((int*)info.ptr)[i];
788 }
789 }
790 registry.get<CallbackService>().call<CallbackService::Id::RegionInfoCallback>(info);
791 }
792 }
793}
794
795namespace
796{
798{
799 auto* state = (DeviceState*)handle->data;
801}
802} // namespace
803
804void DataProcessingDevice::initPollers()
805{
806 auto ref = ServiceRegistryRef{mServiceRegistry};
807 auto& deviceContext = ref.get<DeviceContext>();
808 auto& context = ref.get<DataProcessorContext>();
809 auto& spec = ref.get<DeviceSpec const>();
810 auto& state = ref.get<DeviceState>();
811 // We add a timer only in case a channel poller is not there.
812 if ((context.statefulProcess != nullptr) || (context.statelessProcess != nullptr)) {
813 for (auto& [channelName, channel] : GetChannels()) {
814 InputChannelInfo* channelInfo;
815 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
816 auto& channelSpec = spec.inputChannels[ci];
817 channelInfo = &state.inputChannelInfos[ci];
818 if (channelSpec.name != channelName) {
819 continue;
820 }
821 channelInfo->channel = &this->GetChannel(channelName, 0);
822 break;
823 }
824 if ((channelName.rfind("from_internal-dpl", 0) == 0) &&
825 (channelName.rfind("from_internal-dpl-aod", 0) != 0) &&
826 (channelName.rfind("from_internal-dpl-ccdb-backend", 0) != 0) &&
827 (channelName.rfind("from_internal-dpl-injected", 0)) != 0) {
828 LOGP(detail, "{} is an internal channel. Skipping as no input will come from there.", channelName);
829 continue;
830 }
831 // We only watch receiving sockets.
832 if (channelName.rfind("from_" + spec.name + "_", 0) == 0) {
833 LOGP(detail, "{} is to send data. Not polling.", channelName);
834 continue;
835 }
836
837 if (channelName.rfind("from_", 0) != 0) {
838 LOGP(detail, "{} is not a DPL socket. Not polling.", channelName);
839 continue;
840 }
841
842 // We assume there is always a ZeroMQ socket behind.
843 int zmq_fd = 0;
844 size_t zmq_fd_len = sizeof(zmq_fd);
845 // FIXME: I should probably save those somewhere... ;-)
846 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
847 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
848 if (zmq_fd == 0) {
849 LOG(error) << "Cannot get file descriptor for channel." << channelName;
850 continue;
851 }
852 LOGP(detail, "Polling socket for {}", channelName);
853 auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
854 pCtx->name = strdup(channelName.c_str());
855 pCtx->loop = state.loop;
856 pCtx->device = this;
857 pCtx->state = &state;
858 pCtx->fd = zmq_fd;
859 assert(channelInfo != nullptr);
860 pCtx->channelInfo = channelInfo;
861 pCtx->socket = &channel[0].GetSocket();
862 pCtx->read = true;
863 poller->data = pCtx;
864 uv_poll_init(state.loop, poller, zmq_fd);
865 if (channelName.rfind("from_", 0) != 0) {
866 LOGP(detail, "{} is an out of band channel.", channelName);
867 state.activeOutOfBandPollers.push_back(poller);
868 } else {
869 channelInfo->pollerIndex = state.activeInputPollers.size();
870 state.activeInputPollers.push_back(poller);
871 }
872 }
873 // In case we do not have any input channel and we do not have
874 // any timers or signal watchers we still wake up whenever we can send data to downstream
875 // devices to allow for enumerations.
876 if (state.activeInputPollers.empty() &&
877 state.activeOutOfBandPollers.empty() &&
878 state.activeTimers.empty() &&
879 state.activeSignals.empty()) {
880 // FIXME: this is to make sure we do not reset the output timer
881 // for readout proxies or similar. In principle this should go once
882 // we move to OutOfBand InputSpec.
883 if (state.inputChannelInfos.empty()) {
884 LOGP(detail, "No input channels. Setting exit transition timeout to 0.");
885 deviceContext.exitTransitionTimeout = 0;
886 }
887 for (auto& [channelName, channel] : GetChannels()) {
888 if (channelName.rfind(spec.channelPrefix + "from_internal-dpl", 0) == 0) {
889 LOGP(detail, "{} is an internal channel. Not polling.", channelName);
890 continue;
891 }
892 if (channelName.rfind(spec.channelPrefix + "from_" + spec.name + "_", 0) == 0) {
893 LOGP(detail, "{} is an out of band channel. Not polling for output.", channelName);
894 continue;
895 }
896 // We assume there is always a ZeroMQ socket behind.
897 int zmq_fd = 0;
898 size_t zmq_fd_len = sizeof(zmq_fd);
899 // FIXME: I should probably save those somewhere... ;-)
900 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
901 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
902 if (zmq_fd == 0) {
903 LOGP(error, "Cannot get file descriptor for channel {}", channelName);
904 continue;
905 }
906 LOG(detail) << "Polling socket for " << channel[0].GetName();
907 // FIXME: leak
908 auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
909 pCtx->name = strdup(channelName.c_str());
910 pCtx->loop = state.loop;
911 pCtx->device = this;
912 pCtx->state = &state;
913 pCtx->fd = zmq_fd;
914 pCtx->read = false;
915 poller->data = pCtx;
916 uv_poll_init(state.loop, poller, zmq_fd);
917 state.activeOutputPollers.push_back(poller);
918 }
919 }
920 } else {
921 LOGP(detail, "This is a fake device so we exit after the first iteration.");
922 deviceContext.exitTransitionTimeout = 0;
923 // This is a fake device, so we can request to exit immediately
924 ServiceRegistryRef ref{mServiceRegistry};
925 ref.get<ControlService>().readyToQuit(QuitRequest::Me);
926 // A two second timer to stop internal devices which do not want to
927 auto* timer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
928 uv_timer_init(state.loop, timer);
929 timer->data = &state;
930 uv_update_time(state.loop);
931 uv_timer_start(timer, on_idle_timer, 2000, 2000);
932 state.activeTimers.push_back(timer);
933 }
934}
935
936void DataProcessingDevice::startPollers()
937{
938 auto ref = ServiceRegistryRef{mServiceRegistry};
939 auto& deviceContext = ref.get<DeviceContext>();
940 auto& state = ref.get<DeviceState>();
941
942 for (auto* poller : state.activeInputPollers) {
943 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
944 O2_SIGNPOST_START(device, sid, "socket_state", "Input socket waiting for connection.");
945 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
946 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
947 }
948 for (auto& poller : state.activeOutOfBandPollers) {
949 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
950 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
951 }
952 for (auto* poller : state.activeOutputPollers) {
953 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
954 O2_SIGNPOST_START(device, sid, "socket_state", "Output socket waiting for connection.");
955 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
956 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
957 }
958
959 deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
960 deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
961 uv_timer_init(state.loop, deviceContext.gracePeriodTimer);
962
963 deviceContext.dataProcessingGracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
964 deviceContext.dataProcessingGracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
965 uv_timer_init(state.loop, deviceContext.dataProcessingGracePeriodTimer);
966}
967
968void DataProcessingDevice::stopPollers()
969{
970 auto ref = ServiceRegistryRef{mServiceRegistry};
971 auto& deviceContext = ref.get<DeviceContext>();
972 auto& state = ref.get<DeviceState>();
973 LOGP(detail, "Stopping {} input pollers", state.activeInputPollers.size());
974 for (auto* poller : state.activeInputPollers) {
975 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
976 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
977 uv_poll_stop(poller);
978 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
979 }
980 LOGP(detail, "Stopping {} out of band pollers", state.activeOutOfBandPollers.size());
981 for (auto* poller : state.activeOutOfBandPollers) {
982 uv_poll_stop(poller);
983 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
984 }
985 LOGP(detail, "Stopping {} output pollers", state.activeOutOfBandPollers.size());
986 for (auto* poller : state.activeOutputPollers) {
987 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
988 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
989 uv_poll_stop(poller);
990 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
991 }
992
993 uv_timer_stop(deviceContext.gracePeriodTimer);
994 delete (ServiceRegistryRef*)deviceContext.gracePeriodTimer->data;
995 free(deviceContext.gracePeriodTimer);
996 deviceContext.gracePeriodTimer = nullptr;
997
998 uv_timer_stop(deviceContext.dataProcessingGracePeriodTimer);
999 delete (ServiceRegistryRef*)deviceContext.dataProcessingGracePeriodTimer->data;
1000 free(deviceContext.dataProcessingGracePeriodTimer);
1001 deviceContext.dataProcessingGracePeriodTimer = nullptr;
1002}
1003
1005{
1006 auto ref = ServiceRegistryRef{mServiceRegistry};
1007 auto& deviceContext = ref.get<DeviceContext>();
1008 auto& context = ref.get<DataProcessorContext>();
1009
1010 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1011 O2_SIGNPOST_START(device, cid, "InitTask", "Entering InitTask callback.");
1012 auto& spec = getRunningDevice(mRunningDevice, mServiceRegistry);
1013 auto distinct = DataRelayerHelpers::createDistinctRouteIndex(spec.inputs);
1014 auto& state = ref.get<DeviceState>();
1015 int i = 0;
1016 for (auto& di : distinct) {
1017 auto& route = spec.inputs[di];
1018 if (route.configurator.has_value() == false) {
1019 i++;
1020 continue;
1021 }
1022 ExpirationHandler handler{
1023 .name = route.configurator->name,
1024 .routeIndex = RouteIndex{i++},
1025 .lifetime = route.matcher.lifetime,
1026 .creator = route.configurator->creatorConfigurator(state, mServiceRegistry, *mConfigRegistry),
1027 .checker = route.configurator->danglingConfigurator(state, *mConfigRegistry),
1028 .handler = route.configurator->expirationConfigurator(state, *mConfigRegistry)};
1029 context.expirationHandlers.emplace_back(std::move(handler));
1030 }
1031
1032 if (state.awakeMainThread == nullptr) {
1033 state.awakeMainThread = (uv_async_t*)malloc(sizeof(uv_async_t));
1034 state.awakeMainThread->data = &state;
1035 uv_async_init(state.loop, state.awakeMainThread, on_awake_main_thread);
1036 }
1037
1038 deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue<std::string>("expected-region-callbacks"));
1039 deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue<std::string>("exit-transition-timeout"));
1040 deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue<std::string>("data-processing-timeout"));
1041
1042 for (auto& channel : GetChannels()) {
1043 channel.second.at(0).Transport()->SubscribeToRegionEvents([&context = deviceContext,
1044 &registry = mServiceRegistry,
1045 &pendingRegionInfos = mPendingRegionInfos,
1046 &regionInfoMutex = mRegionInfoMutex](fair::mq::RegionInfo info) {
1047 std::lock_guard<std::mutex> lock(regionInfoMutex);
1048 LOG(detail) << ">>> Region info event" << info.event;
1049 LOG(detail) << "id: " << info.id;
1050 LOG(detail) << "ptr: " << info.ptr;
1051 LOG(detail) << "size: " << info.size;
1052 LOG(detail) << "flags: " << info.flags;
1053 // Now we check for pending events with the mutex,
1054 // so the lines below are atomic.
1055 pendingRegionInfos.push_back(info);
1056 context.expectedRegionCallbacks -= 1;
1057 // We always want to handle these on the main loop,
1058 // so we awake it.
1059 ServiceRegistryRef ref{registry};
1060 uv_async_send(ref.get<DeviceState>().awakeMainThread);
1061 });
1062 }
1063
1064 // Add a signal manager for SIGUSR1 so that we can force
1065 // an event from the outside, making sure that the event loop can
1066 // be unblocked (e.g. by a quitting DPL driver) even when there
1067 // is no data pending to be processed.
1068 if (deviceContext.sigusr1Handle == nullptr) {
1069 deviceContext.sigusr1Handle = (uv_signal_t*)malloc(sizeof(uv_signal_t));
1070 deviceContext.sigusr1Handle->data = &mServiceRegistry;
1071 uv_signal_init(state.loop, deviceContext.sigusr1Handle);
1072 uv_signal_start(deviceContext.sigusr1Handle, on_signal_callback, SIGUSR1);
1073 }
1074 // If there is any signal, we want to make sure they are active
1075 for (auto& handle : state.activeSignals) {
1076 handle->data = &state;
1077 }
1078 // When we start, we must make sure that we do listen to the signal
1079 deviceContext.sigusr1Handle->data = &mServiceRegistry;
1080
1082 DataProcessingDevice::initPollers();
1083
1084 // Whenever we InitTask, we consider as if the previous iteration
1085 // was successful, so that even if there is no timer or receiving
1086 // channel, we can still start an enumeration.
1087 DataProcessorContext* initialContext = nullptr;
1088 bool idle = state.lastActiveDataProcessor.compare_exchange_strong(initialContext, (DataProcessorContext*)-1);
1089 if (!idle) {
1090 LOG(error) << "DataProcessor " << state.lastActiveDataProcessor.load()->spec->name << " was unexpectedly active";
1091 }
1092
1093 // We should be ready to run here. Therefore we copy all the
1094 // required parts in the DataProcessorContext. Eventually we should
1095 // do so on a per thread basis, with fine grained locks.
1096 // FIXME: this should not use ServiceRegistry::threadSalt, but
1097 // more a ServiceRegistry::globalDataProcessorSalt(N) where
1098 // N is the number of the multiplexed data processor.
1099 // We will get there.
1100 this->fillContext(mServiceRegistry.get<DataProcessorContext>(ServiceRegistry::globalDeviceSalt()), deviceContext);
1101
1102 O2_SIGNPOST_END(device, cid, "InitTask", "Exiting InitTask callback waiting for the remaining region callbacks.");
1103
1104 auto hasPendingEvents = [&mutex = mRegionInfoMutex, &pendingRegionInfos = mPendingRegionInfos](DeviceContext& deviceContext) {
1105 std::lock_guard<std::mutex> lock(mutex);
1106 return (pendingRegionInfos.empty() == false) || deviceContext.expectedRegionCallbacks > 0;
1107 };
1108 O2_SIGNPOST_START(device, cid, "InitTask", "Waiting for registation events.");
1113 while (hasPendingEvents(deviceContext)) {
1114 // Wait for the callback to signal its done, so that we do not busy wait.
1115 uv_run(state.loop, UV_RUN_ONCE);
1116 // Handle callbacks if any
1117 {
1118 O2_SIGNPOST_EVENT_EMIT(device, cid, "InitTask", "Memory registration event received.");
1119 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1120 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1121 }
1122 }
1123 O2_SIGNPOST_END(device, cid, "InitTask", "Done waiting for registration events.");
1124}
1125
1127{
1128 context.isSink = false;
1129 // If nothing is a sink, the rate limiting simply does not trigger.
1130 bool enableRateLimiting = std::stoi(fConfig->GetValue<std::string>("timeframes-rate-limit"));
1131
1132 auto ref = ServiceRegistryRef{mServiceRegistry};
1133 auto& spec = ref.get<DeviceSpec const>();
1134
1135 // The policy is now allowed to state the default.
1136 context.balancingInputs = spec.completionPolicy.balanceChannels;
1137 // This is needed because the internal injected dummy sink should not
1138 // try to balance inputs unless the rate limiting is requested.
1139 if (enableRateLimiting == false && spec.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) {
1140 context.balancingInputs = false;
1141 }
1142 if (enableRateLimiting) {
1143 for (auto& spec : spec.outputs) {
1144 if (spec.matcher.binding.value == "dpl-summary") {
1145 context.isSink = true;
1146 break;
1147 }
1148 }
1149 }
1150
1151 context.registry = &mServiceRegistry;
1154 if (context.error != nullptr) {
1155 context.errorHandling = [&errorCallback = context.error,
1156 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1160 auto& err = error_from_ref(e);
1161 auto& context = ref.get<DataProcessorContext>();
1162 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1163 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Invoking callback.", err.what);
1164 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1165 auto& stats = ref.get<DataProcessingStats>();
1167 ErrorContext errorContext{record, ref, e};
1168 errorCallback(errorContext);
1169 };
1170 } else {
1171 context.errorHandling = [&errorPolicy = mProcessingPolicies.error,
1172 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1173 auto& err = error_from_ref(e);
1177 auto& context = ref.get<DataProcessorContext>();
1178 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1179 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1180 auto& stats = ref.get<DataProcessingStats>();
1182 switch (errorPolicy) {
1184 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Rethrowing.", err.what);
1185 throw e;
1186 default:
1187 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Skipping to next timeframe.", err.what);
1188 break;
1189 }
1190 };
1191 }
1192
1193 auto decideEarlyForward = [&context, &spec, this]() -> bool {
1196 bool canForwardEarly = (spec.forwards.empty() == false) && mProcessingPolicies.earlyForward != EarlyForwardPolicy::NEVER;
1197 bool onlyConditions = true;
1198 bool overriddenEarlyForward = false;
1199 for (auto& forwarded : spec.forwards) {
1200 if (forwarded.matcher.lifetime != Lifetime::Condition) {
1201 onlyConditions = false;
1202 }
1203 if (strncmp(DataSpecUtils::asConcreteOrigin(forwarded.matcher).str, "AOD", 3) == 0) {
1204 context.canForwardEarly = false;
1205 overriddenEarlyForward = true;
1206 LOG(detail) << "Cannot forward early because of AOD input: " << DataSpecUtils::describe(forwarded.matcher);
1207 break;
1208 }
1209 if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && mProcessingPolicies.earlyForward == EarlyForwardPolicy::NORAW) {
1210 context.canForwardEarly = false;
1211 overriddenEarlyForward = true;
1212 LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher);
1213 break;
1214 }
1215 if (forwarded.matcher.lifetime == Lifetime::Optional) {
1216 context.canForwardEarly = false;
1217 overriddenEarlyForward = true;
1218 LOG(detail) << "Cannot forward early because of Optional input: " << DataSpecUtils::describe(forwarded.matcher);
1219 break;
1220 }
1221 }
1222 if (!overriddenEarlyForward && onlyConditions) {
1223 context.canForwardEarly = true;
1224 LOG(detail) << "Enabling early forwarding because only conditions to be forwarded";
1225 }
1226 return canForwardEarly;
1227 };
1228 context.canForwardEarly = decideEarlyForward();
1229}
1230
1232{
1233 auto ref = ServiceRegistryRef{mServiceRegistry};
1234 auto& state = ref.get<DeviceState>();
1235
1236 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.loop);
1237 O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
1238 state.quitRequested = false;
1239 state.streaming = StreamingState::Streaming;
1240 state.allowedProcessing = DeviceState::Any;
1241 for (auto& info : state.inputChannelInfos) {
1242 if (info.state != InputChannelState::Pull) {
1243 info.state = InputChannelState::Running;
1244 }
1245 }
1246
1247 // Catch callbacks which fail before we start.
1248 // Notice that when running multiple dataprocessors
1249 // we should probably allow expendable ones to fail.
1250 try {
1251 auto& dpContext = ref.get<DataProcessorContext>();
1252 dpContext.preStartCallbacks(ref);
1253 for (size_t i = 0; i < mStreams.size(); ++i) {
1254 auto streamRef = ServiceRegistryRef{mServiceRegistry, ServiceRegistry::globalStreamSalt(i + 1)};
1255 auto& context = streamRef.get<StreamContext>();
1256 context.preStartStreamCallbacks(streamRef);
1257 }
1258 } catch (std::exception& e) {
1259 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type std::exception caught in PreRun: %{public}s. Rethrowing.", e.what());
1260 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1261 throw;
1262 } catch (o2::framework::RuntimeErrorRef& e) {
1263 auto& err = error_from_ref(e);
1264 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type o2::framework::RuntimeErrorRef caught in PreRun: %{public}s. Rethrowing.", err.what);
1265 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1266 throw;
1267 } catch (...) {
1268 O2_SIGNPOST_END(device, cid, "PreRun", "Unknown exception being thrown. Rethrowing.");
1269 throw;
1270 }
1271
1272 ref.get<CallbackService>().call<CallbackService::Id::Start>();
1273 startPollers();
1274
1275 // Raise to 1 when we are ready to start processing
1276 using o2::monitoring::Metric;
1277 using o2::monitoring::Monitoring;
1278 using o2::monitoring::tags::Key;
1279 using o2::monitoring::tags::Value;
1280
1281 auto& monitoring = ref.get<Monitoring>();
1282 monitoring.send(Metric{(uint64_t)1, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1283 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun callback.");
1284}
1285
1287{
1288 ServiceRegistryRef ref{mServiceRegistry};
1289 // Raise to 1 when we are ready to start processing
1290 using o2::monitoring::Metric;
1291 using o2::monitoring::Monitoring;
1292 using o2::monitoring::tags::Key;
1293 using o2::monitoring::tags::Value;
1294
1295 auto& monitoring = ref.get<Monitoring>();
1296 monitoring.send(Metric{(uint64_t)0, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1297
1298 stopPollers();
1299 ref.get<CallbackService>().call<CallbackService::Id::Stop>();
1300 auto& dpContext = ref.get<DataProcessorContext>();
1301 dpContext.postStopCallbacks(ref);
1302}
1303
1305{
1306 ServiceRegistryRef ref{mServiceRegistry};
1307 ref.get<CallbackService>().call<CallbackService::Id::Reset>();
1308}
1309
1311{
1312 ServiceRegistryRef ref{mServiceRegistry};
1313 auto& state = ref.get<DeviceState>();
1315 bool firstLoop = true;
1316 O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
1317 O2_SIGNPOST_START(device, lid, "device_state", "First iteration of the device loop");
1318
1319 bool dplEnableMultithreding = getenv("DPL_THREADPOOL_SIZE") != nullptr;
1320 if (dplEnableMultithreding) {
1321 setenv("UV_THREADPOOL_SIZE", "1", 1);
1322 }
1323
1324 while (state.transitionHandling != TransitionHandlingState::Expired) {
1325 if (state.nextFairMQState.empty() == false) {
1326 (void)this->ChangeState(state.nextFairMQState.back());
1327 state.nextFairMQState.pop_back();
1328 }
1329 // Notify on the main thread the new region callbacks, making sure
1330 // no callback is issued if there is something still processing.
1331 {
1332 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1333 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1334 }
1335 // This will block for the correct delay (or until we get data
1336 // on a socket). We also do not block on the first iteration
1337 // so that devices which do not have a timer can still start an
1338 // enumeration.
1339 {
1340 ServiceRegistryRef ref{mServiceRegistry};
1341 ref.get<DriverClient>().flushPending(mServiceRegistry);
1342 DataProcessorContext* lastActive = state.lastActiveDataProcessor.load();
1343 // Reset to zero unless some other DataPorcessorContext completed in the meanwhile.
1344 // In such case we will take care of it at next iteration.
1345 state.lastActiveDataProcessor.compare_exchange_strong(lastActive, nullptr);
1346
1347 auto shouldNotWait = (lastActive != nullptr &&
1348 (state.streaming != StreamingState::Idle) && (state.activeSignals.empty())) ||
1350 if (firstLoop) {
1351 shouldNotWait = true;
1352 firstLoop = false;
1353 }
1354 if (lastActive != nullptr) {
1356 }
1357 if (NewStatePending()) {
1358 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "New state pending. Waiting for it to be handled.");
1359 shouldNotWait = true;
1361 }
1362 if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) {
1363 state.transitionHandling = TransitionHandlingState::Requested;
1364 auto& deviceContext = ref.get<DeviceContext>();
1365 // Check if we only have timers
1366 auto& spec = ref.get<DeviceSpec const>();
1367 if (hasOnlyTimers(spec)) {
1369 }
1370
1371 // If this is a source device, dataTransitionTimeout and dataProcessingTimeout are effectively
1372 // the same (because source devices are not allowed to produce any calibration).
1373 // should be the same.
1374 if (hasOnlyGenerated(spec) && deviceContext.dataProcessingTimeout > 0) {
1375 deviceContext.exitTransitionTimeout = deviceContext.dataProcessingTimeout;
1376 }
1377
1378 // We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
1379 if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
1380 uv_update_time(state.loop);
1381 O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
1382 uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
1383 }
1384 if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
1385 state.transitionHandling = TransitionHandlingState::Requested;
1386 ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
1387 uv_update_time(state.loop);
1388 O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.", deviceContext.exitTransitionTimeout);
1389 uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
1390 if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
1391 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", (int)deviceContext.exitTransitionTimeout);
1392 } else {
1393 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before switching to READY state.", (int)deviceContext.exitTransitionTimeout);
1394 }
1395 } else {
1396 state.transitionHandling = TransitionHandlingState::Expired;
1397 if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination == TerminationPolicy::QUIT) {
1398 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
1399 } else if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination != TerminationPolicy::QUIT) {
1400 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
1401 } else if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
1402 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
1403 } else {
1404 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
1405 }
1406 }
1407 }
1408 // If we are Idle, we can then consider the transition to be expired.
1409 if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
1410 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
1411 state.transitionHandling = TransitionHandlingState::Expired;
1412 }
1413 if (state.severityStack.empty() == false) {
1414 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1415 state.severityStack.pop_back();
1416 }
1417 // for (auto &info : mDeviceContext.state->inputChannelInfos) {
1418 // shouldNotWait |= info.readPolled;
1419 // }
1420 state.loopReason = DeviceState::NO_REASON;
1421 state.firedTimers.clear();
1422 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_CALLBACKS) != 0) {
1423 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1424 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1425 }
1426 // Run the asynchronous queue just before sleeping again, so that:
1427 // - we can trigger further events from the queue
1428 // - we can guarantee this is the last thing we do in the loop (
1429 // assuming no one else is adding to the queue before this point).
1430 auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1431 O2_SIGNPOST_START(device, lid, "run_loop", "Dropping message from slot %" PRIu64 ". Forwarding as needed.", (uint64_t)slot.index);
1432 ServiceRegistryRef ref{registry};
1433 ref.get<AsyncQueue>();
1434 ref.get<DecongestionService>();
1435 ref.get<DataRelayer>();
1436 // Get the current timeslice for the slot.
1437 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
1439 forwardInputs(registry, slot, dropped, oldestOutputInfo, false, true);
1440 };
1441 auto& relayer = ref.get<DataRelayer>();
1442 relayer.prunePending(onDrop);
1443 auto& queue = ref.get<AsyncQueue>();
1444 auto oldestPossibleTimeslice = relayer.getOldestPossibleOutput();
1445 AsyncQueueHelpers::run(queue, {oldestPossibleTimeslice.timeslice.value});
1446 if (shouldNotWait == false) {
1447 auto& dpContext = ref.get<DataProcessorContext>();
1448 dpContext.preLoopCallbacks(ref);
1449 }
1450 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. %{}s", shouldNotWait ? "Will immediately schedule a new one" : "Waiting for next event.");
1451 uv_run(state.loop, shouldNotWait ? UV_RUN_NOWAIT : UV_RUN_ONCE);
1452 O2_SIGNPOST_START(device, lid, "run_loop", "Run loop started. Loop reason %d.", state.loopReason);
1453 if ((state.loopReason & state.tracingFlags) != 0) {
1454 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1455 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1456 } else if (state.severityStack.empty() == false) {
1457 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1458 state.severityStack.pop_back();
1459 }
1460 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Loop reason mask %x & %x = %x", state.loopReason, state.tracingFlags, state.loopReason & state.tracingFlags);
1461
1462 if ((state.loopReason & DeviceState::LoopReason::OOB_ACTIVITY) != 0) {
1463 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Out of band activity detected. Rescanning everything.");
1464 relayer.rescan();
1465 }
1466
1467 if (!state.pendingOffers.empty()) {
1468 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Pending %" PRIu64 " offers. updating the ComputingQuotaEvaluator.", (uint64_t)state.pendingOffers.size());
1469 ref.get<ComputingQuotaEvaluator>().updateOffers(state.pendingOffers, uv_now(state.loop));
1470 }
1471 }
1472
1473 // Notify on the main thread the new region callbacks, making sure
1474 // no callback is issued if there is something still processing.
1475 // Notice that we still need to perform callbacks also after
1476 // the socket epolled, because otherwise we would end up serving
1477 // the callback after the first data arrives is the system is too
1478 // fast to transition from Init to Run.
1479 {
1480 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1481 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1482 }
1483
1484 assert(mStreams.size() == mHandles.size());
1486 TaskStreamRef streamRef{-1};
1487 for (size_t ti = 0; ti < mStreams.size(); ti++) {
1488 auto& taskInfo = mStreams[ti];
1489 if (taskInfo.running) {
1490 continue;
1491 }
1492 // Stream 0 is for when we run in
1493 streamRef.index = ti;
1494 }
1495 using o2::monitoring::Metric;
1496 using o2::monitoring::Monitoring;
1497 using o2::monitoring::tags::Key;
1498 using o2::monitoring::tags::Value;
1499 // We have an empty stream, let's check if we have enough
1500 // resources for it to run something
1501 if (streamRef.index != -1) {
1502 // Synchronous execution of the callbacks. This will be moved in the
1503 // moved in the on_socket_polled once we have threading in place.
1504 uv_work_t& handle = mHandles[streamRef.index];
1505 TaskStreamInfo& stream = mStreams[streamRef.index];
1506 handle.data = &mStreams[streamRef.index];
1507
1508 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const& stats)> reportExpiredOffer = [&registry = mServiceRegistry](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
1509 ServiceRegistryRef ref{registry};
1510 auto& dpStats = ref.get<DataProcessingStats>();
1511 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
1512 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
1513 dpStats.processCommandQueue();
1514 };
1515 auto ref = ServiceRegistryRef{mServiceRegistry};
1516
1517 // Deciding wether to run or not can be done by passing a request to
1518 // the evaluator. In this case, the request is always satisfied and
1519 // we run on whatever resource is available.
1520 auto& spec = ref.get<DeviceSpec const>();
1521 bool enough = ref.get<ComputingQuotaEvaluator>().selectOffer(streamRef.index, spec.resourcePolicy.request, uv_now(state.loop));
1522
1523 if (enough) {
1524 stream.id = streamRef;
1525 stream.running = true;
1526 stream.registry = &mServiceRegistry;
1527 if (dplEnableMultithreding) [[unlikely]] {
1528 stream.task = &handle;
1529 uv_queue_work(state.loop, stream.task, run_callback, run_completion);
1530 } else {
1531 run_callback(&handle);
1532 run_completion(&handle, 0);
1533 }
1534 } else {
1535 auto ref = ServiceRegistryRef{mServiceRegistry};
1536 ref.get<ComputingQuotaEvaluator>().handleExpired(reportExpiredOffer);
1537 }
1538 }
1539 }
1540
1541 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", state.transitionHandling);
1542 auto& spec = ref.get<DeviceSpec const>();
1544 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
1545 auto& info = state.inputChannelInfos[ci];
1546 info.parts.fParts.clear();
1547 }
1548 state.transitionHandling = TransitionHandlingState::NoTransition;
1549}
1550
1554{
1555 auto& context = ref.get<DataProcessorContext>();
1556 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1557 O2_SIGNPOST_START(device, dpid, "do_prepare", "Starting DataProcessorContext::doPrepare.");
1558
1559 {
1560 ref.get<CallbackService>().call<CallbackService::Id::ClockTick>();
1561 }
1562 // Whether or not we had something to do.
1563
1564 // Initialise the value for context.allDone. It will possibly be updated
1565 // below if any of the channels is not done.
1566 //
1567 // Notice that fake input channels (InputChannelState::Pull) cannot possibly
1568 // expect to receive an EndOfStream signal. Thus we do not wait for these
1569 // to be completed. In the case of data source devices, as they do not have
1570 // real data input channels, they have to signal EndOfStream themselves.
1571 auto& state = ref.get<DeviceState>();
1572 auto& spec = ref.get<DeviceSpec const>();
1573 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.inputChannelInfos.data());
1574 O2_SIGNPOST_START(device, cid, "do_prepare", "Reported channel states.");
1575 context.allDone = std::any_of(state.inputChannelInfos.begin(), state.inputChannelInfos.end(), [cid](const auto& info) {
1576 if (info.channel) {
1577 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "Input channel %{public}s%{public}s has %zu parts left and is in state %d.",
1578 info.channel->GetName().c_str(), (info.id.value == ChannelIndex::INVALID ? " (non DPL)" : ""), info.parts.fParts.size(), (int)info.state);
1579 } else {
1580 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "External channel %d is in state %d.", info.id.value, (int)info.state);
1581 }
1582 return (info.parts.fParts.empty() == true && info.state != InputChannelState::Pull);
1583 });
1584 O2_SIGNPOST_END(device, cid, "do_prepare", "End report.");
1585 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Processing %zu input channels.", spec.inputChannels.size());
1588 static std::vector<int> pollOrder;
1589 pollOrder.resize(state.inputChannelInfos.size());
1590 std::iota(pollOrder.begin(), pollOrder.end(), 0);
1591 std::sort(pollOrder.begin(), pollOrder.end(), [&infos = state.inputChannelInfos](int a, int b) {
1592 return infos[a].oldestForChannel.value < infos[b].oldestForChannel.value;
1593 });
1594
1595 // Nothing to poll...
1596 if (pollOrder.empty()) {
1597 O2_SIGNPOST_END(device, dpid, "do_prepare", "Nothing to poll. Waiting for next iteration.");
1598 return;
1599 }
1600 auto currentOldest = state.inputChannelInfos[pollOrder.front()].oldestForChannel;
1601 auto currentNewest = state.inputChannelInfos[pollOrder.back()].oldestForChannel;
1602 auto delta = currentNewest.value - currentOldest.value;
1603 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Oldest possible timeframe range %" PRIu64 " => %" PRIu64 " delta %" PRIu64,
1604 (int64_t)currentOldest.value, (int64_t)currentNewest.value, (int64_t)delta);
1605 auto& infos = state.inputChannelInfos;
1606
1607 if (context.balancingInputs) {
1608 static int pipelineLength = DefaultsHelpers::pipelineLength();
1609 static uint64_t ahead = getenv("DPL_MAX_CHANNEL_AHEAD") ? std::atoll(getenv("DPL_MAX_CHANNEL_AHEAD")) : std::max(8, std::min(pipelineLength - 48, pipelineLength / 2));
1610 auto newEnd = std::remove_if(pollOrder.begin(), pollOrder.end(), [&infos, limitNew = currentOldest.value + ahead](int a) -> bool {
1611 return infos[a].oldestForChannel.value > limitNew;
1612 });
1613 for (auto it = pollOrder.begin(); it < pollOrder.end(); it++) {
1614 const auto& channelInfo = state.inputChannelInfos[*it];
1615 if (channelInfo.pollerIndex != -1) {
1616 auto& poller = state.activeInputPollers[channelInfo.pollerIndex];
1617 auto& pollerContext = *(PollerContext*)(poller->data);
1618 if (pollerContext.pollerState == PollerContext::PollerState::Connected || pollerContext.pollerState == PollerContext::PollerState::Suspended) {
1619 bool running = pollerContext.pollerState == PollerContext::PollerState::Connected;
1620 bool shouldBeRunning = it < newEnd;
1621 if (running != shouldBeRunning) {
1622 uv_poll_start(poller, shouldBeRunning ? UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED : 0, &on_socket_polled);
1623 pollerContext.pollerState = shouldBeRunning ? PollerContext::PollerState::Connected : PollerContext::PollerState::Suspended;
1624 }
1625 }
1626 }
1627 }
1628 pollOrder.erase(newEnd, pollOrder.end());
1629 }
1630 O2_SIGNPOST_END(device, dpid, "do_prepare", "%zu channels pass the channel inbalance balance check.", pollOrder.size());
1631
1632 for (auto sci : pollOrder) {
1633 auto& info = state.inputChannelInfos[sci];
1634 auto& channelSpec = spec.inputChannels[sci];
1635 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1636 O2_SIGNPOST_START(device, cid, "channels", "Processing channel %s", channelSpec.name.c_str());
1637
1638 if (info.state != InputChannelState::Completed && info.state != InputChannelState::Pull) {
1639 context.allDone = false;
1640 }
1641 if (info.state != InputChannelState::Running) {
1642 // Remember to flush data if we are not running
1643 // and there is some message pending.
1644 if (info.parts.Size()) {
1646 }
1647 O2_SIGNPOST_END(device, cid, "channels", "Flushing channel %s which is in state %d and has %zu parts still pending.",
1648 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1649 continue;
1650 }
1651 if (info.channel == nullptr) {
1652 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is nullptr and has %zu parts still pending.",
1653 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1654 continue;
1655 }
1656 // Only poll DPL channels for now.
1658 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is not a DPL channel and has %zu parts still pending.",
1659 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1660 continue;
1661 }
1662 auto& socket = info.channel->GetSocket();
1663 // If we have pending events from a previous iteration,
1664 // we do receive in any case.
1665 // Otherwise we check if there is any pending event and skip
1666 // this channel in case there is none.
1667 if (info.hasPendingEvents == 0) {
1668 socket.Events(&info.hasPendingEvents);
1669 // If we do not read, we can continue.
1670 if ((info.hasPendingEvents & 1) == 0 && (info.parts.Size() == 0)) {
1671 O2_SIGNPOST_END(device, cid, "channels", "No pending events and no remaining parts to process for channel %{public}s", channelSpec.name.c_str());
1672 continue;
1673 }
1674 }
1675 // We can reset this, because it means we have seen at least 1
1676 // message after the UV_READABLE was raised.
1677 info.readPolled = false;
1678 // Notice that there seems to be a difference between the documentation
1679 // of zeromq and the observed behavior. The fact that ZMQ_POLLIN
1680 // is raised does not mean that a message is immediately available to
1681 // read, just that it will be available soon, so the receive can
1682 // still return -2. To avoid this we keep receiving on the socket until
1683 // we get a message. In order not to overflow the DPL queue we process
1684 // one message at a time and we keep track of whether there were more
1685 // to process.
1686 bool newMessages = false;
1687 while (true) {
1688 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Receiving loop called for channel %{public}s (%d) with oldest possible timeslice %zu",
1689 channelSpec.name.c_str(), info.id.value, info.oldestForChannel.value);
1690 if (info.parts.Size() < 64) {
1691 fair::mq::Parts parts;
1692 info.channel->Receive(parts, 0);
1693 if (parts.Size()) {
1694 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Received %zu parts from channel %{public}s (%d).", parts.Size(), channelSpec.name.c_str(), info.id.value);
1695 }
1696 for (auto&& part : parts) {
1697 info.parts.fParts.emplace_back(std::move(part));
1698 }
1699 newMessages |= true;
1700 }
1701
1702 if (info.parts.Size() >= 0) {
1704 // Receiving data counts as activity now, so that
1705 // We can make sure we process all the pending
1706 // messages without hanging on the uv_run.
1707 break;
1708 }
1709 }
1710 // We check once again for pending events, keeping track if this was the
1711 // case so that we can immediately repeat this loop and avoid remaining
1712 // stuck in uv_run. This is because we will not get notified on the socket
1713 // if more events are pending due to zeromq level triggered approach.
1714 socket.Events(&info.hasPendingEvents);
1715 if (info.hasPendingEvents) {
1716 info.readPolled = false;
1717 // In case there were messages, we consider it as activity
1718 if (newMessages) {
1719 state.lastActiveDataProcessor.store(&context);
1720 }
1721 }
1722 O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).",
1723 channelSpec.name.c_str(), info.id.value);
1724 }
1725}
1726
1728{
1729 auto& context = ref.get<DataProcessorContext>();
1730 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1731 auto switchState = [ref](StreamingState newState) {
1732 auto& state = ref.get<DeviceState>();
1733 auto& context = ref.get<DataProcessorContext>();
1734 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1735 O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
1736 O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
1737 state.streaming = newState;
1738 ref.get<ControlService>().notifyStreamingState(state.streaming);
1739 };
1740 auto& state = ref.get<DeviceState>();
1741 auto& spec = ref.get<DeviceSpec const>();
1742
1743 if (state.streaming == StreamingState::Idle) {
1744 return;
1745 }
1746
1747 context.completed.clear();
1748 context.completed.reserve(16);
1749 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1750 state.lastActiveDataProcessor.store(&context);
1751 }
1752 DanglingContext danglingContext{*context.registry};
1753
1754 context.preDanglingCallbacks(danglingContext);
1755 if (state.lastActiveDataProcessor.load() == nullptr) {
1756 ref.get<CallbackService>().call<CallbackService::Id::Idle>();
1757 }
1758 auto activity = ref.get<DataRelayer>().processDanglingInputs(context.expirationHandlers, *context.registry, true);
1759 if (activity.expiredSlots > 0) {
1760 state.lastActiveDataProcessor = &context;
1761 }
1762
1763 context.completed.clear();
1764 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1765 state.lastActiveDataProcessor = &context;
1766 }
1767
1768 context.postDanglingCallbacks(danglingContext);
1769
1770 // If we got notified that all the sources are done, we call the EndOfStream
1771 // callback and return false. Notice that what happens next is actually
1772 // dependent on the callback, not something which is controlled by the
1773 // framework itself.
1774 if (context.allDone == true && state.streaming == StreamingState::Streaming) {
1775 switchState(StreamingState::EndOfStreaming);
1776 state.lastActiveDataProcessor = &context;
1777 }
1778
1779 if (state.streaming == StreamingState::EndOfStreaming) {
1780 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "We are in EndOfStreaming. Flushing queues.");
1781 // We keep processing data until we are Idle.
1782 // FIXME: not sure this is the correct way to drain the queues, but
1783 // I guess we will see.
1786 auto& relayer = ref.get<DataRelayer>();
1787
1788 bool shouldProcess = hasOnlyGenerated(spec) == false;
1789
1790 while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) {
1791 relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false);
1792 }
1793
1794 auto& timingInfo = ref.get<TimingInfo>();
1795 // We should keep the data generated at end of stream only for those
1796 // which are not sources.
1797 timingInfo.keepAtEndOfStream = shouldProcess;
1798 // Fill timinginfo with some reasonable values for data sent with endOfStream
1799 timingInfo.timeslice = relayer.getOldestPossibleOutput().timeslice.value;
1800 timingInfo.tfCounter = -1;
1801 timingInfo.firstTForbit = -1;
1802 // timingInfo.runNumber = ; // Not sure where to get this if not already set
1803 timingInfo.creation = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
1804 O2_SIGNPOST_EVENT_EMIT(calibration, dpid, "calibration", "TimingInfo.keepAtEndOfStream %d", timingInfo.keepAtEndOfStream);
1805
1806 EndOfStreamContext eosContext{*context.registry, ref.get<DataAllocator>()};
1807
1808 context.preEOSCallbacks(eosContext);
1809 auto& streamContext = ref.get<StreamContext>();
1810 streamContext.preEOSCallbacks(eosContext);
1811 ref.get<CallbackService>().call<CallbackService::Id::EndOfStream>(eosContext);
1812 streamContext.postEOSCallbacks(eosContext);
1813 context.postEOSCallbacks(eosContext);
1814
1815 for (auto& channel : spec.outputChannels) {
1816 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Sending end of stream to %{public}s.", channel.name.c_str());
1818 }
1819 // This is needed because the transport is deleted before the device.
1820 relayer.clear();
1821 switchState(StreamingState::Idle);
1822 // In case we should process, note the data processor responsible for it
1823 if (shouldProcess) {
1824 state.lastActiveDataProcessor = &context;
1825 }
1826 // On end of stream we shut down all output pollers.
1827 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1828 for (auto& poller : state.activeOutputPollers) {
1829 uv_poll_stop(poller);
1830 }
1831 return;
1832 }
1833
1834 if (state.streaming == StreamingState::Idle) {
1835 // On end of stream we shut down all output pollers.
1836 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1837 for (auto& poller : state.activeOutputPollers) {
1838 uv_poll_stop(poller);
1839 }
1840 }
1841
1842 return;
1843}
1844
1846{
1847 ServiceRegistryRef ref{mServiceRegistry};
1848 ref.get<DataRelayer>().clear();
1849 auto& deviceContext = ref.get<DeviceContext>();
1850 // If the signal handler is there, we should
1851 // hide the registry from it, so that we do not
1852 // end up calling the signal handler on something
1853 // which might not be there anymore.
1854 if (deviceContext.sigusr1Handle) {
1855 deviceContext.sigusr1Handle->data = nullptr;
1856 }
1857 // Makes sure we do not have a working context on
1858 // shutdown.
1859 for (auto& handle : ref.get<DeviceState>().activeSignals) {
1860 handle->data = nullptr;
1861 }
1862}
1863
1866 {
1867 }
1868};
1869
1875{
1876 using InputInfo = DataRelayer::InputInfo;
1878
1879 auto& context = ref.get<DataProcessorContext>();
1880 // This is the same id as the upper level function, so we get the events
1881 // associated with the same interval. We will simply use "handle_data" as
1882 // the category.
1883 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1884
1885 // This is how we validate inputs. I.e. we try to enforce the O2 Data model
1886 // and we do a few stats. We bind parts as a lambda captured variable, rather
1887 // than an input, because we do not want the outer loop actually be exposed
1888 // to the implementation details of the messaging layer.
1889 auto getInputTypes = [&info, &context]() -> std::optional<std::vector<InputInfo>> {
1890 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1891 auto ref = ServiceRegistryRef{*context.registry};
1892 auto& stats = ref.get<DataProcessingStats>();
1893 auto& state = ref.get<DeviceState>();
1894 auto& parts = info.parts;
1895 stats.updateStats({(int)ProcessingStatsId::TOTAL_INPUTS, DataProcessingStats::Op::Set, (int64_t)parts.Size()});
1896
1897 std::vector<InputInfo> results;
1898 // we can reserve the upper limit
1899 results.reserve(parts.Size() / 2);
1900 size_t nTotalPayloads = 0;
1901
1902 auto insertInputInfo = [&results, &nTotalPayloads](size_t position, size_t length, InputType type, ChannelIndex index) {
1903 results.emplace_back(position, length, type, index);
1904 if (type != InputType::Invalid && length > 1) {
1905 nTotalPayloads += length - 1;
1906 }
1907 };
1908
1909 for (size_t pi = 0; pi < parts.Size(); pi += 2) {
1910 auto* headerData = parts.At(pi)->GetData();
1911 auto sih = o2::header::get<SourceInfoHeader*>(headerData);
1912 if (sih) {
1913 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got SourceInfoHeader with state %d", (int)sih->state);
1914 info.state = sih->state;
1915 insertInputInfo(pi, 2, InputType::SourceInfo, info.id);
1916 state.lastActiveDataProcessor = &context;
1917 continue;
1918 }
1919 auto dih = o2::header::get<DomainInfoHeader*>(headerData);
1920 if (dih) {
1921 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got DomainInfoHeader with oldestPossibleTimeslice %d", (int)dih->oldestPossibleTimeslice);
1922 insertInputInfo(pi, 2, InputType::DomainInfo, info.id);
1923 state.lastActiveDataProcessor = &context;
1924 continue;
1925 }
1926 auto dh = o2::header::get<DataHeader*>(headerData);
1927 if (!dh) {
1928 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1929 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header is not a DataHeader?");
1930 continue;
1931 }
1932 if (dh->payloadSize > parts.At(pi + 1)->GetSize()) {
1933 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1934 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader payloadSize mismatch");
1935 continue;
1936 }
1937 auto dph = o2::header::get<DataProcessingHeader*>(headerData);
1938 // We only deal with the tracking of parts if the log is enabled.
1939 // This is because in principle we should track the size of each of
1940 // the parts and sum it up. Not for now.
1941 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, headerData);
1942 O2_SIGNPOST_START(parts, pid, "parts", "Processing DataHeader %{public}-4s/%{public}-16s/%d with splitPayloadParts %d and splitPayloadIndex %d",
1943 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->splitPayloadParts, dh->splitPayloadIndex);
1944 if (!dph) {
1945 insertInputInfo(pi, 2, InputType::Invalid, info.id);
1946 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header stack does not contain DataProcessingHeader");
1947 continue;
1948 }
1949 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
1950 // this is indicating a sequence of payloads following the header
1951 // FIXME: we will probably also set the DataHeader version
1952 insertInputInfo(pi, dh->splitPayloadParts + 1, InputType::Data, info.id);
1953 pi += dh->splitPayloadParts - 1;
1954 } else {
1955 // We can set the type for the next splitPayloadParts
1956 // because we are guaranteed they are all the same.
1957 // If splitPayloadParts = 0, we assume that means there is only one (header, payload)
1958 // pair.
1959 size_t finalSplitPayloadIndex = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
1960 if (finalSplitPayloadIndex > parts.Size()) {
1961 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader::splitPayloadParts invalid");
1962 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1963 continue;
1964 }
1965 insertInputInfo(pi, 2, InputType::Data, info.id);
1966 for (; pi + 2 < finalSplitPayloadIndex; pi += 2) {
1967 insertInputInfo(pi + 2, 2, InputType::Data, info.id);
1968 }
1969 }
1970 }
1971 if (results.size() + nTotalPayloads != parts.Size()) {
1972 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "inconsistent number of inputs extracted. %zu vs parts (%zu)", results.size() + nTotalPayloads, parts.Size());
1973 return std::nullopt;
1974 }
1975 return results;
1976 };
1977
1978 auto reportError = [ref](const char* message) {
1979 auto& stats = ref.get<DataProcessingStats>();
1981 };
1982
1983 auto handleValidMessages = [&info, ref, &reportError](std::vector<InputInfo> const& inputInfos) {
1984 auto& relayer = ref.get<DataRelayer>();
1985 auto& state = ref.get<DeviceState>();
1986 static WaitBackpressurePolicy policy;
1987 auto& parts = info.parts;
1988 // We relay execution to make sure we have a complete set of parts
1989 // available.
1990 bool hasBackpressure = false;
1991 size_t minBackpressureTimeslice = -1;
1992 bool hasData = false;
1993 size_t oldestPossibleTimeslice = -1;
1994 static std::vector<int> ordering;
1995 // Same as inputInfos but with iota.
1996 ordering.resize(inputInfos.size());
1997 std::iota(ordering.begin(), ordering.end(), 0);
1998 // stable sort orderings by type and position
1999 std::stable_sort(ordering.begin(), ordering.end(), [&inputInfos](int const& a, int const& b) {
2000 auto const& ai = inputInfos[a];
2001 auto const& bi = inputInfos[b];
2002 if (ai.type != bi.type) {
2003 return ai.type < bi.type;
2004 }
2005 return ai.position < bi.position;
2006 });
2007 for (size_t ii = 0; ii < inputInfos.size(); ++ii) {
2008 auto const& input = inputInfos[ordering[ii]];
2009 switch (input.type) {
2010 case InputType::Data: {
2011 hasData = true;
2012 auto headerIndex = input.position;
2013 auto nMessages = 0;
2014 auto nPayloadsPerHeader = 0;
2015 if (input.size > 2) {
2016 // header and multiple payload sequence
2017 nMessages = input.size;
2018 nPayloadsPerHeader = nMessages - 1;
2019 } else {
2020 // multiple header-payload pairs
2021 auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
2022 nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2;
2023 nPayloadsPerHeader = 1;
2024 ii += (nMessages / 2) - 1;
2025 }
2026 auto onDrop = [ref](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
2027 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
2028 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "onDrop", "Dropping message from slot %zu. Forwarding as needed. Timeslice %zu",
2029 slot.index, oldestOutputInfo.timeslice.value);
2030 ref.get<AsyncQueue>();
2031 ref.get<DecongestionService>();
2032 ref.get<DataRelayer>();
2033 // Get the current timeslice for the slot.
2034 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
2036 forwardInputs(ref, slot, dropped, oldestOutputInfo, false, true);
2037 };
2038 auto relayed = relayer.relay(parts.At(headerIndex)->GetData(),
2039 &parts.At(headerIndex),
2040 input,
2041 nMessages,
2042 nPayloadsPerHeader,
2043 onDrop);
2044 switch (relayed.type) {
2046 if (info.normalOpsNotified == true && info.backpressureNotified == false) {
2047 LOGP(alarm, "Backpressure on channel {}. Waiting.", info.channel->GetName());
2048 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
2049 monitoring.send(o2::monitoring::Metric{1, fmt::format("backpressure_{}", info.channel->GetName())});
2050 info.backpressureNotified = true;
2051 info.normalOpsNotified = false;
2052 }
2053 policy.backpressure(info);
2054 hasBackpressure = true;
2055 minBackpressureTimeslice = std::min<size_t>(minBackpressureTimeslice, relayed.timeslice.value);
2056 break;
2060 if (info.normalOpsNotified == false && info.backpressureNotified == true) {
2061 LOGP(info, "Back to normal on channel {}.", info.channel->GetName());
2062 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
2063 monitoring.send(o2::monitoring::Metric{0, fmt::format("backpressure_{}", info.channel->GetName())});
2064 info.normalOpsNotified = true;
2065 info.backpressureNotified = false;
2066 }
2067 break;
2068 }
2069 } break;
2070 case InputType::SourceInfo: {
2071 LOGP(detail, "Received SourceInfo");
2072 auto& context = ref.get<DataProcessorContext>();
2073 state.lastActiveDataProcessor = &context;
2074 auto headerIndex = input.position;
2075 auto payloadIndex = input.position + 1;
2076 assert(payloadIndex < parts.Size());
2077 // FIXME: the message with the end of stream cannot contain
2078 // split parts.
2079 parts.At(headerIndex).reset(nullptr);
2080 parts.At(payloadIndex).reset(nullptr);
2081 // for (size_t i = 0; i < dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 - 1 : 1; ++i) {
2082 // parts.At(headerIndex + 1 + i).reset(nullptr);
2083 // }
2084 // pi += dh->splitPayloadParts > 0 ? dh->splitPayloadParts - 1 : 0;
2085
2086 } break;
2087 case InputType::DomainInfo: {
2090 auto& context = ref.get<DataProcessorContext>();
2091 state.lastActiveDataProcessor = &context;
2092 auto headerIndex = input.position;
2093 auto payloadIndex = input.position + 1;
2094 assert(payloadIndex < parts.Size());
2095 // FIXME: the message with the end of stream cannot contain
2096 // split parts.
2097
2098 auto dih = o2::header::get<DomainInfoHeader*>(parts.At(headerIndex)->GetData());
2099 if (hasBackpressure && dih->oldestPossibleTimeslice >= minBackpressureTimeslice) {
2100 break;
2101 }
2102 oldestPossibleTimeslice = std::min(oldestPossibleTimeslice, dih->oldestPossibleTimeslice);
2103 LOGP(debug, "Got DomainInfoHeader, new oldestPossibleTimeslice {} on channel {}", oldestPossibleTimeslice, info.id.value);
2104 parts.At(headerIndex).reset(nullptr);
2105 parts.At(payloadIndex).reset(nullptr);
2106 }
2107 case InputType::Invalid: {
2108 reportError("Invalid part found.");
2109 } break;
2110 }
2111 }
2114 if (oldestPossibleTimeslice != (size_t)-1) {
2115 info.oldestForChannel = {oldestPossibleTimeslice};
2116 auto& context = ref.get<DataProcessorContext>();
2117 context.domainInfoUpdatedCallback(*context.registry, oldestPossibleTimeslice, info.id);
2118 ref.get<CallbackService>().call<CallbackService::Id::DomainInfoUpdated>((ServiceRegistryRef)*context.registry, (size_t)oldestPossibleTimeslice, (ChannelIndex)info.id);
2119 state.lastActiveDataProcessor = &context;
2120 }
2121 auto it = std::remove_if(parts.fParts.begin(), parts.fParts.end(), [](auto& msg) -> bool { return msg.get() == nullptr; });
2122 parts.fParts.erase(it, parts.end());
2123 if (parts.fParts.size()) {
2124 LOG(debug) << parts.fParts.size() << " messages backpressured";
2125 }
2126 };
2127
2128 // Second part. This is the actual outer loop we want to obtain, with
2129 // implementation details which can be read. Notice how most of the state
2130 // is actually hidden. For example we do not expose what "input" is. This
2131 // will allow us to keep the same toplevel logic even if the actual meaning
2132 // of input is changed (for example we might move away from multipart
2133 // messages). Notice also that we need to act differently depending on the
2134 // actual CompletionOp we want to perform. In particular forwarding inputs
2135 // also gets rid of them from the cache.
2136 auto inputTypes = getInputTypes();
2137 if (bool(inputTypes) == false) {
2138 reportError("Parts should come in couples. Dropping it.");
2139 return;
2140 }
2141 handleValidMessages(*inputTypes);
2142 return;
2143}
2144
2145namespace
2146{
/// Smallest and largest latency seen over a set of input parts.
/// The defaults are the neutral elements for std::min / std::max, so a
/// freshly constructed value can be updated with those functions directly.
struct InputLatency {
  uint64_t minLatency{std::numeric_limits<uint64_t>::max()};
  uint64_t maxLatency{std::numeric_limits<uint64_t>::min()};
};
2151
2152auto calculateInputRecordLatency(InputRecord const& record, uint64_t currentTime) -> InputLatency
2153{
2154 InputLatency result;
2155
2156 for (auto& item : record) {
2157 auto* header = o2::header::get<DataProcessingHeader*>(item.header);
2158 if (header == nullptr) {
2159 continue;
2160 }
2161 int64_t partLatency = (0x7fffffffffffffff & currentTime) - (0x7fffffffffffffff & header->creation);
2162 if (partLatency < 0) {
2163 partLatency = 0;
2164 }
2165 result.minLatency = std::min(result.minLatency, (uint64_t)partLatency);
2166 result.maxLatency = std::max(result.maxLatency, (uint64_t)partLatency);
2167 }
2168 return result;
2169};
2170
2171auto calculateTotalInputRecordSize(InputRecord const& record) -> int
2172{
2173 size_t totalInputSize = 0;
2174 for (auto& item : record) {
2175 auto* header = o2::header::get<DataHeader*>(item.header);
2176 if (header == nullptr) {
2177 continue;
2178 }
2179 totalInputSize += header->payloadSize;
2180 }
2181 return totalInputSize;
2182};
2183
/// Atomically raise @a maximum_value to @a value if @a value is larger.
///
/// Retries the compare-exchange until either the stored value is no longer
/// smaller than @a value or the exchange succeeds. On a failed exchange
/// `observed` is refreshed with the value currently stored, so the
/// comparison is re-evaluated against up-to-date data.
template <typename T>
void update_maximum(std::atomic<T>& maximum_value, T const& value) noexcept
{
  T observed = maximum_value.load();
  while (observed < value) {
    if (maximum_value.compare_exchange_weak(observed, value)) {
      break;
    }
  }
}
2192} // namespace
2193
2194bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::vector<DataRelayer::RecordAction>& completed)
2195{
2196 auto& context = ref.get<DataProcessorContext>();
2197 LOGP(debug, "DataProcessingDevice::tryDispatchComputation");
2198 // This is the actual hidden state for the outer loop. In case we decide we
2199 // want to support multithreaded dispatching of operations, I can simply
2200 // move these to some thread local store and the rest of the lambdas
2201 // should work just fine.
2202 std::vector<MessageSet> currentSetOfInputs;
2203
2204 //
2205 auto getInputSpan = [ref, &currentSetOfInputs](TimesliceSlot slot, bool consume = true) {
2206 auto& relayer = ref.get<DataRelayer>();
2207 if (consume) {
2208 currentSetOfInputs = relayer.consumeAllInputsForTimeslice(slot);
2209 } else {
2210 currentSetOfInputs = relayer.consumeExistingInputsForTimeslice(slot);
2211 }
2212 auto getter = [&currentSetOfInputs](size_t i, size_t partindex) -> DataRef {
2213 if (currentSetOfInputs[i].getNumberOfPairs() > partindex) {
2214 const char* headerptr = nullptr;
2215 const char* payloadptr = nullptr;
2216 size_t payloadSize = 0;
2217 // - each input can have multiple parts
2218 // - "part" denotes a sequence of messages belonging together, the first message of the
2219 // sequence is the header message
2220 // - each part has one or more payload messages
2221 // - InputRecord provides all payloads as header-payload pair
2222 auto const& headerMsg = currentSetOfInputs[i].associatedHeader(partindex);
2223 auto const& payloadMsg = currentSetOfInputs[i].associatedPayload(partindex);
2224 headerptr = static_cast<char const*>(headerMsg->GetData());
2225 payloadptr = payloadMsg ? static_cast<char const*>(payloadMsg->GetData()) : nullptr;
2226 payloadSize = payloadMsg ? payloadMsg->GetSize() : 0;
2227 return DataRef{nullptr, headerptr, payloadptr, payloadSize};
2228 }
2229 return DataRef{};
2230 };
2231 auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
2232 return currentSetOfInputs[i].getNumberOfPairs();
2233 };
2234 return InputSpan{getter, nofPartsGetter, currentSetOfInputs.size()};
2235 };
2236
2237 auto markInputsAsDone = [ref](TimesliceSlot slot) -> void {
2238 auto& relayer = ref.get<DataRelayer>();
2240 };
2241
2242 // I need a preparation step which gets the current timeslice id and
2243 // propagates it to the various contexts (i.e. the actual entities which
2244 // create messages) because the messages need to have the timeslice id into
2245 // it.
2246 auto prepareAllocatorForCurrentTimeSlice = [ref](TimesliceSlot i) -> void {
2247 auto& relayer = ref.get<DataRelayer>();
2248 auto& timingInfo = ref.get<TimingInfo>();
2249 auto timeslice = relayer.getTimesliceForSlot(i);
2250
2251 timingInfo.timeslice = timeslice.value;
2252 timingInfo.tfCounter = relayer.getFirstTFCounterForSlot(i);
2253 timingInfo.firstTForbit = relayer.getFirstTFOrbitForSlot(i);
2254 timingInfo.runNumber = relayer.getRunNumberForSlot(i);
2255 timingInfo.creation = relayer.getCreationTimeForSlot(i);
2256 };
2257 auto updateRunInformation = [ref](TimesliceSlot i) -> void {
2258 auto& dataProcessorContext = ref.get<DataProcessorContext>();
2259 auto& relayer = ref.get<DataRelayer>();
2260 auto& timingInfo = ref.get<TimingInfo>();
2261 auto timeslice = relayer.getTimesliceForSlot(i);
2262 // We report whether or not this timing info refers to a new Run.
2263 timingInfo.globalRunNumberChanged = !TimingInfo::timesliceIsTimer(timeslice.value) && dataProcessorContext.lastRunNumberProcessed != timingInfo.runNumber;
2264 // A switch to runNumber=0 should not appear and thus does not set globalRunNumberChanged, unless it is seen in the first processed timeslice
2265 timingInfo.globalRunNumberChanged &= (dataProcessorContext.lastRunNumberProcessed == -1 || timingInfo.runNumber != 0);
2266 // FIXME: for now there is only one stream, however we
2267 // should calculate this correctly once we finally get the
2268 // the StreamContext in.
2269 timingInfo.streamRunNumberChanged = timingInfo.globalRunNumberChanged;
2270 };
2271
2272 // When processing them, timers will have to be cleaned up
2273 // to avoid double counting them.
2274 // This was actually the easiest solution we could find for
2275 // O2-646.
2276 auto cleanTimers = [&currentSetOfInputs](TimesliceSlot slot, InputRecord& record) {
2277 assert(record.size() == currentSetOfInputs.size());
2278 for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) {
2279 // assuming that for timer inputs we do have exactly one PartRef object
2280 // in the MessageSet, multiple PartRef Objects are only possible for either
2281 // split payload messages of wildcard matchers, both for data inputs
2282 DataRef input = record.getByPos(ii);
2283 if (input.spec->lifetime != Lifetime::Timer) {
2284 continue;
2285 }
2286 if (input.header == nullptr) {
2287 continue;
2288 }
2289 // This will hopefully delete the message.
2290 currentSetOfInputs[ii].clear();
2291 }
2292 };
2293
2294 // Function to cleanup record. For the moment we
2295 // simply use it to keep track of input messages
2296 // which are not needed, to display them in the GUI.
2297 auto cleanupRecord = [](InputRecord& record) {
2298 if (O2_LOG_ENABLED(parts) == false) {
2299 return;
2300 }
2301 for (size_t pi = 0, pe = record.size(); pi < pe; ++pi) {
2302 DataRef input = record.getByPos(pi);
2303 if (input.header == nullptr) {
2304 continue;
2305 }
2306 auto sih = o2::header::get<SourceInfoHeader*>(input.header);
2307 if (sih) {
2308 continue;
2309 }
2310
2311 auto dh = o2::header::get<DataHeader*>(input.header);
2312 if (!dh) {
2313 continue;
2314 }
2315 // We use the address of the first header of a split payload
2316 // to identify the interval.
2317 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, dh);
2318 O2_SIGNPOST_END(parts, pid, "parts", "Cleaning up parts associated to %p", dh);
2319
2320 // No split parts, we simply skip the payload
2321 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
2322 // this is indicating a sequence of payloads following the header
2323 // FIXME: we will probably also set the DataHeader version
2324 pi += dh->splitPayloadParts - 1;
2325 } else {
2326 size_t pi = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
2327 }
2328 }
2329 };
2330
2331 auto switchState = [ref](StreamingState newState) {
2332 auto& control = ref.get<ControlService>();
2333 auto& state = ref.get<DeviceState>();
2334 state.streaming = newState;
2335 control.notifyStreamingState(state.streaming);
2336 };
2337
2338 ref.get<DataRelayer>().getReadyToProcess(completed);
2339 if (completed.empty() == true) {
2340 LOGP(debug, "No computations available for dispatching.");
2341 return false;
2342 }
2343
2344 auto postUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t tStart, uint64_t tStartMilli) {
2345 auto& stats = ref.get<DataProcessingStats>();
2346 auto& states = ref.get<DataProcessingStates>();
2347 std::atomic_thread_fence(std::memory_order_release);
2348 char relayerSlotState[1024];
2349 int written = snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2350 char* buffer = relayerSlotState + written;
2351 for (size_t ai = 0; ai != record.size(); ai++) {
2352 buffer[ai] = record.isValid(ai) ? '3' : '0';
2353 }
2354 buffer[record.size()] = 0;
2355 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index),
2356 .size = (int)(record.size() + buffer - relayerSlotState),
2357 .data = relayerSlotState});
2358 uint64_t tEnd = uv_hrtime();
2359 // tEnd and tStart are in nanoseconds according to https://docs.libuv.org/en/v1.x/misc.html#c.uv_hrtime
2360 int64_t wallTimeMs = (tEnd - tStart) / 1000000;
2362 // Sum up the total wall time, in milliseconds.
2364 // The time interval is in seconds while tEnd - tStart is in nanoseconds, so we divide by 1000000 to get the fraction in ms/s.
2366 stats.updateStats({(int)ProcessingStatsId::LAST_PROCESSED_SIZE, DataProcessingStats::Op::Set, calculateTotalInputRecordSize(record)});
2367 stats.updateStats({(int)ProcessingStatsId::TOTAL_PROCESSED_SIZE, DataProcessingStats::Op::Add, calculateTotalInputRecordSize(record)});
2368 auto latency = calculateInputRecordLatency(record, tStartMilli);
2369 stats.updateStats({(int)ProcessingStatsId::LAST_MIN_LATENCY, DataProcessingStats::Op::Set, (int)latency.minLatency});
2370 stats.updateStats({(int)ProcessingStatsId::LAST_MAX_LATENCY, DataProcessingStats::Op::Set, (int)latency.maxLatency});
2371 static int count = 0;
2373 count++;
2374 };
2375
2376 auto preUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t) {
2377 auto& states = ref.get<DataProcessingStates>();
2378 std::atomic_thread_fence(std::memory_order_release);
2379 char relayerSlotState[1024];
2380 snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2381 char* buffer = strchr(relayerSlotState, ' ') + 1;
2382 for (size_t ai = 0; ai != record.size(); ai++) {
2383 buffer[ai] = record.isValid(ai) ? '2' : '0';
2384 }
2385 buffer[record.size()] = 0;
2386 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index), .size = (int)(record.size() + buffer - relayerSlotState), .data = relayerSlotState});
2387 };
2388
2389 // This is the main dispatching loop
2390 auto& state = ref.get<DeviceState>();
2391 auto& spec = ref.get<DeviceSpec const>();
2392
2393 auto& dpContext = ref.get<DataProcessorContext>();
2394 auto& streamContext = ref.get<StreamContext>();
2395 O2_SIGNPOST_ID_GENERATE(sid, device);
2396 O2_SIGNPOST_START(device, sid, "device", "Start processing ready actions");
2397
2398 auto& stats = ref.get<DataProcessingStats>();
2399 auto& relayer = ref.get<DataRelayer>();
2400 using namespace o2::framework;
2401 stats.updateStats({(int)ProcessingStatsId::PENDING_INPUTS, DataProcessingStats::Op::Set, static_cast<int64_t>(relayer.getParallelTimeslices() - completed.size())});
2402 stats.updateStats({(int)ProcessingStatsId::INCOMPLETE_INPUTS, DataProcessingStats::Op::Set, completed.empty() ? 1 : 0});
2403 switch (spec.completionPolicy.order) {
2405 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.timeslice.value < b.timeslice.value; });
2406 break;
2408 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.slot.index < b.slot.index; });
2409 break;
2411 default:
2412 break;
2413 }
2414
2415 for (auto action : completed) {
2416 O2_SIGNPOST_ID_GENERATE(aid, device);
2417 O2_SIGNPOST_START(device, aid, "device", "Processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2419 O2_SIGNPOST_END(device, aid, "device", "Waiting for more data.");
2420 continue;
2421 }
2422
2423 bool shouldConsume = action.op == CompletionPolicy::CompletionOp::Consume ||
2425 prepareAllocatorForCurrentTimeSlice(TimesliceSlot{action.slot});
2429 updateRunInformation(TimesliceSlot{action.slot});
2430 }
2431 InputSpan span = getInputSpan(action.slot, shouldConsume);
2432 auto& spec = ref.get<DeviceSpec const>();
2433 InputRecord record{spec.inputs,
2434 span,
2435 *context.registry};
2436 ProcessingContext processContext{record, ref, ref.get<DataAllocator>()};
2437 {
2438 // Notice this should be thread safe and reentrant
2439 // as it is called from many threads.
2440 streamContext.preProcessingCallbacks(processContext);
2441 dpContext.preProcessingCallbacks(processContext);
2442 }
2444 context.postDispatchingCallbacks(processContext);
2445 if (spec.forwards.empty() == false) {
2446 auto& timesliceIndex = ref.get<TimesliceIndex>();
2447 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false);
2448 O2_SIGNPOST_END(device, aid, "device", "Forwarding inputs consume: %d.", false);
2449 continue;
2450 }
2451 }
2452 // If there is no optional inputs we canForwardEarly
2453 // the messages to that parallel processing can happen.
2454 // In this case we pass true to indicate that we want to
2455 // copy the messages to the subsequent data processor.
2456 bool hasForwards = spec.forwards.empty() == false;
2458
2459 if (context.canForwardEarly && hasForwards && consumeSomething) {
2460 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwainding: %{public}s.", fmt::format("{}", action.op).c_str());
2461 auto& timesliceIndex = ref.get<TimesliceIndex>();
2462 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
2463 }
2464 markInputsAsDone(action.slot);
2465
2466 uint64_t tStart = uv_hrtime();
2467 uint64_t tStartMilli = TimingHelpers::getRealtimeSinceEpochStandalone();
2468 preUpdateStats(action, record, tStart);
2469
2470 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
2471
2472 auto runNoCatch = [&context, ref, &processContext](DataRelayer::RecordAction& action) mutable {
2473 auto& state = ref.get<DeviceState>();
2474 auto& spec = ref.get<DeviceSpec const>();
2475 auto& streamContext = ref.get<StreamContext>();
2476 auto& dpContext = ref.get<DataProcessorContext>();
2477 auto shouldProcess = [](DataRelayer::RecordAction& action) -> bool {
2478 switch (action.op) {
2483 return true;
2484 break;
2485 default:
2486 return false;
2487 }
2488 };
2489 if (state.quitRequested == false) {
2490 {
2491 // Callbacks from services
2492 dpContext.preProcessingCallbacks(processContext);
2493 streamContext.preProcessingCallbacks(processContext);
2494 dpContext.preProcessingCallbacks(processContext);
2495 // Callbacks from users
2496 ref.get<CallbackService>().call<CallbackService::Id::PreProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2497 }
2498 O2_SIGNPOST_ID_FROM_POINTER(pcid, device, &processContext);
2499 if (context.statefulProcess && shouldProcess(action)) {
2500 // This way, usercode can use the the same processing context to identify
2501 // its signposts and we can map user code to device iterations.
2502 O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
2503 (context.statefulProcess)(processContext);
2504 O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
2505 } else if (context.statelessProcess && shouldProcess(action)) {
2506 O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
2507 (context.statelessProcess)(processContext);
2508 O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
2509 } else if (context.statelessProcess || context.statefulProcess) {
2510 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding.");
2511 } else {
2512 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle");
2513 state.streaming = StreamingState::Idle;
2514 }
2515 if (shouldProcess(action)) {
2516 auto& timingInfo = ref.get<TimingInfo>();
2517 if (timingInfo.globalRunNumberChanged) {
2518 context.lastRunNumberProcessed = timingInfo.runNumber;
2519 }
2520 }
2521
2522 // Notify the sink we just consumed some timeframe data
2523 if (context.isSink && action.op == CompletionPolicy::CompletionOp::Consume) {
2524 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Sending dpl-summary");
2525 auto& allocator = ref.get<DataAllocator>();
2526 allocator.make<int>(OutputRef{"dpl-summary", runtime_hash(spec.name.c_str())}, 1);
2527 }
2528
2529 // Extra callback which allows a service to add extra outputs.
2530 // This is needed e.g. to ensure that injected CCDB outputs are added
2531 // before an end of stream.
2532 {
2533 ref.get<CallbackService>().call<CallbackService::Id::FinaliseOutputs>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2534 dpContext.finaliseOutputsCallbacks(processContext);
2535 streamContext.finaliseOutputsCallbacks(processContext);
2536 }
2537
2538 {
2539 ref.get<CallbackService>().call<CallbackService::Id::PostProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2540 dpContext.postProcessingCallbacks(processContext);
2541 streamContext.postProcessingCallbacks(processContext);
2542 }
2543 }
2544 };
2545
2546 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_USERCODE) != 0) {
2547 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
2548 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
2549 }
2550 if (noCatch) {
2551 try {
2552 runNoCatch(action);
2553 } catch (o2::framework::RuntimeErrorRef e) {
2554 (context.errorHandling)(e, record);
2555 }
2556 } else {
2557 try {
2558 runNoCatch(action);
2559 } catch (std::exception& ex) {
2563 auto e = runtime_error(ex.what());
2564 (context.errorHandling)(e, record);
2565 } catch (o2::framework::RuntimeErrorRef e) {
2566 (context.errorHandling)(e, record);
2567 }
2568 }
2569 if (state.severityStack.empty() == false) {
2570 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
2571 state.severityStack.pop_back();
2572 }
2573
2574 postUpdateStats(action, record, tStart, tStartMilli);
2575 // We forward inputs only when we consume them. If we simply Process them,
2576 // we keep them for next message arriving.
2578 cleanupRecord(record);
2579 context.postDispatchingCallbacks(processContext);
2580 ref.get<CallbackService>().call<CallbackService::Id::DataConsumed>(o2::framework::ServiceRegistryRef{ref});
2581 }
2582 if ((context.canForwardEarly == false) && hasForwards && consumeSomething) {
2583 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Late forwarding");
2584 auto& timesliceIndex = ref.get<TimesliceIndex>();
2585 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false, action.op == CompletionPolicy::CompletionOp::Consume);
2586 }
2587 context.postForwardingCallbacks(processContext);
2589 cleanTimers(action.slot, record);
2590 }
2591 O2_SIGNPOST_END(device, aid, "device", "Done processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2592 }
2593 O2_SIGNPOST_END(device, sid, "device", "Start processing ready actions");
2594
2595 // We now broadcast the end of stream if it was requested
2596 if (state.streaming == StreamingState::EndOfStreaming) {
2597 LOGP(detail, "Broadcasting end of stream");
2598 for (auto& channel : spec.outputChannels) {
2600 }
2601 switchState(StreamingState::Idle);
2602 }
2603
2604 return true;
2605}
2606
2608{
2609 LOG(error) << msg;
2610 ServiceRegistryRef ref{mServiceRegistry};
2611 auto& stats = ref.get<DataProcessingStats>();
2613}
2614
2615std::unique_ptr<ConfigParamStore> DeviceConfigurationHelpers::getConfiguration(ServiceRegistryRef registry, const char* name, std::vector<ConfigParamSpec> const& options)
2616{
2617
2618 if (registry.active<ConfigurationInterface>()) {
2619 auto& cfg = registry.get<ConfigurationInterface>();
2620 try {
2621 cfg.getRecursive(name);
2622 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
2623 retrievers.emplace_back(std::make_unique<ConfigurationOptionsRetriever>(&cfg, name));
2624 auto configStore = std::make_unique<ConfigParamStore>(options, std::move(retrievers));
2625 configStore->preload();
2626 configStore->activate();
2627 return configStore;
2628 } catch (...) {
2629 // No overrides...
2630 }
2631 }
2632 return {nullptr};
2633}
2634
2635} // namespace o2::framework
benchmark::State & state
struct uv_timer_s uv_timer_t
struct uv_signal_s uv_signal_t
struct uv_async_s uv_async_t
struct uv_poll_s uv_poll_t
struct uv_loop_s uv_loop_t
o2::monitoring::Metric Metric
uint64_t maxLatency
o2::configuration::ConfigurationInterface ConfigurationInterface
constexpr int DEFAULT_MAX_CHANNEL_AHEAD
uint64_t minLatency
int32_t i
std::enable_if_t< std::is_signed< T >::value, bool > hasData(const CalArray< T > &cal)
Definition Painter.cxx:515
uint16_t pid
Definition RawData.h:2
uint32_t res
Definition RawData.h:0
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
Definition Signpost.h:515
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:473
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:489
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:504
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:540
#define O2_LOG_ENABLED(log)
Definition Signpost.h:110
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:490
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:495
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:534
constexpr uint32_t runtime_hash(char const *str)
std::ostringstream debug
o2::monitoring::Monitoring Monitoring
StringRef key
@ DeviceStateChanged
Invoked when the device undergoes a state change.
decltype(auto) make(const Output &spec, Args... args)
static void doRun(ServiceRegistryRef)
void fillContext(DataProcessorContext &context, DeviceContext &deviceContext)
static void doPrepare(ServiceRegistryRef)
static bool tryDispatchComputation(ServiceRegistryRef ref, std::vector< DataRelayer::RecordAction > &completed)
static void handleData(ServiceRegistryRef, InputChannelInfo &)
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry &, ProcessingPolicies &policies)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associate to a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associate to a given slot.
A service API to communicate with the driver.
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
size_t size() const
Number of elements in the InputSpan.
Definition InputSpan.h:70
bool active() const
Check if service of type T is currently active.
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLuint entry
Definition glcorearb.h:5735
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLuint GLuint stream
Definition glcorearb.h:1806
GLint ref
Definition glcorearb.h:291
GLuint * states
Definition glcorearb.h:4932
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
ServiceKind
The kind of service we are asking for.
void on_idle_timer(uv_timer_t *handle)
@ DPL
The channel is a normal input channel.
void run_completion(uv_work_t *handle, int status)
bool hasOnlyGenerated(DeviceSpec const &spec)
void on_socket_polled(uv_poll_t *poller, int status, int events)
void on_transition_requested_expired(uv_timer_t *handle)
void run_callback(uv_work_t *handle)
volatile int region_read_global_dummy_variable
void handleRegionCallbacks(ServiceRegistryRef registry, std::vector< fair::mq::RegionInfo > &infos)
Invoke the callbacks for the mPendingRegionInfos.
void on_out_of_band_polled(uv_poll_t *poller, int status, int events)
DeviceSpec const & getRunningDevice(RunningDeviceRef const &running, ServiceRegistryRef const &services)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Streaming
Data is being processed.
@ Idle
End of streaming notified.
void on_communication_requested(uv_async_t *s)
@ Expired
A transition needs to be fulfilled ASAP.
@ NoTransition
No pending transitions.
@ Requested
A transition was notified to be requested.
RuntimeError & error_from_ref(RuntimeErrorRef)
void on_awake_main_thread(uv_async_t *handle)
@ Completed
The channel was signaled it will not receive any data.
@ Running
The channel is actively receiving data.
void on_signal_callback(uv_signal_t *handle, int signum)
@ Me
Only quit this data processor.
void on_data_processing_expired(uv_timer_t *handle)
bool hasOnlyTimers(DeviceSpec const &spec)
constexpr const char * channelName(int channel)
Definition Constants.h:318
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
Defining DataPointCompositeObject explicitly as copiable.
static void run(AsyncQueue &queue, TimesliceId oldestPossibleTimeslice)
static void post(AsyncQueue &queue, AsyncTask const &task)
An actual task to be executed.
Definition AsyncQueue.h:32
static void demangled_backtrace_symbols(void **backtrace, unsigned int total, int fd)
static constexpr int INVALID
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
int64_t sharedMemory
How much shared memory it can allocate.
Statistics on the offers consumed, expired.
static void sendEndOfStream(ServiceRegistryRef const &ref, OutputChannelSpec const &channel)
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
Helper struct to hold statistics about the data processing happening.
@ CumulativeRate
Set the value to the specified value if it is positive.
@ Add
Update the rate of the metric given the amount since the last time.
std::function< void(o2::framework::RuntimeErrorRef e, InputRecord &record)> errorHandling
AlgorithmSpec::InitErrorCallback initError
void preLoopCallbacks(ServiceRegistryRef)
Invoke callbacks before we enter the event loop.
void postStopCallbacks(ServiceRegistryRef)
Invoke callbacks on stop.
void preProcessingCallbacks(ProcessingContext &)
Invoke callbacks to be executed before every process method invokation.
bool canForwardEarly
Whether or not the associated DataProcessor can forward things early.
void preStartCallbacks(ServiceRegistryRef)
Invoke callbacks to be executed in PreRun(), before the User Start callbacks.
AlgorithmSpec::ProcessCallback statefulProcess
const char * header
Definition DataRef.h:27
const InputSpec * spec
Definition DataRef.h:26
static std::vector< size_t > createDistinctRouteIndex(std::vector< InputRoute > const &)
CompletionPolicy::CompletionOp op
Definition DataRelayer.h:81
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static header::DataOrigin asConcreteOrigin(InputSpec const &spec)
TimesliceIndex::OldestOutputInfo oldestTimeslice
static unsigned int pipelineLength()
get max number of timeslices in the queue
static std::unique_ptr< ConfigParamStore > getConfiguration(ServiceRegistryRef registry, const char *name, std::vector< ConfigParamSpec > const &options)
std::vector< InputRoute > inputs
Definition DeviceSpec.h:62
std::vector< InputChannelSpec > inputChannels
Definition DeviceSpec.h:54
Running state information of a given device.
Definition DeviceState.h:34
std::atomic< int64_t > cleanupCount
Definition DeviceState.h:82
Forward channel information.
Definition ChannelInfo.h:88
ChannelAccountingType channelType
Whether or not it's a DPL internal channel.
Definition ChannelInfo.h:92
std::string name
The name of the channel.
Definition ChannelInfo.h:90
ForwardingPolicy const * policy
Definition ChannelInfo.h:94
fair::mq::Channel * channel
Definition ChannelInfo.h:51
TimesliceId oldestForChannel
Oldest possible timeslice for the given channel.
Definition ChannelInfo.h:65
enum Lifetime lifetime
Definition InputSpec.h:73
enum EarlyForwardPolicy earlyForward
Information about the running workflow.
static constexpr ServiceKind kind
static Salt streamSalt(short streamId, short dataProcessorId)
void lateBindStreamServices(DeviceState &state, fair::mq::ProgOptions &options, ServiceRegistry::Salt salt)
static Salt globalStreamSalt(short streamId)
void * get(ServiceTypeHash typeHash, Salt salt, ServiceKind kind, char const *name=nullptr) const
void finaliseOutputsCallbacks(ProcessingContext &)
Invoke callbacks to be executed after every process method invokation.
void preProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed before every process method invokation.
void preEOSCallbacks(EndOfStreamContext &eosContext)
Invoke callbacks to be executed before every EOS user callback invokation.
void postProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed after every process method invokation.
static int64_t getRealtimeSinceEpochStandalone()
bool keepAtEndOfStream
Whether this kind of data should be flushed during end of stream.
Definition TimingInfo.h:44
static bool timesliceIsTimer(size_t timeslice)
Definition TimingInfo.h:46
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
void backpressure(InputChannelInfo const &)
locked_execution(ServiceRegistryRef &ref_)
the main header struct
Definition DataHeader.h:618
constexpr size_t min
constexpr size_t max
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
vec clear()
const std::string str
uint64_t const void const *restrict const msg
Definition x9.h:153