Project
Loading...
Searching...
No Matches
DataProcessingDevice.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
31#include "Framework/InputSpan.h"
32#if defined(__APPLE__) || defined(NDEBUG)
33#define O2_SIGNPOST_IMPLEMENTATION
34#endif
35#include "Framework/Signpost.h"
48
49#include "DecongestionService.h"
51#include "DataRelayerHelpers.h"
52#include "Headers/DataHeader.h"
54
55#include <Framework/Tracing.h>
56
57#include <fairmq/Parts.h>
58#include <fairmq/Socket.h>
59#include <fairmq/ProgOptions.h>
60#if __has_include(<fairmq/shmem/Message.h>)
61#include <fairmq/shmem/Message.h>
62#endif
63#include <Configuration/ConfigurationInterface.h>
64#include <Configuration/ConfigurationFactory.h>
65#include <Monitoring/Monitoring.h>
66#include <TMessage.h>
67#include <TClonesArray.h>
68
69#include <fmt/ostream.h>
70#include <algorithm>
71#include <vector>
72#include <numeric>
73#include <memory>
74#include <uv.h>
75#include <execinfo.h>
76#include <sstream>
77#include <boost/property_tree/json_parser.hpp>
78
79// Formatter to avoid having to rewrite the ostream operator for the enum
80namespace fmt
81{
82template <>
85} // namespace fmt
86
87// A log to use for general device logging
89// A log to use for general device logging
91// Special log to keep track of the lifetime of the parts
93// Stream which keeps track of the calibration lifetime logic
95// Special log to track the async queue behavior
97// Special log to track the forwarding requests
99// Special log to track CCDB related requests
101
102using namespace o2::framework;
103using ConfigurationInterface = o2::configuration::ConfigurationInterface;
105
106constexpr int DEFAULT_MAX_CHANNEL_AHEAD = 128;
107
108namespace o2::framework
109{
110
111template <>
115
119{
120 auto* state = (DeviceState*)handle->data;
121 state->loopReason |= DeviceState::TIMER_EXPIRED;
122}
123
124bool hasOnlyTimers(DeviceSpec const& spec)
125{
126 return std::all_of(spec.inputs.cbegin(), spec.inputs.cend(), [](InputRoute const& route) -> bool { return route.matcher.lifetime == Lifetime::Timer; });
127}
128
130{
131 return (spec.inputChannels.size() == 1) && (spec.inputs[0].matcher.lifetime == Lifetime::Timer || spec.inputs[0].matcher.lifetime == Lifetime::Enumeration);
132}
133
135{
136 auto* ref = (ServiceRegistryRef*)handle->data;
137 auto& state = ref->get<DeviceState>();
138 state.loopReason |= DeviceState::TIMER_EXPIRED;
139 // Check if this is a source device
140 O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
141 auto& spec = ref->get<DeviceSpec const>();
142 std::string messageOnExpire = hasOnlyGenerated(spec) ? "DPL exit transition grace period for source expired. Exiting." : fmt::format("DPL exit transition grace period for {} expired. Exiting.", state.allowedProcessing == DeviceState::CalibrationOnly ? "calibration" : "data & calibration").c_str();
143 if (!ref->get<RawDeviceService>().device()->GetConfig()->GetValue<bool>("error-on-exit-transition-timeout")) {
144 O2_SIGNPOST_EVENT_EMIT_WARN(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
145 } else {
146 O2_SIGNPOST_EVENT_EMIT_ERROR(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
147 }
148 state.transitionHandling = TransitionHandlingState::Expired;
149}
150
152{
153 auto& state = ref.get<DeviceState>();
154 auto& context = ref.get<DataProcessorContext>();
155 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
156 O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
157 O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
158 state.streaming = newState;
159 ref.get<ControlService>().notifyStreamingState(state.streaming);
160};
161
163{
164 auto* ref = (ServiceRegistryRef*)handle->data;
165 auto& state = ref->get<DeviceState>();
166 auto& spec = ref->get<DeviceSpec const>();
167 state.loopReason |= DeviceState::TIMER_EXPIRED;
168
169 // Check if this is a source device
170 O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
171
172 if (hasOnlyGenerated(spec)) {
173 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Switching to EndOfStreaming.");
175 } else {
176 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
177 state.allowedProcessing = DeviceState::CalibrationOnly;
178 }
179}
180
182{
183 auto* state = (DeviceState*)s->data;
185}
186
187DeviceSpec const& getRunningDevice(RunningDeviceRef const& running, ServiceRegistryRef const& services)
188{
189 auto& devices = services.get<o2::framework::RunningWorkflowInfo const>().devices;
190 return devices[running.index];
191}
192
198
200 : mRunningDevice{running},
201 mConfigRegistry{nullptr},
202 mServiceRegistry{registry},
203 mProcessingPolicies{policies}
204{
205 GetConfig()->Subscribe<std::string>("dpl", [&registry = mServiceRegistry](const std::string& key, std::string value) {
206 if (key == "cleanup") {
208 auto& deviceState = ref.get<DeviceState>();
209 int64_t cleanupCount = deviceState.cleanupCount.load();
210 int64_t newCleanupCount = std::stoll(value);
211 if (newCleanupCount <= cleanupCount) {
212 return;
213 }
214 deviceState.cleanupCount.store(newCleanupCount);
215 for (auto& info : deviceState.inputChannelInfos) {
216 fair::mq::Parts parts;
217 while (info.channel->Receive(parts, 0)) {
218 LOGP(debug, "Dropping {} parts", parts.Size());
219 if (parts.Size() == 0) {
220 break;
221 }
222 }
223 }
224 }
225 });
226
227 std::function<void(const fair::mq::State)> stateWatcher = [this, &registry = mServiceRegistry](const fair::mq::State state) -> void {
229 auto& deviceState = ref.get<DeviceState>();
230 auto& control = ref.get<ControlService>();
231 auto& callbacks = ref.get<CallbackService>();
232 control.notifyDeviceState(fair::mq::GetStateName(state));
234
235 if (deviceState.nextFairMQState.empty() == false) {
236 auto state = deviceState.nextFairMQState.back();
237 (void)this->ChangeState(state);
238 deviceState.nextFairMQState.pop_back();
239 }
240 };
241
242 // 99 is to execute DPL callbacks last
243 this->SubscribeToStateChange("99-dpl", stateWatcher);
244
245 // One task for now.
246 mStreams.resize(1);
247 mHandles.resize(1);
248
249 ServiceRegistryRef ref{mServiceRegistry};
250 mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t));
251 auto& state = ref.get<DeviceState>();
252 assert(state.loop);
253 int res = uv_async_init(state.loop, mAwakeHandle, on_communication_requested);
254 mAwakeHandle->data = &state;
255 if (res < 0) {
256 LOG(error) << "Unable to initialise subscription";
257 }
258
260 SubscribeToNewTransition("dpl", [wakeHandle = mAwakeHandle](fair::mq::Transition t) {
261 int res = uv_async_send(wakeHandle);
262 if (res < 0) {
263 LOG(error) << "Unable to notify subscription";
264 }
265 LOG(debug) << "State transition requested";
266 });
267}
268
269// Callback to execute the processing. Notice how the data is
270// a vector of DataProcessorContext so that we can index the correct
271// one with the thread id. For the moment we simply use the first one.
272void run_callback(uv_work_t* handle)
273{
274 auto* task = (TaskStreamInfo*)handle->data;
275 auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
276 // We create a new signpost interval for this specific data processor. Same id, same data processor.
277 auto& dataProcessorContext = ref.get<DataProcessorContext>();
278 O2_SIGNPOST_ID_FROM_POINTER(sid, device, &dataProcessorContext);
279 O2_SIGNPOST_START(device, sid, "run_callback", "Starting run callback on stream %d", task->id.index);
282 O2_SIGNPOST_END(device, sid, "run_callback", "Done processing data for stream %d", task->id.index);
283}
284
285// Once the processing in a thread is done, this is executed on the main thread.
286void run_completion(uv_work_t* handle, int status)
287{
288 auto* task = (TaskStreamInfo*)handle->data;
289 // Notice that the completion, while running on the main thread, still
290 // has a salt which is associated to the actual stream which was doing the computation
291 auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
292 auto& state = ref.get<DeviceState>();
293 auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
294
295 using o2::monitoring::Metric;
296 using o2::monitoring::Monitoring;
297 using o2::monitoring::tags::Key;
298 using o2::monitoring::tags::Value;
299
300 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats&)> reportConsumedOffer = [ref](ComputingQuotaOffer const& accumulatedConsumed, ComputingQuotaStats& stats) {
301 auto& dpStats = ref.get<DataProcessingStats>();
302 stats.totalConsumedBytes += accumulatedConsumed.sharedMemory;
303
304 dpStats.updateStats({static_cast<short>(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedBytes});
305 dpStats.processCommandQueue();
306 assert(stats.totalConsumedBytes == dpStats.metrics[(short)ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED]);
307 };
308
309 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const&)> reportExpiredOffer = [ref](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
310 auto& dpStats = ref.get<DataProcessingStats>();
311 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
312 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
313 dpStats.processCommandQueue();
314 };
315
316 for (auto& consumer : state.offerConsumers) {
317 quotaEvaluator.consume(task->id.index, consumer, reportConsumedOffer);
318 }
319 state.offerConsumers.clear();
320 quotaEvaluator.handleExpired(reportExpiredOffer);
321 quotaEvaluator.dispose(task->id.index);
322 task->running = false;
323}
324
325// Context for polling
327 enum struct PollerState : char { Stopped,
329 Connected,
330 Suspended };
331 char const* name = nullptr;
332 uv_loop_t* loop = nullptr;
334 DeviceState* state = nullptr;
335 fair::mq::Socket* socket = nullptr;
337 int fd = -1;
338 bool read = true;
340};
341
342void on_socket_polled(uv_poll_t* poller, int status, int events)
343{
344 auto* context = (PollerContext*)poller->data;
345 assert(context);
346 O2_SIGNPOST_ID_FROM_POINTER(sid, sockets, poller);
347 context->state->loopReason |= DeviceState::DATA_SOCKET_POLLED;
348 switch (events) {
349 case UV_READABLE: {
350 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
351 context->state->loopReason |= DeviceState::DATA_INCOMING;
352 } break;
353 case UV_WRITABLE: {
354 O2_SIGNPOST_END(sockets, sid, "socket_state", "Socket connected for channel %{public}s", context->name);
355 if (context->read) {
356 O2_SIGNPOST_START(sockets, sid, "socket_state", "Socket connected for read in context %{public}s", context->name);
357 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
358 context->state->loopReason |= DeviceState::DATA_CONNECTED;
359 } else {
360 O2_SIGNPOST_START(sockets, sid, "socket_state", "Socket connected for write for channel %{public}s", context->name);
361 context->state->loopReason |= DeviceState::DATA_OUTGOING;
362 // If the socket is writable, fairmq will handle the rest, so we can stop polling and
363 // just wait for the disconnect.
364 uv_poll_start(poller, UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
365 }
366 context->pollerState = PollerContext::PollerState::Connected;
367 } break;
368 case UV_DISCONNECT: {
369 O2_SIGNPOST_END(sockets, sid, "socket_state", "Socket disconnected in context %{public}s", context->name);
370 } break;
371 case UV_PRIORITIZED: {
372 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Socket prioritized for context %{public}s", context->name);
373 } break;
374 }
375 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
376}
377
378void on_out_of_band_polled(uv_poll_t* poller, int status, int events)
379{
380 O2_SIGNPOST_ID_FROM_POINTER(sid, sockets, poller);
381 auto* context = (PollerContext*)poller->data;
382 context->state->loopReason |= DeviceState::OOB_ACTIVITY;
383 if (status < 0) {
384 LOGP(fatal, "Error while polling {}: {}", context->name, status);
385 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
386 }
387 switch (events) {
388 case UV_READABLE: {
389 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
390 context->state->loopReason |= DeviceState::DATA_INCOMING;
391 assert(context->channelInfo);
392 context->channelInfo->readPolled = true;
393 } break;
394 case UV_WRITABLE: {
395 O2_SIGNPOST_END(sockets, sid, "socket_state", "OOB socket connected for channel %{public}s", context->name);
396 if (context->read) {
397 O2_SIGNPOST_START(sockets, sid, "socket_state", "OOB socket connected for read in context %{public}s", context->name);
398 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_out_of_band_polled);
399 } else {
400 O2_SIGNPOST_START(sockets, sid, "socket_state", "OOB socket connected for write for channel %{public}s", context->name);
401 context->state->loopReason |= DeviceState::DATA_OUTGOING;
402 }
403 } break;
404 case UV_DISCONNECT: {
405 O2_SIGNPOST_END(sockets, sid, "socket_state", "OOB socket disconnected in context %{public}s", context->name);
406 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
407 } break;
408 case UV_PRIORITIZED: {
409 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "OOB socket prioritized for context %{public}s", context->name);
410 } break;
411 }
412 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
413}
414
423{
424 auto ref = ServiceRegistryRef{mServiceRegistry};
425 auto& context = ref.get<DataProcessorContext>();
426 auto& spec = getRunningDevice(mRunningDevice, ref);
427
428 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
429 O2_SIGNPOST_START(device, cid, "Init", "Entering Init callback.");
430 context.statelessProcess = spec.algorithm.onProcess;
431 context.statefulProcess = nullptr;
432 context.error = spec.algorithm.onError;
433 context.initError = spec.algorithm.onInitError;
434
435 auto configStore = DeviceConfigurationHelpers::getConfiguration(mServiceRegistry, spec.name.c_str(), spec.options);
436 if (configStore == nullptr) {
437 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
438 retrievers.emplace_back(std::make_unique<FairOptionsRetriever>(GetConfig()));
439 configStore = std::make_unique<ConfigParamStore>(spec.options, std::move(retrievers));
440 configStore->preload();
441 configStore->activate();
442 }
443
444 using boost::property_tree::ptree;
445
447 for (auto& entry : configStore->store()) {
448 std::stringstream ss;
449 std::string str;
450 if (entry.second.empty() == false) {
451 boost::property_tree::json_parser::write_json(ss, entry.second, false);
452 str = ss.str();
453 str.pop_back(); // remove EoL
454 } else {
455 str = entry.second.get_value<std::string>();
456 }
457 std::string configString = fmt::format("[CONFIG] {}={} 1 {}", entry.first, str, configStore->provenance(entry.first.c_str())).c_str();
458 mServiceRegistry.get<DriverClient>(ServiceRegistry::globalDeviceSalt()).tell(configString.c_str());
459 }
460
461 mConfigRegistry = std::make_unique<ConfigParamRegistry>(std::move(configStore));
462
463 // Setup the error handlers for init
464 if (context.initError) {
465 context.initErrorHandling = [&errorCallback = context.initError,
466 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
470 auto& context = ref.get<DataProcessorContext>();
471 auto& err = error_from_ref(e);
472 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
473 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Invoking errorCallback.", err.what);
474 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
475 auto& stats = ref.get<DataProcessingStats>();
477 InitErrorContext errorContext{ref, e};
478 errorCallback(errorContext);
479 };
480 } else {
481 context.initErrorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
482 auto& err = error_from_ref(e);
486 auto& context = ref.get<DataProcessorContext>();
487 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
488 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Exiting with 1.", err.what);
489 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
490 auto& stats = ref.get<DataProcessingStats>();
492 exit(1);
493 };
494 }
495
496 context.expirationHandlers.clear();
497 context.init = spec.algorithm.onInit;
498 if (context.init) {
499 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
500 InitContext initContext{*mConfigRegistry, mServiceRegistry};
501
502 if (noCatch) {
503 try {
504 context.statefulProcess = context.init(initContext);
506 if (context.initErrorHandling) {
507 (context.initErrorHandling)(e);
508 }
509 }
510 } else {
511 try {
512 context.statefulProcess = context.init(initContext);
513 } catch (std::exception& ex) {
517 auto e = runtime_error(ex.what());
518 (context.initErrorHandling)(e);
520 (context.initErrorHandling)(e);
521 }
522 }
523 }
524 auto& state = ref.get<DeviceState>();
525 state.inputChannelInfos.resize(spec.inputChannels.size());
529 int validChannelId = 0;
530 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
531 auto& name = spec.inputChannels[ci].name;
532 if (name.find(spec.channelPrefix + "from_internal-dpl-clock") == 0) {
533 state.inputChannelInfos[ci].state = InputChannelState::Pull;
534 state.inputChannelInfos[ci].id = {ChannelIndex::INVALID};
535 validChannelId++;
536 } else {
537 state.inputChannelInfos[ci].id = {validChannelId++};
538 }
539 }
540
541 // Invoke the callback policy for this device.
542 if (spec.callbacksPolicy.policy != nullptr) {
543 InitContext initContext{*mConfigRegistry, mServiceRegistry};
544 spec.callbacksPolicy.policy(mServiceRegistry.get<CallbackService>(ServiceRegistry::globalDeviceSalt()), initContext);
545 }
546
547 // Services which are stream should be initialised now
548 auto* options = GetConfig();
549 for (size_t si = 0; si < mStreams.size(); ++si) {
551 mServiceRegistry.lateBindStreamServices(state, *options, streamSalt);
552 }
553 O2_SIGNPOST_END(device, cid, "Init", "Exiting Init callback.");
554}
555
556void on_signal_callback(uv_signal_t* handle, int signum)
557{
558 O2_SIGNPOST_ID_FROM_POINTER(sid, device, handle);
559 O2_SIGNPOST_START(device, sid, "signal_state", "Signal %d received.", signum);
560
561 auto* registry = (ServiceRegistry*)handle->data;
562 if (!registry) {
563 O2_SIGNPOST_END(device, sid, "signal_state", "No registry active. Ignoring signal.");
564 return;
565 }
566 ServiceRegistryRef ref{*registry};
567 auto& state = ref.get<DeviceState>();
568 auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
569 auto& stats = ref.get<DataProcessingStats>();
571 size_t ri = 0;
572 while (ri != quotaEvaluator.mOffers.size()) {
573 auto& offer = quotaEvaluator.mOffers[ri];
574 // We were already offered some sharedMemory, so we
575 // do not consider the offer.
576 // FIXME: in principle this should account for memory
577 // available and being offered, however we
578 // want to get out of the woods for now.
579 if (offer.valid && offer.sharedMemory != 0) {
580 O2_SIGNPOST_END(device, sid, "signal_state", "Memory already offered.");
581 return;
582 }
583 ri++;
584 }
585 // Find the first empty offer and have 1GB of shared memory there
586 for (auto& offer : quotaEvaluator.mOffers) {
587 if (offer.valid == false) {
588 offer.cpu = 0;
589 offer.memory = 0;
590 offer.sharedMemory = 1000000000;
591 offer.valid = true;
592 offer.user = -1;
593 break;
594 }
595 }
597 O2_SIGNPOST_END(device, sid, "signal_state", "Done processing signals.");
598}
599
600static auto toBeForwardedHeader = [](void* header) -> bool {
601 // If is now possible that the record is not complete when
602 // we forward it, because of a custom completion policy.
603 // this means that we need to skip the empty entries in the
604 // record for being forwarded.
605 if (header == nullptr) {
606 return false;
607 }
608 auto sih = o2::header::get<SourceInfoHeader*>(header);
609 if (sih) {
610 return false;
611 }
612
613 auto dih = o2::header::get<DomainInfoHeader*>(header);
614 if (dih) {
615 return false;
616 }
617
618 auto dh = o2::header::get<DataHeader*>(header);
619 if (!dh) {
620 return false;
621 }
622 auto dph = o2::header::get<DataProcessingHeader*>(header);
623 if (!dph) {
624 return false;
625 }
626 return true;
627};
628
629static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
630 FairMQDeviceProxy& proxy,
631 std::unique_ptr<fair::mq::Message>& header,
632 std::unique_ptr<fair::mq::Message>& payload,
633 size_t total,
634 bool consume) {
635 if (header.get() == nullptr) {
636 // Missing an header is not an error anymore.
637 // it simply means that we did not receive the
638 // given input, but we were asked to
639 // consume existing, so we skip it.
640 return false;
641 }
642 if (payload.get() == nullptr && consume == true) {
643 // If the payload is not there, it means we already
644 // processed it with ConsumeExisiting. Therefore we
645 // need to do something only if this is the last consume.
646 header.reset(nullptr);
647 return false;
648 }
649
650 auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
651 if (fdph == nullptr) {
652 LOG(error) << "Data is missing DataProcessingHeader";
653 return false;
654 }
655 auto fdh = o2::header::get<DataHeader*>(header->GetData());
656 if (fdh == nullptr) {
657 LOG(error) << "Data is missing DataHeader";
658 return false;
659 }
660
661 // We need to find the forward route only for the first
662 // part of a split payload. All the others will use the same.
663 // but always check if we have a sequence of multiple payloads
664 if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
665 proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
666 }
667 return cachedForwardingChoices.empty() == false;
668};
669
670struct DecongestionContext {
673};
674
675auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
676 auto& oldestTimeslice = task.user<DecongestionContext>().oldestTimeslice;
677 auto& ref = task.user<DecongestionContext>().ref;
678
679 auto& decongestion = ref.get<DecongestionService>();
680 auto& proxy = ref.get<FairMQDeviceProxy>();
681 if (oldestTimeslice.timeslice.value <= decongestion.lastTimeslice) {
682 LOG(debug) << "Not sending already sent oldest possible timeslice " << oldestTimeslice.timeslice.value;
683 return;
684 }
685 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
686 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
687 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
688 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
689 // TODO: this we could cache in the proxy at the bind moment.
690 if (info.channelType != ChannelAccountingType::DPL) {
691 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Skipping channel %{public}s because it's not a DPL channel",
692 info.name.c_str());
693
694 continue;
695 }
696 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestTimeslice.timeslice.value)) {
697 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Forwarding to channel %{public}s oldest possible timeslice %zu, prio 20",
698 info.name.c_str(), oldestTimeslice.timeslice.value);
699 }
700 }
701};
702
703// This is how we do the forwarding, i.e. we push
704// the inputs which are shared between this device and others
705// to the next one in the daisy chain.
706// FIXME: do it in a smarter way than O(N^2)
707static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
708 TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
709 auto& proxy = registry.get<FairMQDeviceProxy>();
710 // we collect all messages per forward in a map and send them together
711 std::vector<fair::mq::Parts> forwardedParts;
712 forwardedParts.resize(proxy.getNumForwards());
713 std::vector<ChannelIndex> cachedForwardingChoices{};
714 O2_SIGNPOST_ID_GENERATE(sid, forwarding);
715 O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
716 slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
717
718 for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
719 auto& messageSet = currentSetOfInputs[ii];
720 // In case the messageSet is empty, there is nothing to be done.
721 if (messageSet.size() == 0) {
722 continue;
723 }
724 if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
725 continue;
726 }
727 cachedForwardingChoices.clear();
728
729 for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
730 auto& messageSet = currentSetOfInputs[ii];
731 auto& header = messageSet.header(pi);
732 auto& payload = messageSet.payload(pi);
733 auto total = messageSet.getNumberOfPayloads(pi);
734
735 if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
736 continue;
737 }
738
739 // In case of more than one forward route, we need to copy the message.
740 // This will eventually use the same mamory if running with the same backend.
741 if (cachedForwardingChoices.size() > 1) {
742 copy = true;
743 }
744 auto* dh = o2::header::get<DataHeader*>(header->GetData());
745 auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
746
747 if (copy) {
748 for (auto& cachedForwardingChoice : cachedForwardingChoices) {
749 auto&& newHeader = header->GetTransport()->CreateMessage();
750 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
751 fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
752 newHeader->Copy(*header);
753 forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
754
755 for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
756 auto&& newPayload = header->GetTransport()->CreateMessage();
757 newPayload->Copy(*messageSet.payload(pi, payloadIndex));
758 forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
759 }
760 }
761 } else {
762 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
763 fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
764 forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
765 for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
766 forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
767 }
768 }
769 }
770 }
771 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %zu messages", forwardedParts.size());
772 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
773 if (forwardedParts[fi].Size() == 0) {
774 continue;
775 }
776 ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi});
777 auto& parts = forwardedParts[fi];
778 if (info.policy == nullptr) {
779 O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi);
780 continue;
781 }
782 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
783 info.policy->forward(parts, ChannelIndex{fi}, registry);
784 }
785
786 auto& asyncQueue = registry.get<AsyncQueue>();
787 auto& decongestion = registry.get<DecongestionService>();
788 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
789 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
790 AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate}
791 .user<DecongestionContext>({.ref = registry, .oldestTimeslice = oldestTimeslice}));
792 O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
793};
794
795extern volatile int region_read_global_dummy_variable;
797
799void handleRegionCallbacks(ServiceRegistryRef registry, std::vector<fair::mq::RegionInfo>& infos)
800{
801 if (infos.empty() == false) {
802 std::vector<fair::mq::RegionInfo> toBeNotified;
803 toBeNotified.swap(infos); // avoid any MT issue.
804 static bool dummyRead = getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS") && atoi(getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS"));
805 for (auto const& info : toBeNotified) {
806 if (dummyRead) {
807 for (size_t i = 0; i < info.size / sizeof(region_read_global_dummy_variable); i += 4096 / sizeof(region_read_global_dummy_variable)) {
808 region_read_global_dummy_variable = ((int*)info.ptr)[i];
809 }
810 }
811 registry.get<CallbackService>().call<CallbackService::Id::RegionInfoCallback>(info);
812 }
813 }
814}
815
816namespace
817{
819{
820 auto* state = (DeviceState*)handle->data;
822}
823} // namespace
824
825void DataProcessingDevice::initPollers()
826{
827 auto ref = ServiceRegistryRef{mServiceRegistry};
828 auto& deviceContext = ref.get<DeviceContext>();
829 auto& context = ref.get<DataProcessorContext>();
830 auto& spec = ref.get<DeviceSpec const>();
831 auto& state = ref.get<DeviceState>();
832 // We add a timer only in case a channel poller is not there.
833 if ((context.statefulProcess != nullptr) || (context.statelessProcess != nullptr)) {
834 for (auto& [channelName, channel] : GetChannels()) {
835 InputChannelInfo* channelInfo;
836 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
837 auto& channelSpec = spec.inputChannels[ci];
838 channelInfo = &state.inputChannelInfos[ci];
839 if (channelSpec.name != channelName) {
840 continue;
841 }
842 channelInfo->channel = &this->GetChannel(channelName, 0);
843 break;
844 }
845 if ((channelName.rfind("from_internal-dpl", 0) == 0) &&
846 (channelName.rfind("from_internal-dpl-aod", 0) != 0) &&
847 (channelName.rfind("from_internal-dpl-ccdb-backend", 0) != 0) &&
848 (channelName.rfind("from_internal-dpl-injected", 0)) != 0) {
849 LOGP(detail, "{} is an internal channel. Skipping as no input will come from there.", channelName);
850 continue;
851 }
852 // We only watch receiving sockets.
853 if (channelName.rfind("from_" + spec.name + "_", 0) == 0) {
854 LOGP(detail, "{} is to send data. Not polling.", channelName);
855 continue;
856 }
857
858 if (channelName.rfind("from_", 0) != 0) {
859 LOGP(detail, "{} is not a DPL socket. Not polling.", channelName);
860 continue;
861 }
862
863 // We assume there is always a ZeroMQ socket behind.
864 int zmq_fd = 0;
865 size_t zmq_fd_len = sizeof(zmq_fd);
866 // FIXME: I should probably save those somewhere... ;-)
867 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
868 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
869 if (zmq_fd == 0) {
870 LOG(error) << "Cannot get file descriptor for channel." << channelName;
871 continue;
872 }
873 LOGP(detail, "Polling socket for {}", channelName);
874 auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
875 pCtx->name = strdup(channelName.c_str());
876 pCtx->loop = state.loop;
877 pCtx->device = this;
878 pCtx->state = &state;
879 pCtx->fd = zmq_fd;
880 assert(channelInfo != nullptr);
881 pCtx->channelInfo = channelInfo;
882 pCtx->socket = &channel[0].GetSocket();
883 pCtx->read = true;
884 poller->data = pCtx;
885 uv_poll_init(state.loop, poller, zmq_fd);
886 if (channelName.rfind("from_", 0) != 0) {
887 LOGP(detail, "{} is an out of band channel.", channelName);
888 state.activeOutOfBandPollers.push_back(poller);
889 } else {
890 channelInfo->pollerIndex = state.activeInputPollers.size();
891 state.activeInputPollers.push_back(poller);
892 }
893 }
894 // In case we do not have any input channel and we do not have
895 // any timers or signal watchers we still wake up whenever we can send data to downstream
896 // devices to allow for enumerations.
897 if (state.activeInputPollers.empty() &&
898 state.activeOutOfBandPollers.empty() &&
899 state.activeTimers.empty() &&
900 state.activeSignals.empty()) {
901 // FIXME: this is to make sure we do not reset the output timer
902 // for readout proxies or similar. In principle this should go once
903 // we move to OutOfBand InputSpec.
904 if (state.inputChannelInfos.empty()) {
905 LOGP(detail, "No input channels. Setting exit transition timeout to 0.");
906 deviceContext.exitTransitionTimeout = 0;
907 }
908 for (auto& [channelName, channel] : GetChannels()) {
909 if (channelName.rfind(spec.channelPrefix + "from_internal-dpl", 0) == 0) {
910 LOGP(detail, "{} is an internal channel. Not polling.", channelName);
911 continue;
912 }
913 if (channelName.rfind(spec.channelPrefix + "from_" + spec.name + "_", 0) == 0) {
914 LOGP(detail, "{} is an out of band channel. Not polling for output.", channelName);
915 continue;
916 }
917 // We assume there is always a ZeroMQ socket behind.
918 int zmq_fd = 0;
919 size_t zmq_fd_len = sizeof(zmq_fd);
920 // FIXME: I should probably save those somewhere... ;-)
921 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
922 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
923 if (zmq_fd == 0) {
924 LOGP(error, "Cannot get file descriptor for channel {}", channelName);
925 continue;
926 }
927 LOG(detail) << "Polling socket for " << channel[0].GetName();
928 // FIXME: leak
929 auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
930 pCtx->name = strdup(channelName.c_str());
931 pCtx->loop = state.loop;
932 pCtx->device = this;
933 pCtx->state = &state;
934 pCtx->fd = zmq_fd;
935 pCtx->read = false;
936 poller->data = pCtx;
937 uv_poll_init(state.loop, poller, zmq_fd);
938 state.activeOutputPollers.push_back(poller);
939 }
940 }
941 } else {
942 LOGP(detail, "This is a fake device so we exit after the first iteration.");
943 deviceContext.exitTransitionTimeout = 0;
944 // This is a fake device, so we can request to exit immediately
945 ServiceRegistryRef ref{mServiceRegistry};
946 ref.get<ControlService>().readyToQuit(QuitRequest::Me);
947 // A two second timer to stop internal devices which do not want to
948 auto* timer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
949 uv_timer_init(state.loop, timer);
950 timer->data = &state;
951 uv_update_time(state.loop);
952 uv_timer_start(timer, on_idle_timer, 2000, 2000);
953 state.activeTimers.push_back(timer);
954 }
955}
956
957void DataProcessingDevice::startPollers()
958{
959 auto ref = ServiceRegistryRef{mServiceRegistry};
960 auto& deviceContext = ref.get<DeviceContext>();
961 auto& state = ref.get<DeviceState>();
962
963 for (auto* poller : state.activeInputPollers) {
964 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
965 O2_SIGNPOST_START(device, sid, "socket_state", "Input socket waiting for connection.");
966 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
967 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
968 }
969 for (auto& poller : state.activeOutOfBandPollers) {
970 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
971 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
972 }
973 for (auto* poller : state.activeOutputPollers) {
974 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
975 O2_SIGNPOST_START(device, sid, "socket_state", "Output socket waiting for connection.");
976 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
977 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
978 }
979
980 deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
981 deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
982 uv_timer_init(state.loop, deviceContext.gracePeriodTimer);
983
984 deviceContext.dataProcessingGracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
985 deviceContext.dataProcessingGracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
986 uv_timer_init(state.loop, deviceContext.dataProcessingGracePeriodTimer);
987}
988
989void DataProcessingDevice::stopPollers()
990{
991 auto ref = ServiceRegistryRef{mServiceRegistry};
992 auto& deviceContext = ref.get<DeviceContext>();
993 auto& state = ref.get<DeviceState>();
994 LOGP(detail, "Stopping {} input pollers", state.activeInputPollers.size());
995 for (auto* poller : state.activeInputPollers) {
996 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
997 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
998 uv_poll_stop(poller);
999 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
1000 }
1001 LOGP(detail, "Stopping {} out of band pollers", state.activeOutOfBandPollers.size());
1002 for (auto* poller : state.activeOutOfBandPollers) {
1003 uv_poll_stop(poller);
1004 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
1005 }
1006 LOGP(detail, "Stopping {} output pollers", state.activeOutOfBandPollers.size());
1007 for (auto* poller : state.activeOutputPollers) {
1008 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
1009 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
1010 uv_poll_stop(poller);
1011 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
1012 }
1013
1014 uv_timer_stop(deviceContext.gracePeriodTimer);
1015 delete (ServiceRegistryRef*)deviceContext.gracePeriodTimer->data;
1016 free(deviceContext.gracePeriodTimer);
1017 deviceContext.gracePeriodTimer = nullptr;
1018
1019 uv_timer_stop(deviceContext.dataProcessingGracePeriodTimer);
1020 delete (ServiceRegistryRef*)deviceContext.dataProcessingGracePeriodTimer->data;
1021 free(deviceContext.dataProcessingGracePeriodTimer);
1022 deviceContext.dataProcessingGracePeriodTimer = nullptr;
1023}
1024
1026{
 // InitTask: prepare everything the device needs before it can run. In order,
 // the code below builds the expiration handlers for every distinct input
 // route with a configurator, creates the async handle used to wake up the
 // main loop, reads the expected region callbacks and the exit /
 // data-processing timeouts from the configuration, subscribes to FairMQ
 // region events on every channel, installs a SIGUSR1 handler, creates the
 // pollers and fills the DataProcessorContext. Finally it keeps running the
 // loop until all the expected region callbacks have been handled.
1027 auto ref = ServiceRegistryRef{mServiceRegistry};
1028 auto& deviceContext = ref.get<DeviceContext>();
1029 auto& context = ref.get<DataProcessorContext>();
1030
1031 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1032 O2_SIGNPOST_START(device, cid, "InitTask", "Entering InitTask callback.");
1033 auto& spec = getRunningDevice(mRunningDevice, mServiceRegistry);
1034 auto distinct = DataRelayerHelpers::createDistinctRouteIndex(spec.inputs);
1035 auto& state = ref.get<DeviceState>();
 // `i` is the RouteIndex of the current distinct route. It is advanced for
 // every route, including those without a configurator.
1036 int i = 0;
1037 for (auto& di : distinct) {
1038 auto& route = spec.inputs[di];
1039 if (route.configurator.has_value() == false) {
1040 i++;
1041 continue;
1042 }
1043 ExpirationHandler handler{
1044 .name = route.configurator->name,
1045 .routeIndex = RouteIndex{i++},
1046 .lifetime = route.matcher.lifetime,
1047 .creator = route.configurator->creatorConfigurator(state, mServiceRegistry, *mConfigRegistry),
1048 .checker = route.configurator->danglingConfigurator(state, *mConfigRegistry),
1049 .handler = route.configurator->expirationConfigurator(state, *mConfigRegistry)};
1050 context.expirationHandlers.emplace_back(std::move(handler));
1051 }
1052
 // Async handle used (e.g. by the region event callback below) to wake up
 // the main loop; created only once.
1053 if (state.awakeMainThread == nullptr) {
1054 state.awakeMainThread = (uv_async_t*)malloc(sizeof(uv_async_t));
1055 state.awakeMainThread->data = &state;
1056 uv_async_init(state.loop, state.awakeMainThread, on_awake_main_thread);
1057 }
1058
 // std::stoi throws if any of these options is not an integer.
1059 deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue<std::string>("expected-region-callbacks"));
1060 deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue<std::string>("exit-transition-timeout"));
1061 deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue<std::string>("data-processing-timeout"));
1062
 // The region event callback only queues the event under mRegionInfoMutex
 // and wakes up the main loop; handleRegionCallbacks does the actual work.
 // NOTE(review): presumably it is invoked from a FairMQ transport thread,
 // which is why it is guarded by the mutex — confirm.
1063 for (auto& channel : GetChannels()) {
1064 channel.second.at(0).Transport()->SubscribeToRegionEvents([&context = deviceContext,
1065 &registry = mServiceRegistry,
1066 &pendingRegionInfos = mPendingRegionInfos,
1067 &regionInfoMutex = mRegionInfoMutex](fair::mq::RegionInfo info) {
1068 std::lock_guard<std::mutex> lock(regionInfoMutex);
1069 LOG(detail) << ">>> Region info event" << info.event;
1070 LOG(detail) << "id: " << info.id;
1071 LOG(detail) << "ptr: " << info.ptr;
1072 LOG(detail) << "size: " << info.size;
1073 LOG(detail) << "flags: " << info.flags;
1074 // Now we check for pending events with the mutex,
1075 // so the lines below are atomic.
1076 pendingRegionInfos.push_back(info);
1077 context.expectedRegionCallbacks -= 1;
1078 // We always want to handle these on the main loop,
1079 // so we awake it.
1080 ServiceRegistryRef ref{registry};
1081 uv_async_send(ref.get<DeviceState>().awakeMainThread);
1082 });
1083 }
1084
1085 // Add a signal manager for SIGUSR1 so that we can force
1086 // an event from the outside, making sure that the event loop can
1087 // be unblocked (e.g. by a quitting DPL driver) even when there
1088 // is no data pending to be processed.
1089 if (deviceContext.sigusr1Handle == nullptr) {
1090 deviceContext.sigusr1Handle = (uv_signal_t*)malloc(sizeof(uv_signal_t));
1091 deviceContext.sigusr1Handle->data = &mServiceRegistry;
1092 uv_signal_init(state.loop, deviceContext.sigusr1Handle);
1093 uv_signal_start(deviceContext.sigusr1Handle, on_signal_callback, SIGUSR1);
1094 }
1095 // If there is any signal, we want to make sure they are active
1096 for (auto& handle : state.activeSignals) {
1097 handle->data = &state;
1098 }
1099 // When we start, we must make sure that we do listen to the signal
1100 deviceContext.sigusr1Handle->data = &mServiceRegistry;
1101
1103 DataProcessingDevice::initPollers();
1104
1105 // Whenever we InitTask, we consider as if the previous iteration
1106 // was successful, so that even if there is no timer or receiving
1107 // channel, we can still start an enumeration.
1108 DataProcessorContext* initialContext = nullptr;
1109 bool idle = state.lastActiveDataProcessor.compare_exchange_strong(initialContext, (DataProcessorContext*)-1);
1110 if (!idle) {
1111 LOG(error) << "DataProcessor " << state.lastActiveDataProcessor.load()->spec->name << " was unexpectedly active";
1112 }
1113
1114 // We should be ready to run here. Therefore we copy all the
1115 // required parts in the DataProcessorContext. Eventually we should
1116 // do so on a per thread basis, with fine grained locks.
1117 // FIXME: this should not use ServiceRegistry::threadSalt, but
1118 // more a ServiceRegistry::globalDataProcessorSalt(N) where
1119 // N is the number of the multiplexed data processor.
1120 // We will get there.
1121 this->fillContext(mServiceRegistry.get<DataProcessorContext>(ServiceRegistry::globalDeviceSalt()), deviceContext);
1122
1123 O2_SIGNPOST_END(device, cid, "InitTask", "Exiting InitTask callback waiting for the remaining region callbacks.");
1124
 // True while region events are queued or more region callbacks are expected.
1125 auto hasPendingEvents = [&mutex = mRegionInfoMutex, &pendingRegionInfos = mPendingRegionInfos](DeviceContext& deviceContext) {
1126 std::lock_guard<std::mutex> lock(mutex);
1127 return (pendingRegionInfos.empty() == false) || deviceContext.expectedRegionCallbacks > 0;
1128 };
1129 O2_SIGNPOST_START(device, cid, "InitTask", "Waiting for registation events.");
 // Block on the event loop (UV_RUN_ONCE) until every expected region
 // callback has arrived and has been handled.
1134 while (hasPendingEvents(deviceContext)) {
1135 // Wait for the callback to signal its done, so that we do not busy wait.
1136 uv_run(state.loop, UV_RUN_ONCE);
1137 // Handle callbacks if any
1138 {
1139 O2_SIGNPOST_EVENT_EMIT(device, cid, "InitTask", "Memory registration event received.");
1140 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1141 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1142 }
1143 }
1144 O2_SIGNPOST_END(device, cid, "InitTask", "Done waiting for registration events.");
1145}
1146
1148{
 // Fill the DataProcessorContext with the per-device settings used while
 // running: sink detection (for rate limiting), input balancing, the service
 // registry, the error handling callback and the early-forwarding decision.
1149 context.isSink = false;
1150 // If nothing is a sink, the rate limiting simply does not trigger.
1151 bool enableRateLimiting = std::stoi(fConfig->GetValue<std::string>("timeframes-rate-limit"));
1152
1153 auto ref = ServiceRegistryRef{mServiceRegistry};
1154 auto& spec = ref.get<DeviceSpec const>();
1155
1156 // The policy is now allowed to state the default.
1157 context.balancingInputs = spec.completionPolicy.balanceChannels;
1158 // This is needed because the internal injected dummy sink should not
1159 // try to balance inputs unless the rate limiting is requested.
1160 if (enableRateLimiting == false && spec.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) {
1161 context.balancingInputs = false;
1162 }
1163 if (enableRateLimiting) {
 // NOTE: the loop variable `spec` shadows the DeviceSpec above. The range
 // expression `spec.outputs` still refers to the outer DeviceSpec.
1164 for (auto& spec : spec.outputs) {
1165 if (spec.matcher.binding.value == "dpl-summary") {
1166 context.isSink = true;
1167 break;
1168 }
1169 }
1170 }
1171
1172 context.registry = &mServiceRegistry;
 // A user-provided error callback takes precedence. Otherwise
 // mProcessingPolicies.error decides: one case rethrows, the default
 // logs and skips to the next timeframe.
1175 if (context.error != nullptr) {
1176 context.errorHandling = [&errorCallback = context.error,
1177 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1181 auto& err = error_from_ref(e);
1182 auto& context = ref.get<DataProcessorContext>();
1183 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1184 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Invoking callback.", err.what);
1185 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1186 auto& stats = ref.get<DataProcessingStats>();
1188 ErrorContext errorContext{record, ref, e};
1189 errorCallback(errorContext);
1190 };
1191 } else {
1192 context.errorHandling = [&errorPolicy = mProcessingPolicies.error,
1193 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1194 auto& err = error_from_ref(e);
1198 auto& context = ref.get<DataProcessorContext>();
1199 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1200 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1201 auto& stats = ref.get<DataProcessingStats>();
1203 switch (errorPolicy) {
1205 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Rethrowing.", err.what);
1206 throw e;
1207 default:
1208 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Skipping to next timeframe.", err.what);
1209 break;
1210 }
1211 };
1212 }
1213
 // NOTE(review): the branches below write context.canForwardEarly, but the
 // lambda returns the local `canForwardEarly`, which they never modify, and
 // its result is then assigned to context.canForwardEarly. As written, the
 // AOD / RAWDATA (NORAW) / Optional overrides and the "only conditions"
 // enabling have no lasting effect. They were probably meant to update the
 // local — verify (part of the lambda is not visible here).
1214 auto decideEarlyForward = [&context, &spec, this]() -> bool {
1217 bool canForwardEarly = (spec.forwards.empty() == false) && mProcessingPolicies.earlyForward != EarlyForwardPolicy::NEVER;
1218 bool onlyConditions = true;
1219 bool overriddenEarlyForward = false;
1220 for (auto& forwarded : spec.forwards) {
1221 if (forwarded.matcher.lifetime != Lifetime::Condition) {
1222 onlyConditions = false;
1223 }
1224#if !__has_include(<fairmq/shmem/Message.h>)
1225 if (strncmp(DataSpecUtils::asConcreteOrigin(forwarded.matcher).str, "AOD", 3) == 0) {
1226 context.canForwardEarly = false;
1227 overriddenEarlyForward = true;
1228 LOG(detail) << "Cannot forward early because of AOD input: " << DataSpecUtils::describe(forwarded.matcher);
1229 break;
1230 }
1231#endif
1232 if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && mProcessingPolicies.earlyForward == EarlyForwardPolicy::NORAW) {
1233 context.canForwardEarly = false;
1234 overriddenEarlyForward = true;
1235 LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher);
1236 break;
1237 }
1238 if (forwarded.matcher.lifetime == Lifetime::Optional) {
1239 context.canForwardEarly = false;
1240 overriddenEarlyForward = true;
1241 LOG(detail) << "Cannot forward early because of Optional input: " << DataSpecUtils::describe(forwarded.matcher);
1242 break;
1243 }
1244 }
1245 if (!overriddenEarlyForward && onlyConditions) {
1246 context.canForwardEarly = true;
1247 LOG(detail) << "Enabling early forwarding because only conditions to be forwarded";
1248 }
1249 return canForwardEarly;
1250 };
1251 context.canForwardEarly = decideEarlyForward();
1252}
1253
1255{
 // PreRun: reset the per-run state (quit flag, allowed processing, state of
 // every input channel except Pull ones), run the pre-start callbacks of the
 // data processor and of every stream, then call the Start callbacks, start
 // the pollers and report device_state = 1 to monitoring.
 // Exceptions from the pre-start callbacks are logged and rethrown.
1256 auto ref = ServiceRegistryRef{mServiceRegistry};
1257 auto& state = ref.get<DeviceState>();
1258
1259 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.loop);
1260 O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
1261 state.quitRequested = false;
1263 state.allowedProcessing = DeviceState::Any;
1264 for (auto& info : state.inputChannelInfos) {
1265 if (info.state != InputChannelState::Pull) {
1266 info.state = InputChannelState::Running;
1267 }
1268 }
1269
1270 // Catch callbacks which fail before we start.
1271 // Notice that when running multiple dataprocessors
1272 // we should probably allow expendable ones to fail.
1273 try {
1274 auto& dpContext = ref.get<DataProcessorContext>();
1275 dpContext.preStartCallbacks(ref);
 // Streams are addressed through globalStreamSalt(i + 1).
1276 for (size_t i = 0; i < mStreams.size(); ++i) {
1277 auto streamRef = ServiceRegistryRef{mServiceRegistry, ServiceRegistry::globalStreamSalt(i + 1)};
1278 auto& context = streamRef.get<StreamContext>();
1279 context.preStartStreamCallbacks(streamRef);
1280 }
1281 } catch (std::exception& e) {
1282 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type std::exception caught in PreRun: %{public}s. Rethrowing.", e.what());
1283 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1284 throw;
1285 } catch (o2::framework::RuntimeErrorRef& e) {
1286 auto& err = error_from_ref(e);
1287 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type o2::framework::RuntimeErrorRef caught in PreRun: %{public}s. Rethrowing.", err.what);
1288 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1289 throw;
1290 } catch (...) {
1291 O2_SIGNPOST_END(device, cid, "PreRun", "Unknown exception being thrown. Rethrowing.");
1292 throw;
1293 }
1294
1295 ref.get<CallbackService>().call<CallbackService::Id::Start>();
1296 startPollers();
1297
1298 // Raise to 1 when we are ready to start processing
1299 using o2::monitoring::Metric;
1300 using o2::monitoring::Monitoring;
1301 using o2::monitoring::tags::Key;
1302 using o2::monitoring::tags::Value;
1303
1304 auto& monitoring = ref.get<Monitoring>();
1305 monitoring.send(Metric{(uint64_t)1, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1306 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun callback.");
1307}
1308
1310{
 // Stop: report device_state = 0, stop the pollers, then run the Stop
 // callbacks followed by the data processor's post-stop callbacks.
1311 ServiceRegistryRef ref{mServiceRegistry};
1312 // Lower device_state to 0 now that we stop processing
1313 using o2::monitoring::Metric;
1314 using o2::monitoring::Monitoring;
1315 using o2::monitoring::tags::Key;
1316 using o2::monitoring::tags::Value;
1317
1318 auto& monitoring = ref.get<Monitoring>();
1319 monitoring.send(Metric{(uint64_t)0, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1320
1321 stopPollers();
1322 ref.get<CallbackService>().call<CallbackService::Id::Stop>();
1323 auto& dpContext = ref.get<DataProcessorContext>();
1324 dpContext.postStopCallbacks(ref);
1325}
1326
1328{
 // Reset: only forwards the transition to the registered Reset callbacks.
1329 ServiceRegistryRef ref{mServiceRegistry};
1330 ref.get<CallbackService>().call<CallbackService::Id::Reset>();
1331}
1332
1334{
 // Run: the device event loop. Until the transition handling state becomes
 // Expired, every iteration:
 //  - applies a queued FairMQ state change and the pending region callbacks;
 //  - decides whether uv_run may block. It does not block on the first
 //    iteration, when a new state is pending, or (among other conditions)
 //    when a data processor was active and we are streaming without signal
 //    watchers;
 //  - on a new state request, arms the data-processing / exit-transition
 //    grace period timers, or expires the transition immediately;
 //  - prunes pending data, runs the async queue and then the libuv loop;
 //  - schedules the processing on a non-running stream if the
 //    ComputingQuotaEvaluator grants an offer.
 // On exit the per-channel parts are cleared and the transition handling is
 // reset to NoTransition.
1335 ServiceRegistryRef ref{mServiceRegistry};
1336 auto& state = ref.get<DeviceState>();
1338 bool firstLoop = true;
1339 O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
1340 O2_SIGNPOST_START(device, lid, "device_state", "First iteration of the device loop");
1341
 // When DPL_THREADPOOL_SIZE is set, the processing is offloaded with
 // uv_queue_work (see below) and UV_THREADPOOL_SIZE is forced to 1.
1342 bool dplEnableMultithreding = getenv("DPL_THREADPOOL_SIZE") != nullptr;
1343 if (dplEnableMultithreding) {
1344 setenv("UV_THREADPOOL_SIZE", "1", 1);
1345 }
1346
1347 while (state.transitionHandling != TransitionHandlingState::Expired) {
1348 if (state.nextFairMQState.empty() == false) {
1349 (void)this->ChangeState(state.nextFairMQState.back());
1350 state.nextFairMQState.pop_back();
1351 }
1352 // Notify on the main thread the new region callbacks, making sure
1353 // no callback is issued if there is something still processing.
1354 {
1355 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1356 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1357 }
1358 // This will block for the correct delay (or until we get data
1359 // on a socket). We also do not block on the first iteration
1360 // so that devices which do not have a timer can still start an
1361 // enumeration.
1362 {
1363 ServiceRegistryRef ref{mServiceRegistry};
1364 ref.get<DriverClient>().flushPending(mServiceRegistry);
1365 DataProcessorContext* lastActive = state.lastActiveDataProcessor.load();
1366 // Reset to zero unless some other DataProcessorContext completed in the meanwhile.
1367 // In such case we will take care of it at next iteration.
1368 state.lastActiveDataProcessor.compare_exchange_strong(lastActive, nullptr);
1369
1370 auto shouldNotWait = (lastActive != nullptr &&
1371 (state.streaming != StreamingState::Idle) && (state.activeSignals.empty())) ||
1373 if (firstLoop) {
1374 shouldNotWait = true;
1375 firstLoop = false;
1376 }
1377 if (lastActive != nullptr) {
1379 }
1380 if (NewStatePending()) {
1381 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "New state pending. Waiting for it to be handled.");
1382 shouldNotWait = true;
1384 }
1385 if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) {
1386 state.transitionHandling = TransitionHandlingState::Requested;
1387 auto& deviceContext = ref.get<DeviceContext>();
1388 // Check if we only have timers
1389 auto& spec = ref.get<DeviceSpec const>();
1390 if (hasOnlyTimers(spec)) {
1392 }
1393
1394 // We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
1395 if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
1396 uv_update_time(state.loop);
1397 O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
1398 uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
1399 }
1400 if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
1401 state.transitionHandling = TransitionHandlingState::Requested;
1402 ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
1403 uv_update_time(state.loop);
1404 O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
1405 deviceContext.exitTransitionTimeout);
1406 uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
1407 bool onlyGenerated = hasOnlyGenerated(spec);
1408 int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
1409 if (mProcessingPolicies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) {
1410 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
1411 } else {
1412 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
1413 "New state requested. Waiting for %d seconds before %{public}s",
1414 timeout,
1415 onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
1416 }
1417 } else {
1418 state.transitionHandling = TransitionHandlingState::Expired;
1419 if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination == TerminationPolicy::QUIT) {
1420 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
1421 } else if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination != TerminationPolicy::QUIT) {
1422 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
1423 } else if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
1424 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
1425 } else {
1426 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
1427 }
1428 }
1429 }
1430 // If we are Idle, we can then consider the transition to be expired.
1431 if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
1432 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
1433 state.transitionHandling = TransitionHandlingState::Expired;
1434 }
1435 if (state.severityStack.empty() == false) {
1436 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1437 state.severityStack.pop_back();
1438 }
1439 // for (auto &info : mDeviceContext.state->inputChannelInfos) {
1440 // shouldNotWait |= info.readPolled;
1441 // }
1442 state.loopReason = DeviceState::NO_REASON;
1443 state.firedTimers.clear();
1444 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_CALLBACKS) != 0) {
1445 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1446 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1447 }
1448 // Run the asynchronous queue just before sleeping again, so that:
1449 // - we can trigger further events from the queue
1450 // - we can guarantee this is the last thing we do in the loop (
1451 // assuming no one else is adding to the queue before this point).
1452 auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1453 O2_SIGNPOST_START(device, lid, "run_loop", "Dropping message from slot %" PRIu64 ". Forwarding as needed.", (uint64_t)slot.index);
1454 ServiceRegistryRef ref{registry};
1455 ref.get<AsyncQueue>();
1456 ref.get<DecongestionService>();
1457 ref.get<DataRelayer>();
1458 // Get the current timeslice for the slot.
1459 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
1461 forwardInputs(registry, slot, dropped, oldestOutputInfo, false, true);
1462 };
1463 auto& relayer = ref.get<DataRelayer>();
1464 relayer.prunePending(onDrop);
1465 auto& queue = ref.get<AsyncQueue>();
1466 auto oldestPossibleTimeslice = relayer.getOldestPossibleOutput();
1467 AsyncQueueHelpers::run(queue, {oldestPossibleTimeslice.timeslice.value});
1468 if (shouldNotWait == false) {
1469 auto& dpContext = ref.get<DataProcessorContext>();
1470 dpContext.preLoopCallbacks(ref);
1471 }
 // NOTE(review): this format uses "%{}s" while the other signposts use
 // "%{public}s" — verify that the signpost formatter accepts it.
1472 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. %{}s", shouldNotWait ? "Will immediately schedule a new one" : "Waiting for next event.");
1473 uv_run(state.loop, shouldNotWait ? UV_RUN_NOWAIT : UV_RUN_ONCE);
1474 O2_SIGNPOST_START(device, lid, "run_loop", "Run loop started. Loop reason %d.", state.loopReason);
1475 if ((state.loopReason & state.tracingFlags) != 0) {
1476 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1477 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1478 } else if (state.severityStack.empty() == false) {
1479 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1480 state.severityStack.pop_back();
1481 }
1482 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Loop reason mask %x & %x = %x", state.loopReason, state.tracingFlags, state.loopReason & state.tracingFlags);
1483
1484 if ((state.loopReason & DeviceState::LoopReason::OOB_ACTIVITY) != 0) {
1485 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Out of band activity detected. Rescanning everything.");
1486 relayer.rescan();
1487 }
1488
1489 if (!state.pendingOffers.empty()) {
1490 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Pending %" PRIu64 " offers. updating the ComputingQuotaEvaluator.", (uint64_t)state.pendingOffers.size());
1491 ref.get<ComputingQuotaEvaluator>().updateOffers(state.pendingOffers, uv_now(state.loop));
1492 }
1493 }
1494
1495 // Notify on the main thread the new region callbacks, making sure
1496 // no callback is issued if there is something still processing.
1497 // Notice that we still need to perform callbacks also after
1498 // the socket epolled, because otherwise we would end up serving
1499 // the callback after the first data arrives if the system is too
1500 // fast to transition from Init to Run.
1501 {
1502 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1503 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1504 }
1505
1506 assert(mStreams.size() == mHandles.size());
 // Look for a stream which is not running. There is no break, so the
 // *last* non-running stream is selected. The index stays -1 if all
 // streams are busy.
1508 TaskStreamRef streamRef{-1};
1509 for (size_t ti = 0; ti < mStreams.size(); ti++) {
1510 auto& taskInfo = mStreams[ti];
1511 if (taskInfo.running) {
1512 continue;
1513 }
1514 // Stream 0 is for when we run in
1515 streamRef.index = ti;
1516 }
1517 using o2::monitoring::Metric;
1518 using o2::monitoring::Monitoring;
1519 using o2::monitoring::tags::Key;
1520 using o2::monitoring::tags::Value;
1521 // We have an empty stream, let's check if we have enough
1522 // resources for it to run something
1523 if (streamRef.index != -1) {
1524 // Synchronous execution of the callbacks. This will be
1525 // moved in the on_socket_polled once we have threading in place.
1526 uv_work_t& handle = mHandles[streamRef.index];
1527 TaskStreamInfo& stream = mStreams[streamRef.index];
1528 handle.data = &mStreams[streamRef.index];
1529
 // NOTE(review): as a function-local static, this std::function is
 // initialised only once, capturing the mServiceRegistry of the first
 // DataProcessingDevice that reaches this line. Any later instance in the
 // same process would report expired offers to that registry — confirm
 // this is intended.
1530 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const& stats)> reportExpiredOffer = [&registry = mServiceRegistry](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
1531 ServiceRegistryRef ref{registry};
1532 auto& dpStats = ref.get<DataProcessingStats>();
1533 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
1534 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
1535 dpStats.processCommandQueue();
1536 };
1537 auto ref = ServiceRegistryRef{mServiceRegistry};
1538
1539 // Deciding whether to run or not can be done by passing a request to
1540 // the evaluator. In this case, the request is always satisfied and
1541 // we run on whatever resource is available.
1542 auto& spec = ref.get<DeviceSpec const>();
1543 bool enough = ref.get<ComputingQuotaEvaluator>().selectOffer(streamRef.index, spec.resourcePolicy.request, uv_now(state.loop));
1544
1545 if (enough) {
1546 stream.id = streamRef;
1547 stream.running = true;
1548 stream.registry = &mServiceRegistry;
1549 if (dplEnableMultithreding) [[unlikely]] {
1550 stream.task = &handle;
1551 uv_queue_work(state.loop, stream.task, run_callback, run_completion);
1552 } else {
1553 run_callback(&handle);
1554 run_completion(&handle, 0);
1555 }
1556 } else {
1557 auto ref = ServiceRegistryRef{mServiceRegistry};
1558 ref.get<ComputingQuotaEvaluator>().handleExpired(reportExpiredOffer);
1559 }
1560 }
1561 }
1562
 // The loop is over: drop any message parts still held by the input
 // channels and reset the transition handling state for the next run.
1563 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", state.transitionHandling);
1564 auto& spec = ref.get<DeviceSpec const>();
1566 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
1567 auto& info = state.inputChannelInfos[ci];
1568 info.parts.fParts.clear();
1569 }
1570 state.transitionHandling = TransitionHandlingState::NoTransition;
1571}
1572
1576{
1577 auto& context = ref.get<DataProcessorContext>();
1578 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1579 O2_SIGNPOST_START(device, dpid, "do_prepare", "Starting DataProcessorContext::doPrepare.");
1580
1581 {
1582 ref.get<CallbackService>().call<CallbackService::Id::ClockTick>();
1583 }
1584 // Whether or not we had something to do.
1585
1586 // Initialise the value for context.allDone. It will possibly be updated
1587 // below if any of the channels is not done.
1588 //
1589 // Notice that fake input channels (InputChannelState::Pull) cannot possibly
1590 // expect to receive an EndOfStream signal. Thus we do not wait for these
1591 // to be completed. In the case of data source devices, as they do not have
1592 // real data input channels, they have to signal EndOfStream themselves.
1593 auto& state = ref.get<DeviceState>();
1594 auto& spec = ref.get<DeviceSpec const>();
1595 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.inputChannelInfos.data());
1596 O2_SIGNPOST_START(device, cid, "do_prepare", "Reported channel states.");
1597 context.allDone = std::any_of(state.inputChannelInfos.begin(), state.inputChannelInfos.end(), [cid](const auto& info) {
1598 if (info.channel) {
1599 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "Input channel %{public}s%{public}s has %zu parts left and is in state %d.",
1600 info.channel->GetName().c_str(), (info.id.value == ChannelIndex::INVALID ? " (non DPL)" : ""), info.parts.fParts.size(), (int)info.state);
1601 } else {
1602 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "External channel %d is in state %d.", info.id.value, (int)info.state);
1603 }
1604 return (info.parts.fParts.empty() == true && info.state != InputChannelState::Pull);
1605 });
1606 O2_SIGNPOST_END(device, cid, "do_prepare", "End report.");
1607 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Processing %zu input channels.", spec.inputChannels.size());
1610 static std::vector<int> pollOrder;
1611 pollOrder.resize(state.inputChannelInfos.size());
1612 std::iota(pollOrder.begin(), pollOrder.end(), 0);
1613 std::sort(pollOrder.begin(), pollOrder.end(), [&infos = state.inputChannelInfos](int a, int b) {
1614 return infos[a].oldestForChannel.value < infos[b].oldestForChannel.value;
1615 });
1616
1617 // Nothing to poll...
1618 if (pollOrder.empty()) {
1619 O2_SIGNPOST_END(device, dpid, "do_prepare", "Nothing to poll. Waiting for next iteration.");
1620 return;
1621 }
1622 auto currentOldest = state.inputChannelInfos[pollOrder.front()].oldestForChannel;
1623 auto currentNewest = state.inputChannelInfos[pollOrder.back()].oldestForChannel;
1624 auto delta = currentNewest.value - currentOldest.value;
1625 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Oldest possible timeframe range %" PRIu64 " => %" PRIu64 " delta %" PRIu64,
1626 (int64_t)currentOldest.value, (int64_t)currentNewest.value, (int64_t)delta);
1627 auto& infos = state.inputChannelInfos;
1628
1629 if (context.balancingInputs) {
1630 static int pipelineLength = DefaultsHelpers::pipelineLength();
1631 static uint64_t ahead = getenv("DPL_MAX_CHANNEL_AHEAD") ? std::atoll(getenv("DPL_MAX_CHANNEL_AHEAD")) : std::max(8, std::min(pipelineLength - 48, pipelineLength / 2));
1632 auto newEnd = std::remove_if(pollOrder.begin(), pollOrder.end(), [&infos, limitNew = currentOldest.value + ahead](int a) -> bool {
1633 return infos[a].oldestForChannel.value > limitNew;
1634 });
1635 for (auto it = pollOrder.begin(); it < pollOrder.end(); it++) {
1636 const auto& channelInfo = state.inputChannelInfos[*it];
1637 if (channelInfo.pollerIndex != -1) {
1638 auto& poller = state.activeInputPollers[channelInfo.pollerIndex];
1639 auto& pollerContext = *(PollerContext*)(poller->data);
1640 if (pollerContext.pollerState == PollerContext::PollerState::Connected || pollerContext.pollerState == PollerContext::PollerState::Suspended) {
1641 bool running = pollerContext.pollerState == PollerContext::PollerState::Connected;
1642 bool shouldBeRunning = it < newEnd;
1643 if (running != shouldBeRunning) {
1644 uv_poll_start(poller, shouldBeRunning ? UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED : 0, &on_socket_polled);
1645 pollerContext.pollerState = shouldBeRunning ? PollerContext::PollerState::Connected : PollerContext::PollerState::Suspended;
1646 }
1647 }
1648 }
1649 }
1650 pollOrder.erase(newEnd, pollOrder.end());
1651 }
1652 O2_SIGNPOST_END(device, dpid, "do_prepare", "%zu channels pass the channel inbalance balance check.", pollOrder.size());
1653
1654 for (auto sci : pollOrder) {
1655 auto& info = state.inputChannelInfos[sci];
1656 auto& channelSpec = spec.inputChannels[sci];
1657 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1658 O2_SIGNPOST_START(device, cid, "channels", "Processing channel %s", channelSpec.name.c_str());
1659
1660 if (info.state != InputChannelState::Completed && info.state != InputChannelState::Pull) {
1661 context.allDone = false;
1662 }
1663 if (info.state != InputChannelState::Running) {
1664 // Remember to flush data if we are not running
1665 // and there is some message pending.
1666 if (info.parts.Size()) {
1668 }
1669 O2_SIGNPOST_END(device, cid, "channels", "Flushing channel %s which is in state %d and has %zu parts still pending.",
1670 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1671 continue;
1672 }
1673 if (info.channel == nullptr) {
1674 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is nullptr and has %zu parts still pending.",
1675 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1676 continue;
1677 }
1678 // Only poll DPL channels for now.
1680 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is not a DPL channel and has %zu parts still pending.",
1681 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1682 continue;
1683 }
1684 auto& socket = info.channel->GetSocket();
1685 // If we have pending events from a previous iteration,
1686 // we do receive in any case.
1687 // Otherwise we check if there is any pending event and skip
1688 // this channel in case there is none.
1689 if (info.hasPendingEvents == 0) {
1690 socket.Events(&info.hasPendingEvents);
1691 // If we do not read, we can continue.
1692 if ((info.hasPendingEvents & 1) == 0 && (info.parts.Size() == 0)) {
1693 O2_SIGNPOST_END(device, cid, "channels", "No pending events and no remaining parts to process for channel %{public}s", channelSpec.name.c_str());
1694 continue;
1695 }
1696 }
1697 // We can reset this, because it means we have seen at least 1
1698 // message after the UV_READABLE was raised.
1699 info.readPolled = false;
1700 // Notice that there seems to be a difference between the documentation
1701 // of zeromq and the observed behavior. The fact that ZMQ_POLLIN
1702 // is raised does not mean that a message is immediately available to
1703 // read, just that it will be available soon, so the receive can
1704 // still return -2. To avoid this we keep receiving on the socket until
1705 // we get a message. In order not to overflow the DPL queue we process
1706 // one message at the time and we keep track of wether there were more
1707 // to process.
1708 bool newMessages = false;
1709 while (true) {
1710 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Receiving loop called for channel %{public}s (%d) with oldest possible timeslice %zu",
1711 channelSpec.name.c_str(), info.id.value, info.oldestForChannel.value);
1712 if (info.parts.Size() < 64) {
1713 fair::mq::Parts parts;
1714 info.channel->Receive(parts, 0);
1715 if (parts.Size()) {
1716 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Received %zu parts from channel %{public}s (%d).", parts.Size(), channelSpec.name.c_str(), info.id.value);
1717 }
1718 for (auto&& part : parts) {
1719 info.parts.fParts.emplace_back(std::move(part));
1720 }
1721 newMessages |= true;
1722 }
1723
1724 if (info.parts.Size() >= 0) {
1726 // Receiving data counts as activity now, so that
1727 // We can make sure we process all the pending
1728 // messages without hanging on the uv_run.
1729 break;
1730 }
1731 }
1732 // We check once again for pending events, keeping track if this was the
1733 // case so that we can immediately repeat this loop and avoid remaining
1734 // stuck in uv_run. This is because we will not get notified on the socket
1735 // if more events are pending due to zeromq level triggered approach.
1736 socket.Events(&info.hasPendingEvents);
1737 if (info.hasPendingEvents) {
1738 info.readPolled = false;
1739 // In case there were messages, we consider it as activity
1740 if (newMessages) {
1741 state.lastActiveDataProcessor.store(&context);
1742 }
1743 }
1744 O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).",
1745 channelSpec.name.c_str(), info.id.value);
1746 }
1747}
1748
// Run step of the device loop. It returns immediately while streaming is
// Idle. Otherwise it:
//  - dispatches the computations that are ready;
//  - processes dangling inputs between the pre/post dangling callbacks;
//  - once the device is in EndOfStreaming, drains what is left, fills
//    TimingInfo, runs the end-of-stream callbacks, loops over the output
//    channels, clears the relayer and stops the output pollers.
// state.lastActiveDataProcessor is set to this context whenever one of these
// steps did some work.
1750{
1751 auto& context = ref.get<DataProcessorContext>();
1752 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1753 auto& state = ref.get<DeviceState>();
1754 auto& spec = ref.get<DeviceSpec const>();
1755
1756 if (state.streaming == StreamingState::Idle) {
1757 return;
1758 }
1759
1760 context.completed.clear();
1761 context.completed.reserve(16);
1762 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1763 state.lastActiveDataProcessor.store(&context);
1764 }
1765 DanglingContext danglingContext{*context.registry};
1766
1767 context.preDanglingCallbacks(danglingContext);
// The Idle callbacks fire only while lastActiveDataProcessor is still unset.
1768 if (state.lastActiveDataProcessor.load() == nullptr) {
1769 ref.get<CallbackService>().call<CallbackService::Id::Idle>();
1770 }
1771 auto activity = ref.get<DataRelayer>().processDanglingInputs(context.expirationHandlers, *context.registry, true);
1772 if (activity.expiredSlots > 0) {
1773 state.lastActiveDataProcessor = &context;
1774 }
1775
// Expiring dangling inputs may have completed more slots, so try again.
1776 context.completed.clear();
1777 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1778 state.lastActiveDataProcessor = &context;
1779 }
1780
1781 context.postDanglingCallbacks(danglingContext);
1782
1783 // If we got notified that all the sources are done, we call the EndOfStream
1784 // callback and return false. Notice that what happens next is actually
1785 // dependent on the callback, not something which is controlled by the
1786 // framework itself.
// NOTE(review): the returns visible in this function are plain `return;`, so
// the "return false" in the comment above appears to be outdated.
1787 if (context.allDone == true && state.streaming == StreamingState::Streaming) {
1789 state.lastActiveDataProcessor = &context;
1790 }
1791
1792 if (state.streaming == StreamingState::EndOfStreaming) {
1793 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "We are in EndOfStreaming. Flushing queues.");
1794 // We keep processing data until we are Idle.
1795 // FIXME: not sure this is the correct way to drain the queues, but
1796 // I guess we will see.
1799 auto& relayer = ref.get<DataRelayer>();
1800
1801 bool shouldProcess = hasOnlyGenerated(spec) == false;
1802
// NOTE(review): tryDispatchComputation is evaluated before shouldProcess, so
// it is called once even when shouldProcess is false.
1803 while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) {
1804 relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false);
1805 }
1806
1807 auto& timingInfo = ref.get<TimingInfo>();
1808 // We should keep the data generated at end of stream only for those
1809 // which are not sources.
1810 timingInfo.keepAtEndOfStream = shouldProcess;
1811 // Fill timinginfo with some reasonable values for data sent with endOfStream
1812 timingInfo.timeslice = relayer.getOldestPossibleOutput().timeslice.value;
1813 timingInfo.tfCounter = -1;
1814 timingInfo.firstTForbit = -1;
1815 // timingInfo.runNumber = ; // Not sure where to get this if not already set
// Creation time is wall-clock milliseconds since the epoch.
1816 timingInfo.creation = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
1817 O2_SIGNPOST_EVENT_EMIT(calibration, dpid, "calibration", "TimingInfo.keepAtEndOfStream %d", timingInfo.keepAtEndOfStream);
1818
1819 EndOfStreamContext eosContext{*context.registry, ref.get<DataAllocator>()};
1820
// End-of-stream callbacks run in this order: processor pre-EOS, stream
// pre-EOS, user EndOfStream, stream post-EOS, processor post-EOS.
1821 context.preEOSCallbacks(eosContext);
1822 auto& streamContext = ref.get<StreamContext>();
1823 streamContext.preEOSCallbacks(eosContext);
1824 ref.get<CallbackService>().call<CallbackService::Id::EndOfStream>(eosContext);
1825 streamContext.postEOSCallbacks(eosContext);
1826 context.postEOSCallbacks(eosContext);
1827
1828 for (auto& channel : spec.outputChannels) {
1829 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Sending end of stream to %{public}s.", channel.name.c_str());
1831 }
1832 // This is needed because the transport is deleted before the device.
1833 relayer.clear();
1835 // In case we should process, note the data processor responsible for it
1836 if (shouldProcess) {
1837 state.lastActiveDataProcessor = &context;
1838 }
1839 // On end of stream we shut down all output pollers.
1840 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1841 for (auto& poller : state.activeOutputPollers) {
1842 uv_poll_stop(poller);
1843 }
1844 return;
1845 }
1846
// streaming was not Idle on entry (see the early return above), so taking
// this branch means the state became Idle during this call.
1847 if (state.streaming == StreamingState::Idle) {
1848 // On end of stream we shut down all output pollers.
1849 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1850 for (auto& poller : state.activeOutputPollers) {
1851 uv_poll_stop(poller);
1852 }
1853 }
1854
1855 return;
1856}
1857
1859{
1860 ServiceRegistryRef ref{mServiceRegistry};
1861 ref.get<DataRelayer>().clear();
1862 auto& deviceContext = ref.get<DeviceContext>();
1863 // If the signal handler is there, we should
1864 // hide the registry from it, so that we do not
1865 // end up calling the signal handler on something
1866 // which might not be there anymore.
1867 if (deviceContext.sigusr1Handle) {
1868 deviceContext.sigusr1Handle->data = nullptr;
1869 }
1870 // Makes sure we do not have a working context on
1871 // shutdown.
1872 for (auto& handle : ref.get<DeviceState>().activeSignals) {
1873 handle->data = nullptr;
1874 }
1875}
1876
1879 {
1880 }
1881};
1882
// Handles the multipart message buffered in info.parts for one input channel.
// getInputTypes() classifies the buffered parts (SourceInfo, DomainInfo, Data
// or Invalid). handleValidMessages() then:
//  - hands the Data entries to the DataRelayer;
//  - consumes SourceInfo/DomainInfo messages;
//  - publishes a new oldest possible timeslice for the channel;
//  - erases from info.parts every message that was reset to nullptr.
// Whatever remains in info.parts afterwards is logged as backpressured.
1888{
1889 using InputInfo = DataRelayer::InputInfo;
1891
1892 auto& context = ref.get<DataProcessorContext>();
1893 // This is the same id as the upper level function, so we get the events
1894 // associated with the same interval. We will simply use "handle_data" as
1895 // the category.
1896 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1897
1898 // This is how we validate inputs. I.e. we try to enforce the O2 Data model
1899 // and we do a few stats. We bind parts as a lambda captured variable, rather
1900 // than an input, because we do not want the outer loop actually be exposed
1901 // to the implementation details of the messaging layer.
// getInputTypes() returns one InputInfo per header/payload pair, or a single
// entry for a header followed by several payloads. It returns std::nullopt
// when the entries do not account for exactly parts.Size() messages.
// Side effects: it updates info.state when a SourceInfoHeader is seen, and it
// marks this context as the last active data processor for
// SourceInfo/DomainInfo headers.
1902 auto getInputTypes = [&info, &context]() -> std::optional<std::vector<InputInfo>> {
1903 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1904 auto ref = ServiceRegistryRef{*context.registry};
1905 auto& stats = ref.get<DataProcessingStats>();
1906 auto& state = ref.get<DeviceState>();
1907 auto& parts = info.parts;
1908 stats.updateStats({(int)ProcessingStatsId::TOTAL_INPUTS, DataProcessingStats::Op::Set, (int64_t)parts.Size()});
1909
1910 std::vector<InputInfo> results;
1911 // we can reserve the upper limit
1912 results.reserve(parts.Size() / 2);
1913 size_t nTotalPayloads = 0;
1914
// Each entry accounts for one message in the consistency check at the end.
// Non-Invalid entries with length > 1 add length - 1 more messages.
1915 auto insertInputInfo = [&results, &nTotalPayloads](size_t position, size_t length, InputType type, ChannelIndex index) {
1916 results.emplace_back(position, length, type, index);
1917 if (type != InputType::Invalid && length > 1) {
1918 nTotalPayloads += length - 1;
1919 }
1920 };
1921
1922 for (size_t pi = 0; pi < parts.Size(); pi += 2) {
1923 auto* headerData = parts.At(pi)->GetData();
1924 auto sih = o2::header::get<SourceInfoHeader*>(headerData);
1925 if (sih) {
1926 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got SourceInfoHeader with state %d", (int)sih->state);
1927 info.state = sih->state;
1928 insertInputInfo(pi, 2, InputType::SourceInfo, info.id);
1929 state.lastActiveDataProcessor = &context;
1930 continue;
1931 }
1932 auto dih = o2::header::get<DomainInfoHeader*>(headerData);
1933 if (dih) {
1934 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got DomainInfoHeader with oldestPossibleTimeslice %d", (int)dih->oldestPossibleTimeslice);
1935 insertInputInfo(pi, 2, InputType::DomainInfo, info.id);
1936 state.lastActiveDataProcessor = &context;
1937 continue;
1938 }
1939 auto dh = o2::header::get<DataHeader*>(headerData);
1940 if (!dh) {
1941 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1942 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header is not a DataHeader?");
1943 continue;
1944 }
// NOTE(review): parts.At(pi + 1) is read without checking that
// pi + 1 < parts.Size(). With an odd number of parts this indexes past the
// end; the outcome depends on whether At() is bounds-checked.
1945 if (dh->payloadSize > parts.At(pi + 1)->GetSize()) {
1946 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1947 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader payloadSize mismatch");
1948 continue;
1949 }
1950 auto dph = o2::header::get<DataProcessingHeader*>(headerData);
1951 // We only deal with the tracking of parts if the log is enabled.
1952 // This is because in principle we should track the size of each of
1953 // the parts and sum it up. Not for now.
1954 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, headerData);
1955 O2_SIGNPOST_START(parts, pid, "parts", "Processing DataHeader %{public}-4s/%{public}-16s/%d with splitPayloadParts %d and splitPayloadIndex %d",
1956 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->splitPayloadParts, dh->splitPayloadIndex);
1957 if (!dph) {
1958 insertInputInfo(pi, 2, InputType::Invalid, info.id);
1959 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header stack does not contain DataProcessingHeader");
1960 continue;
1961 }
1962 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
1963 // this is indicating a sequence of payloads following the header
1964 // FIXME: we will probably also set the DataHeader version
1965 insertInputInfo(pi, dh->splitPayloadParts + 1, InputType::Data, info.id);
// Together with the loop's pi += 2 this advances pi past the header and
// all splitPayloadParts payloads.
1966 pi += dh->splitPayloadParts - 1;
1967 } else {
1968 // We can set the type for the next splitPayloadParts
1969 // because we are guaranteed they are all the same.
1970 // If splitPayloadParts = 0, we assume that means there is only one (header, payload)
1971 // pair.
1972 size_t finalSplitPayloadIndex = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
1973 if (finalSplitPayloadIndex > parts.Size()) {
1974 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader::splitPayloadParts invalid");
1975 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1976 continue;
1977 }
1978 insertInputInfo(pi, 2, InputType::Data, info.id);
1979 for (; pi + 2 < finalSplitPayloadIndex; pi += 2) {
1980 insertInputInfo(pi + 2, 2, InputType::Data, info.id);
1981 }
1982 }
1983 }
// NOTE(review): Invalid entries are counted as a single message (see
// insertInputInfo) although they stand for a header/payload pair. Any Invalid
// entry therefore appears to make this check fail, and the caller reports
// the whole batch as "Parts should come in couples".
1984 if (results.size() + nTotalPayloads != parts.Size()) {
1985 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "inconsistent number of inputs extracted. %zu vs parts (%zu)", results.size() + nTotalPayloads, parts.Size());
1986 return std::nullopt;
1987 }
1988 return results;
1989 };
1990
1991 auto reportError = [ref](const char* message) {
1992 auto& stats = ref.get<DataProcessingStats>();
1994 };
1995
// Processes the classified entries ordered by (type, position). Consumed
// messages are reset to nullptr and erased from info.parts at the end.
1996 auto handleValidMessages = [&info, ref, &reportError](std::vector<InputInfo> const& inputInfos) {
1997 auto& relayer = ref.get<DataRelayer>();
1998 auto& state = ref.get<DeviceState>();
1999 static WaitBackpressurePolicy policy;
2000 auto& parts = info.parts;
2001 // We relay execution to make sure we have a complete set of parts
2002 // available.
2003 bool hasBackpressure = false;
// (size_t)-1 means "not set" for both timeslice trackers below.
2004 size_t minBackpressureTimeslice = -1;
2005 bool hasData = false;
2006 size_t oldestPossibleTimeslice = -1;
2007 static std::vector<int> ordering;
2008 // Same as inputInfos but with iota.
2009 ordering.resize(inputInfos.size());
2010 std::iota(ordering.begin(), ordering.end(), 0);
2011 // stable sort orderings by type and position
2012 std::stable_sort(ordering.begin(), ordering.end(), [&inputInfos](int const& a, int const& b) {
2013 auto const& ai = inputInfos[a];
2014 auto const& bi = inputInfos[b];
2015 if (ai.type != bi.type) {
2016 return ai.type < bi.type;
2017 }
2018 return ai.position < bi.position;
2019 });
2020 for (size_t ii = 0; ii < inputInfos.size(); ++ii) {
2021 auto const& input = inputInfos[ordering[ii]];
2022 switch (input.type) {
2023 case InputType::Data: {
2024 hasData = true;
2025 auto headerIndex = input.position;
2026 auto nMessages = 0;
2027 auto nPayloadsPerHeader = 0;
2028 if (input.size > 2) {
2029 // header and multiple payload sequence
2030 nMessages = input.size;
2031 nPayloadsPerHeader = nMessages - 1;
2032 } else {
2033 // multiple header-payload pairs
2034 auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
2035 nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2;
2036 nPayloadsPerHeader = 1;
// Skip the ordering entries of the remaining pairs of this split sequence.
// getInputTypes() inserted them as separate entries; with the
// (type, position) sort they presumably follow this one directly.
2037 ii += (nMessages / 2) - 1;
2038 }
2039 auto onDrop = [ref](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
2040 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
2041 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "onDrop", "Dropping message from slot %zu. Forwarding as needed. Timeslice %zu",
2042 slot.index, oldestOutputInfo.timeslice.value);
// NOTE(review): the results of these three get<>() calls are discarded.
// Presumably they only ensure the services are available; confirm.
2043 ref.get<AsyncQueue>();
2044 ref.get<DecongestionService>();
2045 ref.get<DataRelayer>();
2046 // Get the current timeslice for the slot.
2047 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
2049 forwardInputs(ref, slot, dropped, oldestOutputInfo, false, true);
2050 };
// relay() receives a pointer into info.parts, presumably so it can take
// ownership of the messages it accepts. Entries left non-null stay in
// info.parts after the cleanup at the end of this lambda.
2051 auto relayed = relayer.relay(parts.At(headerIndex)->GetData(),
2052 &parts.At(headerIndex),
2053 input,
2054 nMessages,
2055 nPayloadsPerHeader,
2056 onDrop);
2057 switch (relayed.type) {
2059 if (info.normalOpsNotified == true && info.backpressureNotified == false) {
2060 LOGP(alarm, "Backpressure on channel {}. Waiting.", info.channel->GetName());
2061 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
2062 monitoring.send(o2::monitoring::Metric{1, fmt::format("backpressure_{}", info.channel->GetName())});
2063 info.backpressureNotified = true;
2064 info.normalOpsNotified = false;
2065 }
2066 policy.backpressure(info);
2067 hasBackpressure = true;
2068 minBackpressureTimeslice = std::min<size_t>(minBackpressureTimeslice, relayed.timeslice.value);
2069 break;
2073 if (info.normalOpsNotified == false && info.backpressureNotified == true) {
2074 LOGP(info, "Back to normal on channel {}.", info.channel->GetName());
2075 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
2076 monitoring.send(o2::monitoring::Metric{0, fmt::format("backpressure_{}", info.channel->GetName())});
2077 info.normalOpsNotified = true;
2078 info.backpressureNotified = false;
2079 }
2080 break;
2081 }
2082 } break;
2083 case InputType::SourceInfo: {
2084 LOGP(detail, "Received SourceInfo");
2085 auto& context = ref.get<DataProcessorContext>();
2086 state.lastActiveDataProcessor = &context;
2087 auto headerIndex = input.position;
2088 auto payloadIndex = input.position + 1;
2089 assert(payloadIndex < parts.Size());
2090 // FIXME: the message with the end of stream cannot contain
2091 // split parts.
2092 parts.At(headerIndex).reset(nullptr);
2093 parts.At(payloadIndex).reset(nullptr);
2094 // for (size_t i = 0; i < dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 - 1 : 1; ++i) {
2095 // parts.At(headerIndex + 1 + i).reset(nullptr);
2096 // }
2097 // pi += dh->splitPayloadParts > 0 ? dh->splitPayloadParts - 1 : 0;
2098
2099 } break;
2100 case InputType::DomainInfo: {
2103 auto& context = ref.get<DataProcessorContext>();
2104 state.lastActiveDataProcessor = &context;
2105 auto headerIndex = input.position;
2106 auto payloadIndex = input.position + 1;
2107 assert(payloadIndex < parts.Size());
2108 // FIXME: the message with the end of stream cannot contain
2109 // split parts.
2110
2111 auto dih = o2::header::get<DomainInfoHeader*>(parts.At(headerIndex)->GetData());
// A DomainInfo at or beyond the oldest backpressured timeslice is skipped
// without being reset, so it stays in info.parts with the backpressured data.
2112 if (hasBackpressure && dih->oldestPossibleTimeslice >= minBackpressureTimeslice) {
2113 break;
2114 }
2115 oldestPossibleTimeslice = std::min(oldestPossibleTimeslice, dih->oldestPossibleTimeslice);
2116 LOGP(debug, "Got DomainInfoHeader, new oldestPossibleTimeslice {} on channel {}", oldestPossibleTimeslice, info.id.value);
2117 parts.At(headerIndex).reset(nullptr);
2118 parts.At(payloadIndex).reset(nullptr);
2119 } break;
2120 case InputType::Invalid: {
2121 reportError("Invalid part found.");
2122 } break;
2123 }
2124 }
// At least one DomainInfo was accepted: record and publish the channel's new
// oldest possible timeslice.
2127 if (oldestPossibleTimeslice != (size_t)-1) {
2128 info.oldestForChannel = {oldestPossibleTimeslice};
2129 auto& context = ref.get<DataProcessorContext>();
2130 context.domainInfoUpdatedCallback(*context.registry, oldestPossibleTimeslice, info.id);
2131 ref.get<CallbackService>().call<CallbackService::Id::DomainInfoUpdated>((ServiceRegistryRef)*context.registry, (size_t)oldestPossibleTimeslice, (ChannelIndex)info.id);
2132 state.lastActiveDataProcessor = &context;
2133 }
// Drop every message that was consumed (reset to nullptr) above.
// NOTE(review): erase() mixes parts.fParts iterators with parts.end(). This
// presumably relies on Parts::end() returning fParts.end().
2134 auto it = std::remove_if(parts.fParts.begin(), parts.fParts.end(), [](auto& msg) -> bool { return msg.get() == nullptr; });
2135 parts.fParts.erase(it, parts.end());
2136 if (parts.fParts.size()) {
2137 LOG(debug) << parts.fParts.size() << " messages backpressured";
2138 }
2139 };
2140
2141 // Second part. This is the actual outer loop we want to obtain, with
2142 // implementation details which can be read. Notice how most of the state
2143 // is actually hidden. For example we do not expose what "input" is. This
2144 // will allow us to keep the same toplevel logic even if the actual meaning
2145 // of input is changed (for example we might move away from multipart
2146 // messages). Notice also that we need to act differently depending on the
2147 // actual CompletionOp we want to perform. In particular forwarding inputs
2148 // also gets rid of them from the cache.
2149 auto inputTypes = getInputTypes();
// NOTE(review): this path leaves info.parts untouched, despite the
// "Dropping it" wording of the error message.
2150 if (bool(inputTypes) == false) {
2151 reportError("Parts should come in couples. Dropping it.");
2152 return;
2153 }
2154 handleValidMessages(*inputTypes);
2155 return;
2156}
2157
2158namespace
2159{
2160struct InputLatency {
2161 uint64_t minLatency = std::numeric_limits<uint64_t>::max();
2162 uint64_t maxLatency = std::numeric_limits<uint64_t>::min();
2163};
2164
2165auto calculateInputRecordLatency(InputRecord const& record, uint64_t currentTime) -> InputLatency
2166{
2167 InputLatency result;
2168
2169 for (auto& item : record) {
2170 auto* header = o2::header::get<DataProcessingHeader*>(item.header);
2171 if (header == nullptr) {
2172 continue;
2173 }
2174 int64_t partLatency = (0x7fffffffffffffff & currentTime) - (0x7fffffffffffffff & header->creation);
2175 if (partLatency < 0) {
2176 partLatency = 0;
2177 }
2178 result.minLatency = std::min(result.minLatency, (uint64_t)partLatency);
2179 result.maxLatency = std::max(result.maxLatency, (uint64_t)partLatency);
2180 }
2181 return result;
2182};
2183
2184auto calculateTotalInputRecordSize(InputRecord const& record) -> int
2185{
2186 size_t totalInputSize = 0;
2187 for (auto& item : record) {
2188 auto* header = o2::header::get<DataHeader*>(item.header);
2189 if (header == nullptr) {
2190 continue;
2191 }
2192 totalInputSize += header->payloadSize;
2193 }
2194 return totalInputSize;
2195};
2196
2197template <typename T>
2198void update_maximum(std::atomic<T>& maximum_value, T const& value) noexcept
2199{
2200 T prev_value = maximum_value;
2201 while (prev_value < value &&
2202 !maximum_value.compare_exchange_weak(prev_value, value)) {
2203 }
2204}
2205} // namespace
2206
2207bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::vector<DataRelayer::RecordAction>& completed)
2208{
2209 auto& context = ref.get<DataProcessorContext>();
2210 LOGP(debug, "DataProcessingDevice::tryDispatchComputation");
2211 // This is the actual hidden state for the outer loop. In case we decide we
2212 // want to support multithreaded dispatching of operations, I can simply
2213 // move these to some thread local store and the rest of the lambdas
2214 // should work just fine.
2215 std::vector<MessageSet> currentSetOfInputs;
2216
2217 //
2218 auto getInputSpan = [ref, &currentSetOfInputs](TimesliceSlot slot, bool consume = true) {
2219 auto& relayer = ref.get<DataRelayer>();
2220 if (consume) {
2221 currentSetOfInputs = relayer.consumeAllInputsForTimeslice(slot);
2222 } else {
2223 currentSetOfInputs = relayer.consumeExistingInputsForTimeslice(slot);
2224 }
2225 auto getter = [&currentSetOfInputs](size_t i, size_t partindex) -> DataRef {
2226 if (currentSetOfInputs[i].getNumberOfPairs() > partindex) {
2227 const char* headerptr = nullptr;
2228 const char* payloadptr = nullptr;
2229 size_t payloadSize = 0;
2230 // - each input can have multiple parts
2231 // - "part" denotes a sequence of messages belonging together, the first message of the
2232 // sequence is the header message
2233 // - each part has one or more payload messages
2234 // - InputRecord provides all payloads as header-payload pair
2235 auto const& headerMsg = currentSetOfInputs[i].associatedHeader(partindex);
2236 auto const& payloadMsg = currentSetOfInputs[i].associatedPayload(partindex);
2237 headerptr = static_cast<char const*>(headerMsg->GetData());
2238 payloadptr = payloadMsg ? static_cast<char const*>(payloadMsg->GetData()) : nullptr;
2239 payloadSize = payloadMsg ? payloadMsg->GetSize() : 0;
2240 return DataRef{nullptr, headerptr, payloadptr, payloadSize};
2241 }
2242 return DataRef{};
2243 };
2244 auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
2245 return currentSetOfInputs[i].getNumberOfPairs();
2246 };
2247#if __has_include(<fairmq/shmem/Message.h>)
2248 auto refCountGetter = [&currentSetOfInputs](size_t idx) -> int {
2249 auto& header = static_cast<const fair::mq::shmem::Message&>(*currentSetOfInputs[idx].header(0));
2250 return header.GetRefCount();
2251 };
2252#else
2253 std::function<int(size_t)> refCountGetter = nullptr;
2254#endif
2255 return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.size()};
2256 };
2257
2258 auto markInputsAsDone = [ref](TimesliceSlot slot) -> void {
2259 auto& relayer = ref.get<DataRelayer>();
2261 };
2262
2263 // I need a preparation step which gets the current timeslice id and
2264 // propagates it to the various contextes (i.e. the actual entities which
2265 // create messages) because the messages need to have the timeslice id into
2266 // it.
2267 auto prepareAllocatorForCurrentTimeSlice = [ref](TimesliceSlot i) -> void {
2268 auto& relayer = ref.get<DataRelayer>();
2269 auto& timingInfo = ref.get<TimingInfo>();
2270 auto timeslice = relayer.getTimesliceForSlot(i);
2271
2272 timingInfo.timeslice = timeslice.value;
2273 timingInfo.tfCounter = relayer.getFirstTFCounterForSlot(i);
2274 timingInfo.firstTForbit = relayer.getFirstTFOrbitForSlot(i);
2275 timingInfo.runNumber = relayer.getRunNumberForSlot(i);
2276 timingInfo.creation = relayer.getCreationTimeForSlot(i);
2277 };
2278 auto updateRunInformation = [ref](TimesliceSlot i) -> void {
2279 auto& dataProcessorContext = ref.get<DataProcessorContext>();
2280 auto& relayer = ref.get<DataRelayer>();
2281 auto& timingInfo = ref.get<TimingInfo>();
2282 auto timeslice = relayer.getTimesliceForSlot(i);
2283 // We report wether or not this timing info refers to a new Run.
2284 timingInfo.globalRunNumberChanged = !TimingInfo::timesliceIsTimer(timeslice.value) && dataProcessorContext.lastRunNumberProcessed != timingInfo.runNumber;
2285 // A switch to runNumber=0 should not appear and thus does not set globalRunNumberChanged, unless it is seen in the first processed timeslice
2286 timingInfo.globalRunNumberChanged &= (dataProcessorContext.lastRunNumberProcessed == -1 || timingInfo.runNumber != 0);
2287 // FIXME: for now there is only one stream, however we
2288 // should calculate this correctly once we finally get the
2289 // the StreamContext in.
2290 timingInfo.streamRunNumberChanged = timingInfo.globalRunNumberChanged;
2291 };
2292
2293 // When processing them, timers will have to be cleaned up
2294 // to avoid double counting them.
2295 // This was actually the easiest solution we could find for
2296 // O2-646.
2297 auto cleanTimers = [&currentSetOfInputs](TimesliceSlot slot, InputRecord& record) {
2298 assert(record.size() == currentSetOfInputs.size());
2299 for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) {
2300 // assuming that for timer inputs we do have exactly one PartRef object
2301 // in the MessageSet, multiple PartRef Objects are only possible for either
2302 // split payload messages of wildcard matchers, both for data inputs
2303 DataRef input = record.getByPos(ii);
2304 if (input.spec->lifetime != Lifetime::Timer) {
2305 continue;
2306 }
2307 if (input.header == nullptr) {
2308 continue;
2309 }
2310 // This will hopefully delete the message.
2311 currentSetOfInputs[ii].clear();
2312 }
2313 };
2314
2315 // Function to cleanup record. For the moment we
2316 // simply use it to keep track of input messages
2317 // which are not needed, to display them in the GUI.
2318 auto cleanupRecord = [](InputRecord& record) {
2319 if (O2_LOG_ENABLED(parts) == false) {
2320 return;
2321 }
2322 for (size_t pi = 0, pe = record.size(); pi < pe; ++pi) {
2323 DataRef input = record.getByPos(pi);
2324 if (input.header == nullptr) {
2325 continue;
2326 }
2327 auto sih = o2::header::get<SourceInfoHeader*>(input.header);
2328 if (sih) {
2329 continue;
2330 }
2331
2332 auto dh = o2::header::get<DataHeader*>(input.header);
2333 if (!dh) {
2334 continue;
2335 }
2336 // We use the address of the first header of a split payload
2337 // to identify the interval.
2338 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, dh);
2339 O2_SIGNPOST_END(parts, pid, "parts", "Cleaning up parts associated to %p", dh);
2340
2341 // No split parts, we simply skip the payload
2342 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
2343 // this is indicating a sequence of payloads following the header
2344 // FIXME: we will probably also set the DataHeader version
2345 pi += dh->splitPayloadParts - 1;
2346 } else {
2347 size_t pi = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
2348 }
2349 }
2350 };
2351
2352 ref.get<DataRelayer>().getReadyToProcess(completed);
2353 if (completed.empty() == true) {
2354 LOGP(debug, "No computations available for dispatching.");
2355 return false;
2356 }
2357
2358 auto postUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t tStart, uint64_t tStartMilli) {
2359 auto& stats = ref.get<DataProcessingStats>();
2360 auto& states = ref.get<DataProcessingStates>();
2361 std::atomic_thread_fence(std::memory_order_release);
2362 char relayerSlotState[1024];
2363 int written = snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2364 char* buffer = relayerSlotState + written;
2365 for (size_t ai = 0; ai != record.size(); ai++) {
2366 buffer[ai] = record.isValid(ai) ? '3' : '0';
2367 }
2368 buffer[record.size()] = 0;
2369 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index),
2370 .size = (int)(record.size() + buffer - relayerSlotState),
2371 .data = relayerSlotState});
2372 uint64_t tEnd = uv_hrtime();
2373 // tEnd and tStart are in nanoseconds according to https://docs.libuv.org/en/v1.x/misc.html#c.uv_hrtime
2374 int64_t wallTimeMs = (tEnd - tStart) / 1000000;
2376 // Sum up the total wall time, in milliseconds.
2378 // The time interval is in seconds while tEnd - tStart is in nanoseconds, so we divide by 1000000 to get the fraction in ms/s.
2380 stats.updateStats({(int)ProcessingStatsId::LAST_PROCESSED_SIZE, DataProcessingStats::Op::Set, calculateTotalInputRecordSize(record)});
2381 stats.updateStats({(int)ProcessingStatsId::TOTAL_PROCESSED_SIZE, DataProcessingStats::Op::Add, calculateTotalInputRecordSize(record)});
2382 auto latency = calculateInputRecordLatency(record, tStartMilli);
2383 stats.updateStats({(int)ProcessingStatsId::LAST_MIN_LATENCY, DataProcessingStats::Op::Set, (int)latency.minLatency});
2384 stats.updateStats({(int)ProcessingStatsId::LAST_MAX_LATENCY, DataProcessingStats::Op::Set, (int)latency.maxLatency});
2385 static int count = 0;
2387 count++;
2388 };
2389
2390 auto preUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t) {
2391 auto& states = ref.get<DataProcessingStates>();
2392 std::atomic_thread_fence(std::memory_order_release);
2393 char relayerSlotState[1024];
2394 snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2395 char* buffer = strchr(relayerSlotState, ' ') + 1;
2396 for (size_t ai = 0; ai != record.size(); ai++) {
2397 buffer[ai] = record.isValid(ai) ? '2' : '0';
2398 }
2399 buffer[record.size()] = 0;
2400 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index), .size = (int)(record.size() + buffer - relayerSlotState), .data = relayerSlotState});
2401 };
2402
2403 // This is the main dispatching loop
2404 auto& state = ref.get<DeviceState>();
2405 auto& spec = ref.get<DeviceSpec const>();
2406
2407 auto& dpContext = ref.get<DataProcessorContext>();
2408 auto& streamContext = ref.get<StreamContext>();
2409 O2_SIGNPOST_ID_GENERATE(sid, device);
2410 O2_SIGNPOST_START(device, sid, "device", "Start processing ready actions");
2411
2412 auto& stats = ref.get<DataProcessingStats>();
2413 auto& relayer = ref.get<DataRelayer>();
2414 using namespace o2::framework;
2415 stats.updateStats({(int)ProcessingStatsId::PENDING_INPUTS, DataProcessingStats::Op::Set, static_cast<int64_t>(relayer.getParallelTimeslices() - completed.size())});
2416 stats.updateStats({(int)ProcessingStatsId::INCOMPLETE_INPUTS, DataProcessingStats::Op::Set, completed.empty() ? 1 : 0});
2417 switch (spec.completionPolicy.order) {
2419 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.timeslice.value < b.timeslice.value; });
2420 break;
2422 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.slot.index < b.slot.index; });
2423 break;
2425 default:
2426 break;
2427 }
2428
2429 for (auto action : completed) {
2430 O2_SIGNPOST_ID_GENERATE(aid, device);
2431 O2_SIGNPOST_START(device, aid, "device", "Processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2433 O2_SIGNPOST_END(device, aid, "device", "Waiting for more data.");
2434 continue;
2435 }
2436
2437 bool shouldConsume = action.op == CompletionPolicy::CompletionOp::Consume ||
2439 prepareAllocatorForCurrentTimeSlice(TimesliceSlot{action.slot});
2443 updateRunInformation(TimesliceSlot{action.slot});
2444 }
2445 InputSpan span = getInputSpan(action.slot, shouldConsume);
2446 auto& spec = ref.get<DeviceSpec const>();
2447 InputRecord record{spec.inputs,
2448 span,
2449 *context.registry};
2450 ProcessingContext processContext{record, ref, ref.get<DataAllocator>()};
2451 {
2452 // Notice this should be thread safe and reentrant
2453 // as it is called from many threads.
2454 streamContext.preProcessingCallbacks(processContext);
2455 dpContext.preProcessingCallbacks(processContext);
2456 }
2458 context.postDispatchingCallbacks(processContext);
2459 if (spec.forwards.empty() == false) {
2460 auto& timesliceIndex = ref.get<TimesliceIndex>();
2461 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false);
2462 O2_SIGNPOST_END(device, aid, "device", "Forwarding inputs consume: %d.", false);
2463 continue;
2464 }
2465 }
2466 // If there is no optional inputs we canForwardEarly
2467 // the messages to that parallel processing can happen.
2468 // In this case we pass true to indicate that we want to
2469 // copy the messages to the subsequent data processor.
2470 bool hasForwards = spec.forwards.empty() == false;
2472
2473 if (context.canForwardEarly && hasForwards && consumeSomething) {
2474 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwainding: %{public}s.", fmt::format("{}", action.op).c_str());
2475 auto& timesliceIndex = ref.get<TimesliceIndex>();
2476 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
2477 }
2478 markInputsAsDone(action.slot);
2479
2480 uint64_t tStart = uv_hrtime();
2481 uint64_t tStartMilli = TimingHelpers::getRealtimeSinceEpochStandalone();
2482 preUpdateStats(action, record, tStart);
2483
2484 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
2485
2486 auto runNoCatch = [&context, ref, &processContext](DataRelayer::RecordAction& action) mutable {
2487 auto& state = ref.get<DeviceState>();
2488 auto& spec = ref.get<DeviceSpec const>();
2489 auto& streamContext = ref.get<StreamContext>();
2490 auto& dpContext = ref.get<DataProcessorContext>();
2491 auto shouldProcess = [](DataRelayer::RecordAction& action) -> bool {
2492 switch (action.op) {
2497 return true;
2498 break;
2499 default:
2500 return false;
2501 }
2502 };
2503 if (state.quitRequested == false) {
2504 {
2505 // Callbacks from services
2506 dpContext.preProcessingCallbacks(processContext);
2507 streamContext.preProcessingCallbacks(processContext);
2508 dpContext.preProcessingCallbacks(processContext);
2509 // Callbacks from users
2510 ref.get<CallbackService>().call<CallbackService::Id::PreProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2511 }
2512 O2_SIGNPOST_ID_FROM_POINTER(pcid, device, &processContext);
2513 if (context.statefulProcess && shouldProcess(action)) {
2514 // This way, usercode can use the the same processing context to identify
2515 // its signposts and we can map user code to device iterations.
2516 O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
2517 (context.statefulProcess)(processContext);
2518 O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
2519 } else if (context.statelessProcess && shouldProcess(action)) {
2520 O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
2521 (context.statelessProcess)(processContext);
2522 O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
2523 } else if (context.statelessProcess || context.statefulProcess) {
2524 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding.");
2525 } else {
2526 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle");
2528 }
2529 if (shouldProcess(action)) {
2530 auto& timingInfo = ref.get<TimingInfo>();
2531 if (timingInfo.globalRunNumberChanged) {
2532 context.lastRunNumberProcessed = timingInfo.runNumber;
2533 }
2534 }
2535
2536 // Notify the sink we just consumed some timeframe data
2537 if (context.isSink && action.op == CompletionPolicy::CompletionOp::Consume) {
2538 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Sending dpl-summary");
2539 auto& allocator = ref.get<DataAllocator>();
2540 allocator.make<int>(OutputRef{"dpl-summary", runtime_hash(spec.name.c_str())}, 1);
2541 }
2542
2543 // Extra callback which allows a service to add extra outputs.
2544 // This is needed e.g. to ensure that injected CCDB outputs are added
2545 // before an end of stream.
2546 {
2547 ref.get<CallbackService>().call<CallbackService::Id::FinaliseOutputs>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2548 dpContext.finaliseOutputsCallbacks(processContext);
2549 streamContext.finaliseOutputsCallbacks(processContext);
2550 }
2551
2552 {
2553 ref.get<CallbackService>().call<CallbackService::Id::PostProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2554 dpContext.postProcessingCallbacks(processContext);
2555 streamContext.postProcessingCallbacks(processContext);
2556 }
2557 }
2558 };
2559
2560 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_USERCODE) != 0) {
2561 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
2562 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
2563 }
2564 if (noCatch) {
2565 try {
2566 runNoCatch(action);
2567 } catch (o2::framework::RuntimeErrorRef e) {
2568 (context.errorHandling)(e, record);
2569 }
2570 } else {
2571 try {
2572 runNoCatch(action);
2573 } catch (std::exception& ex) {
2577 auto e = runtime_error(ex.what());
2578 (context.errorHandling)(e, record);
2579 } catch (o2::framework::RuntimeErrorRef e) {
2580 (context.errorHandling)(e, record);
2581 }
2582 }
2583 if (state.severityStack.empty() == false) {
2584 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
2585 state.severityStack.pop_back();
2586 }
2587
2588 postUpdateStats(action, record, tStart, tStartMilli);
2589 // We forward inputs only when we consume them. If we simply Process them,
2590 // we keep them for next message arriving.
2592 cleanupRecord(record);
2593 context.postDispatchingCallbacks(processContext);
2594 ref.get<CallbackService>().call<CallbackService::Id::DataConsumed>(o2::framework::ServiceRegistryRef{ref});
2595 }
2596 if ((context.canForwardEarly == false) && hasForwards && consumeSomething) {
2597 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Late forwarding");
2598 auto& timesliceIndex = ref.get<TimesliceIndex>();
2599 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false, action.op == CompletionPolicy::CompletionOp::Consume);
2600 }
2601 context.postForwardingCallbacks(processContext);
2603 cleanTimers(action.slot, record);
2604 }
2605 O2_SIGNPOST_END(device, aid, "device", "Done processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2606 }
2607 O2_SIGNPOST_END(device, sid, "device", "Start processing ready actions");
2608
2609 // We now broadcast the end of stream if it was requested
2610 if (state.streaming == StreamingState::EndOfStreaming) {
2611 LOGP(detail, "Broadcasting end of stream");
2612 for (auto& channel : spec.outputChannels) {
2614 }
2616 }
2617
2618 return true;
2619}
2620
2622{
2623 LOG(error) << msg;
2624 ServiceRegistryRef ref{mServiceRegistry};
2625 auto& stats = ref.get<DataProcessingStats>();
2627}
2628
2629std::unique_ptr<ConfigParamStore> DeviceConfigurationHelpers::getConfiguration(ServiceRegistryRef registry, const char* name, std::vector<ConfigParamSpec> const& options)
2630{
2631
2632 if (registry.active<ConfigurationInterface>()) {
2633 auto& cfg = registry.get<ConfigurationInterface>();
2634 try {
2635 cfg.getRecursive(name);
2636 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
2637 retrievers.emplace_back(std::make_unique<ConfigurationOptionsRetriever>(&cfg, name));
2638 auto configStore = std::make_unique<ConfigParamStore>(options, std::move(retrievers));
2639 configStore->preload();
2640 configStore->activate();
2641 return configStore;
2642 } catch (...) {
2643 // No overrides...
2644 }
2645 }
2646 return {nullptr};
2647}
2648
2649} // namespace o2::framework
benchmark::State & state
struct uv_timer_s uv_timer_t
struct uv_signal_s uv_signal_t
struct uv_async_s uv_async_t
struct uv_poll_s uv_poll_t
struct uv_loop_s uv_loop_t
o2::monitoring::Metric Metric
uint64_t maxLatency
o2::configuration::ConfigurationInterface ConfigurationInterface
constexpr int DEFAULT_MAX_CHANNEL_AHEAD
uint64_t minLatency
int32_t i
std::enable_if_t< std::is_signed< T >::value, bool > hasData(const CalArray< T > &cal)
Definition Painter.cxx:599
uint16_t pid
Definition RawData.h:2
uint32_t res
Definition RawData.h:0
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
Definition Signpost.h:553
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_LOG_ENABLED(log)
Definition Signpost.h:110
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
Definition Signpost.h:563
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
constexpr uint32_t runtime_hash(char const *str)
std::ostringstream debug
o2::monitoring::Monitoring Monitoring
StringRef key
@ DeviceStateChanged
Invoked the device undergoes a state change.
decltype(auto) make(const Output &spec, Args... args)
static void doRun(ServiceRegistryRef)
void fillContext(DataProcessorContext &context, DeviceContext &deviceContext)
static void doPrepare(ServiceRegistryRef)
static bool tryDispatchComputation(ServiceRegistryRef ref, std::vector< DataRelayer::RecordAction > &completed)
static void handleData(ServiceRegistryRef, InputChannelInfo &)
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry &, ProcessingPolicies &policies)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associate to a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associate to a given slot.
A service API to communicate with the driver.
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
size_t size() const
Number of elements in the InputSpan.
Definition InputSpan.h:82
virtual fair::mq::Device * device()=0
bool active() const
Check if service of type T is currently active.
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLuint entry
Definition glcorearb.h:5735
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLuint GLuint stream
Definition glcorearb.h:1806
GLbitfield GLuint64 timeout
Definition glcorearb.h:1573
GLint ref
Definition glcorearb.h:291
GLuint * states
Definition glcorearb.h:4932
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
ServiceKind
The kind of service we are asking for.
void on_idle_timer(uv_timer_t *handle)
@ DPL
The channel is a normal input channel.
void run_completion(uv_work_t *handle, int status)
bool hasOnlyGenerated(DeviceSpec const &spec)
void on_socket_polled(uv_poll_t *poller, int status, int events)
void on_transition_requested_expired(uv_timer_t *handle)
void run_callback(uv_work_t *handle)
volatile int region_read_global_dummy_variable
void handleRegionCallbacks(ServiceRegistryRef registry, std::vector< fair::mq::RegionInfo > &infos)
Invoke the callbacks for the mPendingRegionInfos.
void on_out_of_band_polled(uv_poll_t *poller, int status, int events)
DeviceSpec const & getRunningDevice(RunningDeviceRef const &running, ServiceRegistryRef const &services)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Streaming
Data is being processed.
@ Idle
End of streaming notified.
void on_communication_requested(uv_async_t *s)
@ Expired
A transition needs to be fullfilled ASAP.
@ NoTransition
No pending transitions.
@ Requested
A transition was notified to be requested.
RuntimeError & error_from_ref(RuntimeErrorRef)
auto switchState(ServiceRegistryRef &ref, StreamingState newState) -> void
void on_awake_main_thread(uv_async_t *handle)
@ Completed
The channel was signaled it will not receive any data.
@ Running
The channel is actively receiving data.
void on_signal_callback(uv_signal_t *handle, int signum)
@ Me
Only quit this data processor.
void on_data_processing_expired(uv_timer_t *handle)
bool hasOnlyTimers(DeviceSpec const &spec)
constexpr const char * channelName(int channel)
Definition Constants.h:318
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
Defining DataPointCompositeObject explicitly as copiable.
static void run(AsyncQueue &queue, TimesliceId oldestPossibleTimeslice)
static void post(AsyncQueue &queue, AsyncTask const &task)
An actuatual task to be executed.
Definition AsyncQueue.h:32
static void demangled_backtrace_symbols(void **backtrace, unsigned int total, int fd)
static constexpr int INVALID
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
int64_t sharedMemory
How much shared memory it can allocate.
Statistics on the offers consumed, expired.
static void sendEndOfStream(ServiceRegistryRef const &ref, OutputChannelSpec const &channel)
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
Helper struct to hold statistics about the data processing happening.
@ CumulativeRate
Set the value to the specified value if it is positive.
@ Add
Update the rate of the metric given the amount since the last time.
std::function< void(o2::framework::RuntimeErrorRef e, InputRecord &record)> errorHandling
AlgorithmSpec::InitErrorCallback initError
void preLoopCallbacks(ServiceRegistryRef)
Invoke callbacks before we enter the event loop.
void postStopCallbacks(ServiceRegistryRef)
Invoke callbacks on stop.
void preProcessingCallbacks(ProcessingContext &)
Invoke callbacks to be executed before every process method invokation.
bool canForwardEarly
Wether or not the associated DataProcessor can forward things early.
void preStartCallbacks(ServiceRegistryRef)
Invoke callbacks to be executed in PreRun(), before the User Start callbacks.
AlgorithmSpec::ProcessCallback statefulProcess
const char * header
Definition DataRef.h:27
const InputSpec * spec
Definition DataRef.h:26
static std::vector< size_t > createDistinctRouteIndex(std::vector< InputRoute > const &)
CompletionPolicy::CompletionOp op
Definition DataRelayer.h:81
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static header::DataOrigin asConcreteOrigin(InputSpec const &spec)
TimesliceIndex::OldestOutputInfo oldestTimeslice
static unsigned int pipelineLength()
get max number of timeslices in the queue
static bool onlineDeploymentMode()
@true if running online
static std::unique_ptr< ConfigParamStore > getConfiguration(ServiceRegistryRef registry, const char *name, std::vector< ConfigParamSpec > const &options)
std::vector< InputRoute > inputs
Definition DeviceSpec.h:62
std::vector< InputChannelSpec > inputChannels
Definition DeviceSpec.h:54
Running state information of a given device.
Definition DeviceState.h:34
std::atomic< int64_t > cleanupCount
Definition DeviceState.h:82
Forward channel information.
Definition ChannelInfo.h:88
ChannelAccountingType channelType
Wether or not it's a DPL internal channel.
Definition ChannelInfo.h:92
std::string name
The name of the channel.
Definition ChannelInfo.h:90
ForwardingPolicy const * policy
Definition ChannelInfo.h:94
fair::mq::Channel * channel
Definition ChannelInfo.h:51
TimesliceId oldestForChannel
Oldest possible timeslice for the given channel.
Definition ChannelInfo.h:65
enum Lifetime lifetime
Definition InputSpec.h:73
enum EarlyForwardPolicy earlyForward
Information about the running workflow.
static constexpr ServiceKind kind
static Salt streamSalt(short streamId, short dataProcessorId)
void lateBindStreamServices(DeviceState &state, fair::mq::ProgOptions &options, ServiceRegistry::Salt salt)
static Salt globalStreamSalt(short streamId)
void * get(ServiceTypeHash typeHash, Salt salt, ServiceKind kind, char const *name=nullptr) const
void finaliseOutputsCallbacks(ProcessingContext &)
Invoke callbacks to be executed after every process method invokation.
void preProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed before every process method invokation.
void preEOSCallbacks(EndOfStreamContext &eosContext)
Invoke callbacks to be executed before every EOS user callback invokation.
void postProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed after every process method invokation.
static int64_t getRealtimeSinceEpochStandalone()
bool keepAtEndOfStream
Wether this kind of data should be flushed during end of stream.
Definition TimingInfo.h:44
static bool timesliceIsTimer(size_t timeslice)
Definition TimingInfo.h:46
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
void backpressure(InputChannelInfo const &)
locked_execution(ServiceRegistryRef &ref_)
the main header struct
Definition DataHeader.h:618
constexpr size_t min
constexpr size_t max
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
vec clear()
const std::string str
uint64_t const void const *restrict const msg
Definition x9.h:153