Project
Loading...
Searching...
No Matches
DataProcessingDevice.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
13#include <atomic>
33#include "Framework/InputSpan.h"
34#if defined(__APPLE__) || defined(NDEBUG)
35#define O2_SIGNPOST_IMPLEMENTATION
36#endif
37#include "Framework/Signpost.h"
50
51#include "DecongestionService.h"
54#include "DataRelayerHelpers.h"
55#include "Headers/DataHeader.h"
57
58#include <Framework/Tracing.h>
59
60#include <fairmq/Parts.h>
61#include <fairmq/Socket.h>
62#include <fairmq/ProgOptions.h>
63#include <fairmq/shmem/Message.h>
64#include <Configuration/ConfigurationInterface.h>
65#include <Configuration/ConfigurationFactory.h>
66#include <Monitoring/Monitoring.h>
67#include <TMessage.h>
68#include <TClonesArray.h>
69
70#include <fmt/ostream.h>
71#include <algorithm>
72#include <vector>
73#include <numeric>
74#include <memory>
75#include <uv.h>
76#include <execinfo.h>
77#include <sstream>
78#include <boost/property_tree/json_parser.hpp>
79
80// Formatter to avoid having to rewrite the ostream operator for the enum
81namespace fmt
82{
83template <>
86} // namespace fmt
87
88// A log to use for general device logging
90// A log to use for general device logging
92// Special log to keep track of the lifetime of the parts
94// Stream which keeps track of the calibration lifetime logic
96// Special log to track the async queue behavior
98// Special log to track the forwarding requests
100// Special log to track CCDB related requests
102// Special log to track task scheduling
104
105using namespace o2::framework;
106using ConfigurationInterface = o2::configuration::ConfigurationInterface;
108
109constexpr int DEFAULT_MAX_CHANNEL_AHEAD = 128;
110
111namespace o2::framework
112{
113
114template <>
118
// Timer callback body (the signature line is not visible in this listing):
// recovers the DeviceState stored in the handle's `data` field and flags the
// event loop as having been woken up by an expired timer.
122{
123 auto* state = (DeviceState*)handle->data;
124 state->loopReason |= DeviceState::TIMER_EXPIRED;
125}
126
// Callback body (the signature line is not visible in this listing): recovers
// the DeviceState stored in the handle's `data` field.
// NOTE(review): the statement which uses `state` is missing from this listing.
128{
129 auto* state = (DeviceState*)s->data;
131}
132
133DeviceSpec const& getRunningDevice(RunningDeviceRef const& running, ServiceRegistryRef const& services)
134{
135 auto& devices = services.get<o2::framework::RunningWorkflowInfo const>().devices;
136 return devices[running.index];
137}
138
144
// Constructor (the signature line is not visible in this listing).
// The configuration registry starts out as nullptr, the service registry is
// the one passed in by the caller.
146 : mRunningDevice{running},
147 mConfigRegistry{nullptr},
148 mServiceRegistry{registry}
149{
// React to updates of the "dpl" configuration keys. For the "cleanup" key the
// value is parsed as a counter: only a value strictly larger than the stored
// one is recorded and triggers the draining of every input channel.
150 GetConfig()->Subscribe<std::string>("dpl", [&registry = mServiceRegistry](const std::string& key, std::string value) {
151 if (key == "cleanup") {
// NOTE(review): `ref` is declared on a line which is not visible in this listing.
153 auto& deviceState = ref.get<DeviceState>();
154 int64_t cleanupCount = deviceState.cleanupCount.load();
// std::stoll throws if `value` does not hold a valid integer.
155 int64_t newCleanupCount = std::stoll(value);
156 if (newCleanupCount <= cleanupCount) {
157 return;
158 }
159 deviceState.cleanupCount.store(newCleanupCount);
160 for (auto& info : deviceState.inputChannelInfos) {
161 fair::mq::Parts parts;
// Keep receiving until Receive reports nothing or an empty set of parts.
// Presumably the second argument is a zero timeout (non-blocking) - confirm
// against the FairMQ Channel API.
162 while (info.channel->Receive(parts, 0)) {
163 LOGP(debug, "Dropping {} parts", parts.Size());
164 if (parts.Size() == 0) {
165 break;
166 }
167 }
168 }
169 }
170 });
171
// Invoked on FairMQ state changes: forwards the state name to the control
// service and, when a follow-up state is queued in nextFairMQState, requests
// it and removes it from the queue. The result of ChangeState is discarded.
172 std::function<void(const fair::mq::State)> stateWatcher = [this, &registry = mServiceRegistry](const fair::mq::State state) -> void {
// NOTE(review): `ref` is declared on a line which is not visible in this listing.
174 auto& deviceState = ref.get<DeviceState>();
175 auto& control = ref.get<ControlService>();
176 auto& callbacks = ref.get<CallbackService>();
177 control.notifyDeviceState(fair::mq::GetStateName(state));
179
180 if (deviceState.nextFairMQState.empty() == false) {
// This local `state` shadows the lambda parameter of the same name.
181 auto state = deviceState.nextFairMQState.back();
182 (void)this->ChangeState(state);
183 deviceState.nextFairMQState.pop_back();
184 }
185 };
186
187 // 99 is to execute DPL callbacks last
188 this->SubscribeToStateChange("99-dpl", stateWatcher);
189
190 // One task for now.
191 mStreams.resize(1);
192 mHandles.resize(1);
193
194 ServiceRegistryRef ref{mServiceRegistry};
195
// Async handle used to wake up the libuv loop; it is allocated with malloc and
// carries a pointer to the DeviceState in its `data` field.
196 mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t));
197 auto& state = ref.get<DeviceState>();
198 assert(state.loop);
199 int res = uv_async_init(state.loop, mAwakeHandle, on_communication_requested);
200 mAwakeHandle->data = &state;
// A failed initialisation is only logged, construction continues.
201 if (res < 0) {
202 LOG(error) << "Unable to initialise subscription";
203 }
204
// Whenever a new transition is requested, wake up the loop via the async handle.
206 SubscribeToNewTransition("dpl", [wakeHandle = mAwakeHandle](fair::mq::Transition t) {
207 int res = uv_async_send(wakeHandle);
208 if (res < 0) {
209 LOG(error) << "Unable to notify subscription";
210 }
211 LOG(debug) << "State transition requested";
212 });
213}
214
215// Callback to execute the processing. Receives and relays data (doPrepare)
216// happens on the main thread before this is queued, so we only dispatch here.
// libuv work callback. `handle->data` is expected to point to the
// TaskStreamInfo of the stream to run; the service registry reference is
// salted with the stream index + 1.
// NOTE(review): the statement between the START and END signposts (the actual
// processing call) is not visible in this listing.
217void run_callback(uv_work_t* handle)
218{
219 auto* task = (TaskStreamInfo*)handle->data;
220 auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
221 // We create a new signpost interval for this specific data processor. Same id, same data processor.
222 auto& dataProcessorContext = ref.get<DataProcessorContext>();
223 O2_SIGNPOST_ID_FROM_POINTER(sid, device, &dataProcessorContext);
224 O2_SIGNPOST_START(device, sid, "run_callback", "Starting run callback on stream %d", task->id.index);
226 O2_SIGNPOST_END(device, sid, "run_callback", "Done processing data for stream %d", task->id.index);
227}
228
229// Once the processing in a thread is done, this is executed on the main thread.
230void run_completion(uv_work_t* handle, int status)
231{
232 auto* task = (TaskStreamInfo*)handle->data;
233 // Notice that the completion, while running on the main thread, still
234 // has a salt which is associated to the actual stream which was doing the computation
235 auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
236 auto& state = ref.get<DeviceState>();
237 auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
238
239 using o2::monitoring::Metric;
240 using o2::monitoring::Monitoring;
241 using o2::monitoring::tags::Key;
242 using o2::monitoring::tags::Value;
243
244 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats&)> reportConsumedOffer = [ref](ComputingQuotaOffer const& accumulatedConsumed, ComputingQuotaStats& stats) {
245 auto& dpStats = ref.get<DataProcessingStats>();
246 stats.totalConsumedBytes += accumulatedConsumed.sharedMemory;
247 // For now we give back the offer if we did not use it completely.
248 // In principle we should try to run until the offer is fully consumed.
249 stats.totalConsumedTimeslices += std::min<int64_t>(accumulatedConsumed.timeslices, 1);
250
251 dpStats.updateStats({static_cast<short>(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedBytes});
252 dpStats.updateStats({static_cast<short>(ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedTimeslices});
253 dpStats.processCommandQueue();
254 assert(stats.totalConsumedBytes == dpStats.metrics[(short)ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED]);
255 assert(stats.totalConsumedTimeslices == dpStats.metrics[(short)ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED]);
256 };
257
258 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const&)> reportExpiredOffer = [ref](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
259 auto& dpStats = ref.get<DataProcessingStats>();
260 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
261 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
262 dpStats.updateStats({static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredTimeslices});
263 dpStats.processCommandQueue();
264 };
265
266 for (auto& consumer : state.offerConsumers) {
267 quotaEvaluator.consume(task->id.index, consumer, reportConsumedOffer);
268 }
269 state.offerConsumers.clear();
270 quotaEvaluator.handleExpired(reportExpiredOffer);
271 quotaEvaluator.dispose(task->id.index);
272 task->running = false;
273}
274
275// Context for polling
// Members of the polling context attached to each uv_poll_t via its `data`
// field. NOTE(review): the struct header and a few members (e.g. the ones
// referred to elsewhere as `device`, `channelInfo` and `pollerState`) are on
// lines which are not visible in this listing.
277 enum struct PollerState : char { Stopped,
279 Connected,
280 Suspended };
// Channel name, used in the diagnostics emitted by the poll callbacks.
281 char const* name = nullptr;
282 uv_loop_t* loop = nullptr;
284 DeviceState* state = nullptr;
285 fair::mq::Socket* socket = nullptr;
// File descriptor being polled.
287 int fd = -1;
// true for contexts created for input channels, false for output channels.
288 bool read = true;
290};
291
292void on_socket_polled(uv_poll_t* poller, int status, int events)
293{
294 auto* context = (PollerContext*)poller->data;
295 assert(context);
296 O2_SIGNPOST_ID_FROM_POINTER(sid, sockets, poller);
297 context->state->loopReason |= DeviceState::DATA_SOCKET_POLLED;
298 switch (events) {
299 case UV_READABLE: {
300 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
301 context->state->loopReason |= DeviceState::DATA_INCOMING;
302 } break;
303 case UV_WRITABLE: {
304 O2_SIGNPOST_END(sockets, sid, "socket_state", "Socket connected for channel %{public}s", context->name);
305 if (context->read) {
306 O2_SIGNPOST_START(sockets, sid, "socket_state", "Socket connected for read in context %{public}s", context->name);
307 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
308 context->state->loopReason |= DeviceState::DATA_CONNECTED;
309 } else {
310 O2_SIGNPOST_START(sockets, sid, "socket_state", "Socket connected for write for channel %{public}s", context->name);
311 context->state->loopReason |= DeviceState::DATA_OUTGOING;
312 // If the socket is writable, fairmq will handle the rest, so we can stop polling and
313 // just wait for the disconnect.
314 uv_poll_start(poller, UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
315 }
316 context->pollerState = PollerContext::PollerState::Connected;
317 } break;
318 case UV_DISCONNECT: {
319 O2_SIGNPOST_END(sockets, sid, "socket_state", "Socket disconnected in context %{public}s", context->name);
320 } break;
321 case UV_PRIORITIZED: {
322 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Socket prioritized for context %{public}s", context->name);
323 } break;
324 }
325 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
326}
327
328void on_out_of_band_polled(uv_poll_t* poller, int status, int events)
329{
330 O2_SIGNPOST_ID_FROM_POINTER(sid, sockets, poller);
331 auto* context = (PollerContext*)poller->data;
332 context->state->loopReason |= DeviceState::OOB_ACTIVITY;
333 if (status < 0) {
334 LOGP(fatal, "Error while polling {}: {}", context->name, status);
335 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
336 }
337 switch (events) {
338 case UV_READABLE: {
339 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
340 context->state->loopReason |= DeviceState::DATA_INCOMING;
341 assert(context->channelInfo);
342 context->channelInfo->readPolled = true;
343 } break;
344 case UV_WRITABLE: {
345 O2_SIGNPOST_END(sockets, sid, "socket_state", "OOB socket connected for channel %{public}s", context->name);
346 if (context->read) {
347 O2_SIGNPOST_START(sockets, sid, "socket_state", "OOB socket connected for read in context %{public}s", context->name);
348 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_out_of_band_polled);
349 } else {
350 O2_SIGNPOST_START(sockets, sid, "socket_state", "OOB socket connected for write for channel %{public}s", context->name);
351 context->state->loopReason |= DeviceState::DATA_OUTGOING;
352 }
353 } break;
354 case UV_DISCONNECT: {
355 O2_SIGNPOST_END(sockets, sid, "socket_state", "OOB socket disconnected in context %{public}s", context->name);
356 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
357 } break;
358 case UV_PRIORITIZED: {
359 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "OOB socket prioritized for context %{public}s", context->name);
360 } break;
361 }
362 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
363}
364
// Body of the device initialisation step (the signature line is not visible
// in this listing). It copies the algorithm callbacks of the DeviceSpec into
// the DataProcessorContext, builds the configuration store, installs the init
// error handler, runs the user provided init callback, prepares the input
// channel infos and finally binds the stream services.
373{
374 auto ref = ServiceRegistryRef{mServiceRegistry};
375 auto& context = ref.get<DataProcessorContext>();
376 auto& spec = getRunningDevice(mRunningDevice, ref);
377
378 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
379 O2_SIGNPOST_START(device, cid, "Init", "Entering Init callback.");
// The stateful process callback is reset here and possibly set again below
// from the return value of the init callback.
380 context.statelessProcess = spec.algorithm.onProcess;
381 context.statefulProcess = nullptr;
382 context.error = spec.algorithm.onError;
383 context.initError = spec.algorithm.onInitError;
384
// When the helper provides no configuration store, fall back to one which
// retrieves the options from the FairMQ configuration (GetConfig()).
385 auto configStore = DeviceConfigurationHelpers::getConfiguration(mServiceRegistry, spec.name.c_str(), spec.options);
386 if (configStore == nullptr) {
387 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
388 retrievers.emplace_back(std::make_unique<FairOptionsRetriever>(GetConfig()));
389 configStore = std::make_unique<ConfigParamStore>(spec.options, std::move(retrievers));
390 configStore->preload();
391 configStore->activate();
392 }
393
394 using boost::property_tree::ptree;
395
// Tell the driver about every configuration entry, formatted as
// "[CONFIG] <key>=<value> 1 <provenance>". Entries with children are
// serialised as JSON (non pretty-printed), leaves use their plain value.
397 for (auto& entry : configStore->store()) {
398 std::stringstream ss;
399 std::string str;
400 if (entry.second.empty() == false) {
401 boost::property_tree::json_parser::write_json(ss, entry.second, false);
402 str = ss.str();
403 } else {
404 str = entry.second.get_value<std::string>();
405 }
// NOTE(review): the trailing .c_str() makes configString a copy of the
// formatted temporary rather than the temporary itself.
406 std::string configString = fmt::format("[CONFIG] {}={} 1 {}", entry.first, str, configStore->provenance(entry.first.c_str())).c_str();
407 mServiceRegistry.get<DriverClient>(ServiceRegistry::globalDeviceSalt()).tell(configString.c_str());
408 }
409
// From here on the store is owned by the configuration registry.
410 mConfigRegistry = std::make_unique<ConfigParamRegistry>(std::move(configStore));
411
412 // Setup the error handlers for init
// With a user provided onInitError the handler reports the error, dumps the
// backtrace to stderr and invokes the user callback; otherwise it reports,
// dumps the backtrace and exits with status 1.
// NOTE(review): a few lines of both handlers (among which the declaration of
// `ref`) are not visible in this listing.
413 if (context.initError) {
// Both captures are references to members which have to outlive the handler.
414 context.initErrorHandling = [&errorCallback = context.initError,
415 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
419 auto& context = ref.get<DataProcessorContext>();
420 auto& err = error_from_ref(e);
421 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
422 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Invoking errorCallback.", err.what);
423 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
424 auto& stats = ref.get<DataProcessingStats>();
426 InitErrorContext errorContext{ref, e};
427 errorCallback(errorContext);
428 };
429 } else {
430 context.initErrorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
431 auto& err = error_from_ref(e);
435 auto& context = ref.get<DataProcessorContext>();
436 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
437 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Exiting with 1.", err.what);
438 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
439 auto& stats = ref.get<DataProcessingStats>();
441 exit(1);
442 };
443 }
444
445 context.expirationHandlers.clear();
446 context.init = spec.algorithm.onInit;
447 if (context.init) {
// noCatch is evaluated only once (function-local static): it is true when
// O2_NO_CATCHALL_EXCEPTIONS is set to anything different from "0".
448 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
449 InitContext initContext{*mConfigRegistry, mServiceRegistry};
450
// The value returned by the init callback becomes the stateful process callback.
// NOTE(review): the catch clauses of the two try blocks below are partly
// missing from this listing.
451 if (noCatch) {
452 try {
453 context.statefulProcess = context.init(initContext);
455 if (context.initErrorHandling) {
456 (context.initErrorHandling)(e);
457 }
458 }
459 } else {
460 try {
461 context.statefulProcess = context.init(initContext);
462 } catch (std::exception& ex) {
466 auto e = runtime_error(ex.what());
467 (context.initErrorHandling)(e);
469 (context.initErrorHandling)(e);
470 }
471 }
472 }
// One InputChannelInfo per input channel of the spec.
473 auto& state = ref.get<DeviceState>();
474 state.inputChannelInfos.resize(spec.inputChannels.size());
// Channels whose name starts with <channelPrefix>from_internal-dpl-clock are
// put in the Pull state and get an INVALID id, all the others get the current
// value of validChannelId. The counter is incremented in both cases.
478 int validChannelId = 0;
479 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
480 auto& name = spec.inputChannels[ci].name;
481 if (name.find(spec.channelPrefix + "from_internal-dpl-clock") == 0) {
482 state.inputChannelInfos[ci].state = InputChannelState::Pull;
483 state.inputChannelInfos[ci].id = {ChannelIndex::INVALID};
484 validChannelId++;
485 } else {
486 state.inputChannelInfos[ci].id = {validChannelId++};
487 }
488 }
489
490 // Invoke the callback policy for this device.
491 if (spec.callbacksPolicy.policy != nullptr) {
492 InitContext initContext{*mConfigRegistry, mServiceRegistry};
493 spec.callbacksPolicy.policy(mServiceRegistry.get<CallbackService>(ServiceRegistry::globalDeviceSalt()), initContext);
494 }
495
496 // Services which are stream should be initialised now
497 auto* options = GetConfig();
// NOTE(review): `streamSalt` is defined on a line which is not visible in this listing.
498 for (size_t si = 0; si < mStreams.size(); ++si) {
500 mServiceRegistry.lateBindStreamServices(state, *options, streamSalt);
501 }
502 O2_SIGNPOST_END(device, cid, "Init", "Exiting Init callback.");
503}
504
// libuv signal callback. `handle->data` is expected to hold a pointer to the
// ServiceRegistry; a null pointer makes the signal a no-op.
// If any valid offer already carries shared memory nothing is done, otherwise
// the first invalid offer is turned into a valid one carrying only shared
// memory.
// NOTE(review): two statements (after the lookup of `stats` and right before
// the final signpost) are not visible in this listing.
505void on_signal_callback(uv_signal_t* handle, int signum)
506{
507 O2_SIGNPOST_ID_FROM_POINTER(sid, device, handle);
508 O2_SIGNPOST_START(device, sid, "signal_state", "Signal %d received.", signum);
509
510 auto* registry = (ServiceRegistry*)handle->data;
511 if (!registry) {
512 O2_SIGNPOST_END(device, sid, "signal_state", "No registry active. Ignoring signal.");
513 return;
514 }
515 ServiceRegistryRef ref{*registry};
516 auto& state = ref.get<DeviceState>();
517 auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
518 auto& stats = ref.get<DataProcessingStats>();
// First pass: bail out as soon as one valid offer with shared memory is found.
520 size_t ri = 0;
521 while (ri != quotaEvaluator.mOffers.size()) {
522 auto& offer = quotaEvaluator.mOffers[ri];
523 // We were already offered some sharedMemory, so we
524 // do not consider the offer.
525 // FIXME: in principle this should account for memory
526 // available and being offered, however we
527 // want to get out of the woods for now.
528 if (offer.valid && offer.sharedMemory != 0) {
529 O2_SIGNPOST_END(device, sid, "signal_state", "Memory already offered.");
530 return;
531 }
532 ri++;
533 }
534 // Find the first empty offer and have 1GB of shared memory there
// If every offer is valid nothing is changed.
535 for (auto& offer : quotaEvaluator.mOffers) {
536 if (offer.valid == false) {
537 offer.cpu = 0;
538 offer.memory = 0;
539 offer.sharedMemory = 1000000000;
540 offer.valid = true;
541 offer.user = -1;
542 break;
543 }
544 }
546 O2_SIGNPOST_END(device, sid, "signal_state", "Done processing signals.");
547}
548
// User payload attached to the AsyncTask which notifies the oldest possible
// timeslice. NOTE(review): the member declarations are not visible in this
// listing; the rest of the file accesses a `ref` and an `oldestTimeslice` member.
549struct DecongestionContext {
552};
553
554auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
555 auto& oldestTimeslice = task.user<DecongestionContext>().oldestTimeslice;
556 auto& ref = task.user<DecongestionContext>().ref;
557
558 auto& decongestion = ref.get<DecongestionService>();
559 auto& proxy = ref.get<FairMQDeviceProxy>();
560 if (oldestTimeslice.timeslice.value <= decongestion.lastTimeslice) {
561 LOG(debug) << "Not sending already sent oldest possible timeslice " << oldestTimeslice.timeslice.value;
562 return;
563 }
564 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
565 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
566 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
567 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
568 // TODO: this we could cache in the proxy at the bind moment.
569 if (info.channelType != ChannelAccountingType::DPL) {
570 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Skipping channel %{public}s because it's not a DPL channel",
571 info.name.c_str());
572
573 continue;
574 }
575 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestTimeslice.timeslice.value)) {
576 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Forwarding to channel %{public}s oldest possible timeslice %zu, prio 20",
577 info.name.c_str(), oldestTimeslice.timeslice.value);
578 }
579 }
580};
581
582// This is how we do the forwarding, i.e. we push
583// the inputs which are shared between this device and others
584// to the next one in the daisy chain.
585// FIXME: do it in a smarter way than O(N^2)
586static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& currentSetOfInputs,
587 TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
588 auto& proxy = registry.get<FairMQDeviceProxy>();
589
590 O2_SIGNPOST_ID_GENERATE(sid, forwarding);
591 O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
592 slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
593 auto forwardedParts = DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copy, consume);
594
595 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
596 if (forwardedParts[fi].Size() == 0) {
597 continue;
598 }
599 ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi});
600 auto& parts = forwardedParts[fi];
601 if (info.policy == nullptr) {
602 O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi);
603 continue;
604 }
605 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
606 info.policy->forward(parts, ChannelIndex{fi}, registry);
607 }
608
609 auto& asyncQueue = registry.get<AsyncQueue>();
610 auto& decongestion = registry.get<DecongestionService>();
611 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
612 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
613 AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate}
614 .user<DecongestionContext>({.ref = registry, .oldestTimeslice = oldestTimeslice}));
615 O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
616};
617
// Same signature as forwardInputs, but meant to only clean up the inputs of
// the slot: as stated below nothing is actually sent.
// NOTE(review): the statement in the body of the loop is not visible in this
// listing, only the creation of the span over each set of messages is.
618static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& currentSetOfInputs,
619 TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
620 auto& proxy = registry.get<FairMQDeviceProxy>();
621
622 O2_SIGNPOST_ID_GENERATE(sid, forwarding);
623 O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Cleaning up slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
624 slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
625 // Always copy them, because we do not want to actually send them.
626 // We merely need the side effect of the consume, if applicable.
627 for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
628 auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii]);
630 }
631
632 O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Cleaning done");
633};
634
635extern volatile int region_read_global_dummy_variable;
637
639void handleRegionCallbacks(ServiceRegistryRef registry, std::vector<fair::mq::RegionInfo>& infos)
640{
641 if (infos.empty() == false) {
642 std::vector<fair::mq::RegionInfo> toBeNotified;
643 toBeNotified.swap(infos); // avoid any MT issue.
644 static bool dummyRead = getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS") && atoi(getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS"));
645 for (auto const& info : toBeNotified) {
646 if (dummyRead) {
647 for (size_t i = 0; i < info.size / sizeof(region_read_global_dummy_variable); i += 4096 / sizeof(region_read_global_dummy_variable)) {
648 region_read_global_dummy_variable = ((int*)info.ptr)[i];
649 }
650 }
651 registry.get<CallbackService>().call<CallbackService::Id::RegionInfoCallback>(info);
652 }
653 }
654}
655
// File-local helpers.
// NOTE(review): the signature and one statement of the function below are not
// visible in this listing. It recovers the DeviceState from the handle's
// `data` field; presumably it is the on_idle_timer callback used by
// initPollers - confirm.
656namespace
657{
659{
660 auto* state = (DeviceState*)handle->data;
662}
663} // namespace
664
665void DataProcessingDevice::initPollers()
666{
667 auto ref = ServiceRegistryRef{mServiceRegistry};
668 auto& deviceContext = ref.get<DeviceContext>();
669 auto& context = ref.get<DataProcessorContext>();
670 auto& spec = ref.get<DeviceSpec const>();
671 auto& state = ref.get<DeviceState>();
672 // We add a timer only in case a channel poller is not there.
673 if ((context.statefulProcess != nullptr) || (context.statelessProcess != nullptr)) {
674 for (auto& [channelName, channel] : GetChannels()) {
675 InputChannelInfo* channelInfo;
676 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
677 auto& channelSpec = spec.inputChannels[ci];
678 channelInfo = &state.inputChannelInfos[ci];
679 if (channelSpec.name != channelName) {
680 continue;
681 }
682 channelInfo->channel = &this->GetChannel(channelName, 0);
683 break;
684 }
685 if ((channelName.rfind("from_internal-dpl", 0) == 0) &&
686 (channelName.rfind("from_internal-dpl-aod", 0) != 0) &&
687 (channelName.rfind("from_internal-dpl-ccdb-backend", 0) != 0) &&
688 (channelName.rfind("from_internal-dpl-injected", 0)) != 0) {
689 LOGP(detail, "{} is an internal channel. Skipping as no input will come from there.", channelName);
690 continue;
691 }
692 // We only watch receiving sockets.
693 if (channelName.rfind("from_" + spec.name + "_", 0) == 0) {
694 LOGP(detail, "{} is to send data. Not polling.", channelName);
695 continue;
696 }
697
698 if (channelName.rfind("from_", 0) != 0) {
699 LOGP(detail, "{} is not a DPL socket. Not polling.", channelName);
700 continue;
701 }
702
703 // We assume there is always a ZeroMQ socket behind.
704 int zmq_fd = 0;
705 size_t zmq_fd_len = sizeof(zmq_fd);
706 // FIXME: I should probably save those somewhere... ;-)
707 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
708 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
709 if (zmq_fd == 0) {
710 LOG(error) << "Cannot get file descriptor for channel." << channelName;
711 continue;
712 }
713 LOGP(detail, "Polling socket for {}", channelName);
714 auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
715 pCtx->name = strdup(channelName.c_str());
716 pCtx->loop = state.loop;
717 pCtx->device = this;
718 pCtx->state = &state;
719 pCtx->fd = zmq_fd;
720 assert(channelInfo != nullptr);
721 pCtx->channelInfo = channelInfo;
722 pCtx->socket = &channel[0].GetSocket();
723 pCtx->read = true;
724 poller->data = pCtx;
725 uv_poll_init(state.loop, poller, zmq_fd);
726 if (channelName.rfind("from_", 0) != 0) {
727 LOGP(detail, "{} is an out of band channel.", channelName);
728 state.activeOutOfBandPollers.push_back(poller);
729 } else {
730 channelInfo->pollerIndex = state.activeInputPollers.size();
731 state.activeInputPollers.push_back(poller);
732 }
733 }
734 // In case we do not have any input channel and we do not have
735 // any timers or signal watchers we still wake up whenever we can send data to downstream
736 // devices to allow for enumerations.
737 if (state.activeInputPollers.empty() &&
738 state.activeOutOfBandPollers.empty() &&
739 state.activeTimers.empty() &&
740 state.activeSignals.empty()) {
741 // FIXME: this is to make sure we do not reset the output timer
742 // for readout proxies or similar. In principle this should go once
743 // we move to OutOfBand InputSpec.
744 if (state.inputChannelInfos.empty()) {
745 LOGP(detail, "No input channels. Setting exit transition timeout to 0.");
746 deviceContext.exitTransitionTimeout = 0;
747 }
748 for (auto& [channelName, channel] : GetChannels()) {
749 if (channelName.rfind(spec.channelPrefix + "from_internal-dpl", 0) == 0) {
750 LOGP(detail, "{} is an internal channel. Not polling.", channelName);
751 continue;
752 }
753 if (channelName.rfind(spec.channelPrefix + "from_" + spec.name + "_", 0) == 0) {
754 LOGP(detail, "{} is an out of band channel. Not polling for output.", channelName);
755 continue;
756 }
757 // We assume there is always a ZeroMQ socket behind.
758 int zmq_fd = 0;
759 size_t zmq_fd_len = sizeof(zmq_fd);
760 // FIXME: I should probably save those somewhere... ;-)
761 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
762 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
763 if (zmq_fd == 0) {
764 LOGP(error, "Cannot get file descriptor for channel {}", channelName);
765 continue;
766 }
767 LOG(detail) << "Polling socket for " << channel[0].GetName();
768 // FIXME: leak
769 auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
770 pCtx->name = strdup(channelName.c_str());
771 pCtx->loop = state.loop;
772 pCtx->device = this;
773 pCtx->state = &state;
774 pCtx->fd = zmq_fd;
775 pCtx->read = false;
776 poller->data = pCtx;
777 uv_poll_init(state.loop, poller, zmq_fd);
778 state.activeOutputPollers.push_back(poller);
779 }
780 }
781 } else {
782 LOGP(detail, "This is a fake device so we exit after the first iteration.");
783 deviceContext.exitTransitionTimeout = 0;
784 // This is a fake device, so we can request to exit immediately
785 ServiceRegistryRef ref{mServiceRegistry};
786 ref.get<ControlService>().readyToQuit(QuitRequest::Me);
787 // A two second timer to stop internal devices which do not want to
788 auto* timer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
789 uv_timer_init(state.loop, timer);
790 timer->data = &state;
791 uv_update_time(state.loop);
792 uv_timer_start(timer, on_idle_timer, 2000, 2000);
793 state.activeTimers.push_back(timer);
794 }
795}
796
797void DataProcessingDevice::startPollers()
798{
799 auto ref = ServiceRegistryRef{mServiceRegistry};
800 auto& deviceContext = ref.get<DeviceContext>();
801 auto& state = ref.get<DeviceState>();
802
803 for (auto* poller : state.activeInputPollers) {
804 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
805 O2_SIGNPOST_START(device, sid, "socket_state", "Input socket waiting for connection.");
806 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
807 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
808 }
809 for (auto& poller : state.activeOutOfBandPollers) {
810 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
811 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
812 }
813 for (auto* poller : state.activeOutputPollers) {
814 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
815 O2_SIGNPOST_START(device, sid, "socket_state", "Output socket waiting for connection.");
816 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
817 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
818 }
819
820 deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
821 deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
822 uv_timer_init(state.loop, deviceContext.gracePeriodTimer);
823
824 deviceContext.dataProcessingGracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
825 deviceContext.dataProcessingGracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
826 uv_timer_init(state.loop, deviceContext.dataProcessingGracePeriodTimer);
827}
828
829void DataProcessingDevice::stopPollers()
830{
831 auto ref = ServiceRegistryRef{mServiceRegistry};
832 auto& deviceContext = ref.get<DeviceContext>();
833 auto& state = ref.get<DeviceState>();
834 LOGP(detail, "Stopping {} input pollers", state.activeInputPollers.size());
835 for (auto* poller : state.activeInputPollers) {
836 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
837 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
838 uv_poll_stop(poller);
839 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
840 }
841 LOGP(detail, "Stopping {} out of band pollers", state.activeOutOfBandPollers.size());
842 for (auto* poller : state.activeOutOfBandPollers) {
843 uv_poll_stop(poller);
844 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
845 }
846 LOGP(detail, "Stopping {} output pollers", state.activeOutOfBandPollers.size());
847 for (auto* poller : state.activeOutputPollers) {
848 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
849 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
850 uv_poll_stop(poller);
851 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
852 }
853
854 uv_timer_stop(deviceContext.gracePeriodTimer);
855 delete (ServiceRegistryRef*)deviceContext.gracePeriodTimer->data;
856 free(deviceContext.gracePeriodTimer);
857 deviceContext.gracePeriodTimer = nullptr;
858
859 uv_timer_stop(deviceContext.dataProcessingGracePeriodTimer);
860 delete (ServiceRegistryRef*)deviceContext.dataProcessingGracePeriodTimer->data;
861 free(deviceContext.dataProcessingGracePeriodTimer);
862 deviceContext.dataProcessingGracePeriodTimer = nullptr;
863}
864
// NOTE(review): the signature line of this function is not part of this
// listing; the "InitTask" signpost labels suggest it is presumably
// DataProcessingDevice::InitTask() - confirm against the original file.
//
// What the visible body does, in order:
//  - creates one ExpirationHandler per distinct input route that has a
//    configurator;
//  - allocates (once) the uv_async_t used to wake up the main loop;
//  - reads three integer options from fConfig into the DeviceContext;
//  - subscribes to region events on the first sub-channel of every channel;
//  - installs (once) a SIGUSR1 handler on the loop;
//  - calls initPollers() and fillContext();
//  - runs the loop until no region event is pending or expected anymore.
866{
867 auto ref = ServiceRegistryRef{mServiceRegistry};
868 auto& deviceContext = ref.get<DeviceContext>();
869 auto& context = ref.get<DataProcessorContext>();
870
871 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
872 O2_SIGNPOST_START(device, cid, "InitTask", "Entering InitTask callback.");
873 auto& spec = getRunningDevice(mRunningDevice, mServiceRegistry);
874 auto distinct = DataRelayerHelpers::createDistinctRouteIndex(spec.inputs);
875 auto& state = ref.get<DeviceState>();
// `i` is the position of the current entry inside `distinct`; it is advanced
// for every entry, whether or not a handler is created for it.
876 int i = 0;
877 for (auto& di : distinct) {
878 auto& route = spec.inputs[di];
// Routes without a configurator get no expiration handler.
879 if (route.configurator.has_value() == false) {
880 i++;
881 continue;
882 }
883 ExpirationHandler handler{
884 .name = route.configurator->name,
885 .routeIndex = RouteIndex{i++},
886 .lifetime = route.matcher.lifetime,
887 .creator = route.configurator->creatorConfigurator(state, mServiceRegistry, *mConfigRegistry),
888 .checker = route.configurator->danglingConfigurator(state, *mConfigRegistry),
889 .handler = route.configurator->expirationConfigurator(state, *mConfigRegistry)};
890 context.expirationHandlers.emplace_back(std::move(handler));
891 }
892
// The async handle is created only when it does not exist yet, so a repeated
// invocation of this function reuses the previous one.
893 if (state.awakeMainThread == nullptr) {
894 state.awakeMainThread = (uv_async_t*)malloc(sizeof(uv_async_t));
895 state.awakeMainThread->data = &state;
896 uv_async_init(state.loop, state.awakeMainThread, on_awake_main_thread);
897 }
898
// The options are retrieved as strings and converted with std::stoi, which
// throws if the text is not a valid integer.
899 deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue<std::string>("expected-region-callbacks"));
900 deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue<std::string>("exit-transition-timeout"));
901 deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue<std::string>("data-processing-timeout"));
902
// Register a region-event callback on the transport of the first sub-channel
// (`at(0)`) of each channel. The callback captures its state by reference,
// queues the event under the mutex and wakes the main loop up.
903 for (auto& channel : GetChannels()) {
904 channel.second.at(0).Transport()->SubscribeToRegionEvents([&context = deviceContext,
905 &registry = mServiceRegistry,
906 &pendingRegionInfos = mPendingRegionInfos,
907 &regionInfoMutex = mRegionInfoMutex](fair::mq::RegionInfo info) {
908 std::lock_guard<std::mutex> lock(regionInfoMutex);
909 LOG(detail) << ">>> Region info event" << info.event;
910 LOG(detail) << "id: " << info.id;
911 LOG(detail) << "ptr: " << info.ptr;
912 LOG(detail) << "size: " << info.size;
913 LOG(detail) << "flags: " << info.flags;
914 // The mutex is held here, so queueing the event and updating the
915 // counter of expected callbacks happen as one atomic step.
916 pendingRegionInfos.push_back(info);
917 context.expectedRegionCallbacks -= 1;
918 // We always want to handle these on the main loop,
919 // so we awake it.
920 ServiceRegistryRef ref{registry};
921 uv_async_send(ref.get<DeviceState>().awakeMainThread);
922 });
923 }
924
925 // Add a signal manager for SIGUSR1 so that we can force
926 // an event from the outside, making sure that the event loop can
927 // be unblocked (e.g. by a quitting DPL driver) even when there
928 // is no data pending to be processed.
929 if (deviceContext.sigusr1Handle == nullptr) {
930 deviceContext.sigusr1Handle = (uv_signal_t*)malloc(sizeof(uv_signal_t));
931 deviceContext.sigusr1Handle->data = &mServiceRegistry;
932 uv_signal_init(state.loop, deviceContext.sigusr1Handle);
933 uv_signal_start(deviceContext.sigusr1Handle, on_signal_callback, SIGUSR1);
934 }
935 // If there is any signal, we want to make sure they are active
936 for (auto& handle : state.activeSignals) {
937 handle->data = &state;
938 }
939 // When we start, we must make sure that we do listen to the signal.
// This also refreshes `data` when the handle was created by an earlier call.
940 deviceContext.sigusr1Handle->data = &mServiceRegistry;
941
943 DataProcessingDevice::initPollers();
944
945 // Whenever we InitTask, we consider as if the previous iteration
946 // was successful, so that even if there is no timer or receiving
947 // channel, we can still start an enumeration.
// The swap only succeeds when no data processor is recorded as active
// (nullptr); in that case the marker value (DataProcessorContext*)-1 is stored.
948 DataProcessorContext* initialContext = nullptr;
949 bool idle = state.lastActiveDataProcessor.compare_exchange_strong(initialContext, (DataProcessorContext*)-1);
950 if (!idle) {
// NOTE(review): the stored pointer is dereferenced here; if it still holds
// the (DataProcessorContext*)-1 marker written by a previous call, this
// would read through an invalid pointer - confirm it cannot happen.
951 LOG(error) << "DataProcessor " << state.lastActiveDataProcessor.load()->spec->name << " was unexpectedly active";
952 }
953
954 // We should be ready to run here. Therefore we copy all the
955 // required parts in the DataProcessorContext. Eventually we should
956 // do so on a per thread basis, with fine grained locks.
957 // FIXME: this should not use ServiceRegistry::threadSalt, but
958 // more a ServiceRegistry::globalDataProcessorSalt(N) where
959 // N is the number of the multiplexed data processor.
960 // We will get there.
961 this->fillContext(mServiceRegistry.get<DataProcessorContext>(ServiceRegistry::globalDeviceSalt()), deviceContext);
962
963 O2_SIGNPOST_END(device, cid, "InitTask", "Exiting InitTask callback waiting for the remaining region callbacks.");
964
// True while there are queued region events or callbacks still expected.
// The check is done under the same mutex used by the region-event callback.
965 auto hasPendingEvents = [&mutex = mRegionInfoMutex, &pendingRegionInfos = mPendingRegionInfos](DeviceContext& deviceContext) {
966 std::lock_guard<std::mutex> lock(mutex);
967 return (pendingRegionInfos.empty() == false) || deviceContext.expectedRegionCallbacks > 0;
968 };
969 O2_SIGNPOST_START(device, cid, "InitTask", "Waiting for registation events.");
974 while (hasPendingEvents(deviceContext)) {
975 // Wait for the callback to signal it is done, so that we do not busy wait.
976 uv_run(state.loop, UV_RUN_ONCE);
977 // Handle callbacks if any
978 {
979 O2_SIGNPOST_EVENT_EMIT(device, cid, "InitTask", "Memory registration event received.");
980 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
981 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
982 }
983 }
984 O2_SIGNPOST_END(device, cid, "InitTask", "Done waiting for registration events.");
985}
986
// NOTE(review): the signature line of this function is not part of this
// listing. The body uses `context` and `deviceContext`, and the visible call
// `this->fillContext(<DataProcessorContext>, deviceContext)` suggests this is
// presumably DataProcessingDevice::fillContext - confirm against the file.
//
// The visible body fills the given context with:
//  - the `isSink` and `balancingInputs` flags;
//  - a pointer to the service registry;
//  - the `errorHandling` callback;
//  - the `forwardPolicy`.
// Several lines of the original are absent from this listing (declarations of
// `ref` inside the lambdas, `case` labels, one `if` header); the hedged notes
// below mark the places where this is visible.
988{
989 context.isSink = false;
990 // If nothing is a sink, the rate limiting simply does not trigger.
// The option is a string; any non-zero integer value enables rate limiting.
991 bool enableRateLimiting = std::stoi(fConfig->GetValue<std::string>("timeframes-rate-limit"));
992
993 auto ref = ServiceRegistryRef{mServiceRegistry};
994 auto& spec = ref.get<DeviceSpec const>();
995
996 // The policy is now allowed to state the default.
997 context.balancingInputs = spec.completionPolicy.balanceChannels;
998 // This is needed because the internal injected dummy sink should not
999 // try to balance inputs unless the rate limiting is requested.
1000 if (enableRateLimiting == false && spec.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) {
1001 context.balancingInputs = false;
1002 }
// With rate limiting on, the device is flagged as a sink as soon as one of
// its outputs is bound to "dpl-summary".
1003 if (enableRateLimiting) {
// Note: the loop variable shadows the outer `spec` (the DeviceSpec).
1004 for (auto& spec : spec.outputs) {
1005 if (spec.matcher.binding.value == "dpl-summary") {
1006 context.isSink = true;
1007 break;
1008 }
1009 }
1010 }
1011
1012 context.registry = &mServiceRegistry;
// Two flavours of error handling: when the context carries a user error
// callback it is invoked, otherwise the processing policy decides what to do.
1015 if (context.error != nullptr) {
// `context.error` and the registry are captured by reference.
1016 context.errorHandling = [&errorCallback = context.error,
1017 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
// NOTE(review): `ref` is used below but its declaration is not visible in
// this listing; presumably it is built from `serviceRegistry` - confirm.
1021 auto& err = error_from_ref(e);
1022 auto& context = ref.get<DataProcessorContext>();
1023 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1024 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Invoking callback.", err.what);
// Dump the demangled backtrace on stderr.
1025 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1026 auto& stats = ref.get<DataProcessingStats>();
1028 ErrorContext errorContext{record, ref, e};
1029 errorCallback(errorContext);
1030 };
1031 } else {
1032 context.errorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1033 auto& err = error_from_ref(e);
// NOTE(review): as above, the declaration of `ref` is not visible here.
1037 auto& context = ref.get<DataProcessorContext>();
1038 auto& deviceContext = ref.get<DeviceContext>();
1039 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1040 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1041 auto& stats = ref.get<DataProcessingStats>();
// NOTE(review): the `case` label guarding the rethrow is not visible in this
// listing; every other value of the policy only reports the error.
1043 switch (deviceContext.processingPolicies.error) {
1045 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Rethrowing.", err.what);
1046 throw e;
1047 default:
1048 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Skipping to next timeframe.", err.what);
1049 break;
1050 }
1051 };
1052 }
1053
// Computes the ForwardPolicy for this device. The lambda is invoked exactly
// once, right after its definition.
1054 auto decideEarlyForward = [&context, &deviceContext, &spec, this]() -> ForwardPolicy {
// The default depends on whether DPL_OLD_EARLY_FORWARD is set in the environment.
1055 ForwardPolicy defaultEarlyForwardPolicy = getenv("DPL_OLD_EARLY_FORWARD") ? ForwardPolicy::AtCompletionPolicySatisified : ForwardPolicy::AtInjection;
1056 // FIXME: try again with the new policy by default.
1057 //
1058 // Make the new policy optional until we handle some of the corner cases
1059 // with custom policies which expect the early forward to happen only when
1060 // all the data is available, like in the TPC case.
1061 // ForwardPolicy defaultEarlyForwardPolicy = getenv("DPL_NEW_EARLY_FORWARD") ? ForwardPolicy::AtInjection : ForwardPolicy::AtCompletionPolicySatisified;
// Any forward matching one of these TPC data types switches the default to
// forwarding only once the completion policy is satisfied.
1062 for (auto& forward : spec.forwards) {
1063 if (DataSpecUtils::match(forward.matcher, ConcreteDataTypeMatcher{"TPC", "DIGITSMCTR"}) ||
1064 DataSpecUtils::match(forward.matcher, ConcreteDataTypeMatcher{"TPC", "CLNATIVEMCLBL"}) ||
1065 DataSpecUtils::match(forward.matcher, ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "DIGITS"}) ||
1066 DataSpecUtils::match(forward.matcher, ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "CLUSTERNATIVE"})) {
1067 defaultEarlyForwardPolicy = ForwardPolicy::AtCompletionPolicySatisified;
1068 break;
1069 }
1070 }
1071 // Output proxies should wait for the completion policy before forwarding.
1072 // Because they actually do not do anything, that's equivalent to
1073 // forwarding after the processing.
1074 for (auto& label : spec.labels) {
1075 if (label.value == "output-proxy") {
1076 defaultEarlyForwardPolicy = ForwardPolicy::AfterProcessing;
1077 break;
1078 }
1079 }
1080
1083 ForwardPolicy forwardPolicy = defaultEarlyForwardPolicy;
1084 if (spec.forwards.empty() == false) {
// NOTE(review): the `case` labels of this switch are not visible in this
// listing; one branch selects AfterProcessing, the other two keep the default.
1085 switch (deviceContext.processingPolicies.earlyForward) {
1087 forwardPolicy = ForwardPolicy::AfterProcessing;
1088 break;
1090 forwardPolicy = defaultEarlyForwardPolicy;
1091 break;
1093 forwardPolicy = defaultEarlyForwardPolicy;
1094 break;
1095 }
1096 }
// Scan the forwards: track whether all of them are conditions and whether
// one of them forces forwarding after the processing.
1097 bool onlyConditions = true;
1098 bool overriddenEarlyForward = false;
1099 for (auto& forwarded : spec.forwards) {
1100 if (forwarded.matcher.lifetime != Lifetime::Condition) {
1101 onlyConditions = false;
1102 }
// NOTE(review): the `if` header opening the block below is not visible in
// this listing; from the log message it presumably tests for RAWDATA.
1104 forwardPolicy = ForwardPolicy::AfterProcessing;
1105 overriddenEarlyForward = true;
1106 LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher);
1107 break;
1108 }
1109 if (forwarded.matcher.lifetime == Lifetime::Optional) {
1110 forwardPolicy = ForwardPolicy::AfterProcessing;
1111 overriddenEarlyForward = true;
1112 LOG(detail) << "Cannot forward early because of Optional input: " << DataSpecUtils::describe(forwarded.matcher);
1113 break;
1114 }
1115 }
// Without any override, a device forwarding only conditions falls back to
// the default policy. This also holds when there is nothing to forward,
// since `onlyConditions` starts as true.
1116 if (!overriddenEarlyForward && onlyConditions) {
1117 forwardPolicy = defaultEarlyForwardPolicy;
1118 LOG(detail) << "Enabling early forwarding because only conditions to be forwarded";
1119 }
1120 return forwardPolicy;
1121 };
1122 context.forwardPolicy = decideEarlyForward();
1123}
1124
// NOTE(review): the signature line of this function is not part of this
// listing; the "PreRun" signpost labels suggest it is presumably
// DataProcessingDevice::PreRun() - confirm against the original file.
//
// The visible body resets the run-related fields of the DeviceState, invokes
// the pre-start callbacks (device wide, then one per stream), fires the Start
// callback, starts the pollers and finally publishes device_state = 1.
1126{
1127 auto ref = ServiceRegistryRef{mServiceRegistry};
1128 auto& state = ref.get<DeviceState>();
1129
1130 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.loop);
1131 O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
1132 state.quitRequested = false;
1134 state.allowedProcessing = DeviceState::Any;
// Every input channel which is not in the Pull state is set to Running.
1135 for (auto& info : state.inputChannelInfos) {
1136 if (info.state != InputChannelState::Pull) {
1137 info.state = InputChannelState::Running;
1138 }
1139 }
1140
1141 // Catch callbacks which fail before we start.
1142 // Notice that when running multiple dataprocessors
1143 // we should probably allow expendable ones to fail.
// All handlers below close the "PreRun" signpost interval and rethrow.
1144 try {
1145 auto& dpContext = ref.get<DataProcessorContext>();
1146 dpContext.preStartCallbacks(ref);
// Stream services are looked up with globalStreamSalt(i + 1), i.e. the salt
// index is one-based with respect to the position in mStreams.
1147 for (size_t i = 0; i < mStreams.size(); ++i) {
1148 auto streamRef = ServiceRegistryRef{mServiceRegistry, ServiceRegistry::globalStreamSalt(i + 1)};
1149 auto& context = streamRef.get<StreamContext>();
1150 context.preStartStreamCallbacks(streamRef);
1151 }
1152 } catch (std::exception& e) {
1153 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type std::exception caught in PreRun: %{public}s. Rethrowing.", e.what());
1154 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1155 throw;
1156 } catch (o2::framework::RuntimeErrorRef& e) {
1157 auto& err = error_from_ref(e);
1158 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type o2::framework::RuntimeErrorRef caught in PreRun: %{public}s. Rethrowing.", err.what);
1159 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1160 throw;
1161 } catch (...) {
1162 O2_SIGNPOST_END(device, cid, "PreRun", "Unknown exception being thrown. Rethrowing.");
1163 throw;
1164 }
1165
1166 ref.get<CallbackService>().call<CallbackService::Id::Start>();
1167 startPollers();
1168
1169 // Raise to 1 when we are ready to start processing
1170 using o2::monitoring::Metric;
1171 using o2::monitoring::Monitoring;
1172 using o2::monitoring::tags::Key;
1173 using o2::monitoring::tags::Value;
1174
1175 auto& monitoring = ref.get<Monitoring>();
1176 monitoring.send(Metric{(uint64_t)1, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1177 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun callback.");
1178}
1179
// NOTE(review): the signature line of this function is not part of this
// listing; since it undoes what the "PreRun" code does (device_state back to
// 0, pollers stopped, Stop callback) it is presumably
// DataProcessingDevice::PostRun() - confirm against the original file.
1181{
1182 ServiceRegistryRef ref{mServiceRegistry};
1183 // Lower the device_state metric to 0 now that processing is over
1184 using o2::monitoring::Metric;
1185 using o2::monitoring::Monitoring;
1186 using o2::monitoring::tags::Key;
1187 using o2::monitoring::tags::Value;
1188
1189 auto& monitoring = ref.get<Monitoring>();
1190 monitoring.send(Metric{(uint64_t)0, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1191
// Stop the pollers first, then notify the Stop callback and finally run the
// post-stop callbacks of the data processor.
1192 stopPollers();
1193 ref.get<CallbackService>().call<CallbackService::Id::Stop>();
1194 auto& dpContext = ref.get<DataProcessorContext>();
1195 dpContext.postStopCallbacks(ref);
1196}
1197
// NOTE(review): the signature line of this function is not part of this
// listing; it is presumably DataProcessingDevice::Reset() - confirm.
// The body only invokes the callbacks registered for CallbackService::Id::Reset.
1199{
1200 ServiceRegistryRef ref{mServiceRegistry};
1201 ref.get<CallbackService>().call<CallbackService::Id::Reset>();
1202}
1203
// NOTE(review): the signature line of this function is not part of this
// listing; the "run_loop" signposts suggest it is presumably
// DataProcessingDevice::Run() - confirm against the original file. A few
// interior lines are absent from the listing as well; the hedged notes below
// mark the places where this is visible.
//
// The visible body loops until state.transitionHandling becomes Expired.
// Each iteration:
//  - applies a pending state change and handles queued region callbacks;
//  - decides whether the libuv loop should block or only poll;
//  - prunes the relayer, runs the AsyncQueue and then runs the libuv loop;
//  - picks a stream which is not running and, if the ComputingQuotaEvaluator
//    grants resources, runs the computation (inline or on the thread pool).
// After the loop the parts of every input channel are cleared and the
// transition handling state is reset.
1205{
1206 ServiceRegistryRef ref{mServiceRegistry};
1207 auto& state = ref.get<DeviceState>();
1209 bool firstLoop = true;
1210 O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
1211 O2_SIGNPOST_START(device, lid, "device_state", "First iteration of the device loop");
1212
// When DPL_THREADPOOL_SIZE is defined the computation is queued with
// uv_queue_work (see below) and UV_THREADPOOL_SIZE is overwritten with "1".
1213 bool dplEnableMultithreding = getenv("DPL_THREADPOOL_SIZE") != nullptr;
1214 if (dplEnableMultithreding) {
1215 setenv("UV_THREADPOOL_SIZE", "1", 1);
1216 }
1217
1218 while (state.transitionHandling != TransitionHandlingState::Expired) {
// Apply one queued state change per iteration, taken from the back.
1219 if (state.nextFairMQState.empty() == false) {
1220 (void)this->ChangeState(state.nextFairMQState.back());
1221 state.nextFairMQState.pop_back();
1222 }
1223 // Notify on the main thread the new region callbacks, making sure
1224 // no callback is issued if there is something still processing.
1225 {
1226 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1227 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1228 }
1229 // This will block for the correct delay (or until we get data
1230 // on a socket). We also do not block on the first iteration
1231 // so that devices which do not have a timer can still start an
1232 // enumeration.
1233 {
1234 ServiceRegistryRef ref{mServiceRegistry};
1235 ref.get<DriverClient>().flushPending(mServiceRegistry);
1236 DataProcessorContext* lastActive = state.lastActiveDataProcessor.load();
1237 // Reset to zero unless some other DataProcessorContext completed in the meanwhile.
1238 // In such case we will take care of it at next iteration.
1239 state.lastActiveDataProcessor.compare_exchange_strong(lastActive, nullptr);
1240
// NOTE(review): the expression below ends with `||`; its last operand is on
// a line which is not visible in this listing.
1241 auto shouldNotWait = (lastActive != nullptr &&
1242 (state.streaming != StreamingState::Idle) && (state.activeSignals.empty())) ||
1244 if (firstLoop) {
1245 shouldNotWait = true;
1246 firstLoop = false;
1247 }
// NOTE(review): the body of this `if` is not visible in this listing.
1248 if (lastActive != nullptr) {
1250 }
1251 if (NewStatePending()) {
1252 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "New state pending. Waiting for it to be handled.");
1253 shouldNotWait = true;
1255 }
1257 // If we are Idle, we can then consider the transition to be expired.
1258 if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
1259 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
1260 state.transitionHandling = TransitionHandlingState::Expired;
1261 }
// Restore the console severity saved by a previous iteration, if any.
1262 if (state.severityStack.empty() == false) {
1263 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1264 state.severityStack.pop_back();
1265 }
1266 // for (auto &info : mDeviceContext.state->inputChannelInfos) {
1267 // shouldNotWait |= info.readPolled;
1268 // }
1269 state.loopReason = DeviceState::NO_REASON;
1270 state.firedTimers.clear();
// With TRACE_CALLBACKS requested, save the current console severity and
// switch to trace.
1271 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_CALLBACKS) != 0) {
1272 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1273 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1274 }
1275 // Run the asynchronous queue just before sleeping again, so that:
1276 // - we can trigger further events from the queue
1277 // - we can guarantee this is the last thing we do in the loop (
1278 // assuming no one else is adding to the queue before this point).
// Callback handed to prunePending(): forwards the messages of a dropped slot.
1279 auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1280 O2_SIGNPOST_START(device, lid, "run_loop", "Dropping message from slot %" PRIu64 ". Forwarding as needed.", (uint64_t)slot.index);
1281 ServiceRegistryRef ref{registry};
// The results of the three lookups below are discarded.
1282 ref.get<AsyncQueue>();
1283 ref.get<DecongestionService>();
1284 ref.get<DataRelayer>();
1285 // Get the current timeslice for the slot.
// NOTE(review): `variables` is not used in the lines visible here; the
// listing skips one line right after its declaration.
1286 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
1288 forwardInputs(registry, slot, dropped, oldestOutputInfo, false, true);
1289 };
1290 auto& relayer = ref.get<DataRelayer>();
1291 relayer.prunePending(onDrop);
1292 auto& queue = ref.get<AsyncQueue>();
1293 auto oldestPossibleTimeslice = relayer.getOldestPossibleOutput();
1294 AsyncQueueHelpers::run(queue, {oldestPossibleTimeslice.timeslice.value});
// The pre-loop callbacks run only when the loop is about to block.
1295 if (shouldNotWait == false) {
1296 auto& dpContext = ref.get<DataProcessorContext>();
1297 dpContext.preLoopCallbacks(ref);
1298 }
// NOTE(review): the "%{}s" specifier below differs from the "%{public}s"
// form used by the other signposts of this file - confirm it is intended.
1299 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. %{}s", shouldNotWait ? "Will immediately schedule a new one" : "Waiting for next event.");
// UV_RUN_NOWAIT only polls for ready events, UV_RUN_ONCE blocks until at
// least one of them is processed.
1300 uv_run(state.loop, shouldNotWait ? UV_RUN_NOWAIT : UV_RUN_ONCE);
1301 O2_SIGNPOST_START(device, lid, "run_loop", "Run loop started. Loop reason %d.", state.loopReason);
// Switch to trace severity when the loop reason matches the tracing flags,
// otherwise restore the previously saved severity, if any.
1302 if ((state.loopReason & state.tracingFlags) != 0) {
1303 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1304 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1305 } else if (state.severityStack.empty() == false) {
1306 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1307 state.severityStack.pop_back();
1308 }
1309 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Loop reason mask %x & %x = %x", state.loopReason, state.tracingFlags, state.loopReason & state.tracingFlags);
1310
1311 if ((state.loopReason & DeviceState::LoopReason::OOB_ACTIVITY) != 0) {
1312 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Out of band activity detected. Rescanning everything.");
1313 relayer.rescan();
1314 }
1315
1316 if (!state.pendingOffers.empty()) {
1317 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Pending %" PRIu64 " offers. updating the ComputingQuotaEvaluator.", (uint64_t)state.pendingOffers.size());
1318 ref.get<ComputingQuotaEvaluator>().updateOffers(state.pendingOffers, uv_now(state.loop));
1319 }
1320 }
1321
1322 // Notify on the main thread the new region callbacks, making sure
1323 // no callback is issued if there is something still processing.
1324 // Notice that we still need to perform callbacks also after
1325 // the socket epolled, because otherwise we would end up serving
1326 // the callback after the first data arrives if the system is too
1327 // fast to transition from Init to Run.
1328 {
1329 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1330 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1331 }
1332
1333 // Receive and relay incoming data on the main thread so that I/O
1334 // overlaps with computation running concurrently on work threads.
// NOTE(review): the statement this comment refers to is not visible in this
// listing.
1336
1337 assert(mStreams.size() == mHandles.size());
// Look for a stream which is not running. There is no `break`, so when
// several streams are free the one with the highest index is selected;
// -1 means that all of them are busy.
1339 TaskStreamRef streamRef{-1};
1340 for (size_t ti = 0; ti < mStreams.size(); ti++) {
1341 auto& taskInfo = mStreams[ti];
1342 if (taskInfo.running) {
1343 continue;
1344 }
1345 // Stream 0 is for when we run in
1346 streamRef.index = ti;
1347 }
1348 using o2::monitoring::Metric;
1349 using o2::monitoring::Monitoring;
1350 using o2::monitoring::tags::Key;
1351 using o2::monitoring::tags::Value;
1352 // We have an empty stream, let's check if we have enough
1353 // resources for it to run something
1354 if (streamRef.index != -1) {
1355 // Synchronous execution of the callbacks. This will be
1356 // moved in the on_socket_polled once we have threading in place.
1357 uv_work_t& handle = mHandles[streamRef.index];
1358 TaskStreamInfo& stream = mStreams[streamRef.index];
1359 handle.data = &mStreams[streamRef.index];
1360
// Function-local static: the callback is built on the first pass only and it
// keeps referring to the mServiceRegistry captured at that time.
1361 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const& stats)> reportExpiredOffer = [&registry = mServiceRegistry](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
1362 ServiceRegistryRef ref{registry};
1363 auto& dpStats = ref.get<DataProcessingStats>();
1364 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
1365 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
1366 dpStats.updateStats({static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredTimeslices});
1367 dpStats.processCommandQueue();
1368 };
1369 auto ref = ServiceRegistryRef{mServiceRegistry};
1370
1371 // Deciding whether to run or not can be done by passing a request to
1372 // the evaluator. In this case, the request is always satisfied and
1373 // we run on whatever resource is available.
1374 auto& spec = ref.get<DeviceSpec const>();
// `accumulated` is passed by pointer to selectOffer(); it is read below to
// report what is missing when the request is not satisfied.
1375 ComputingQuotaOffer accumulated;
1376 bool enough = ref.get<ComputingQuotaEvaluator>().selectOffer(streamRef.index, spec.resourcePolicy.request, uv_now(state.loop), &accumulated);
1377
// Scheduling bookkeeping; the single static instance is shared by all the
// invocations of this function.
1378 struct SchedulingStats {
1379 std::atomic<size_t> lastScheduled = 0;
1380 std::atomic<size_t> numberOfUnscheduledSinceLastScheduled = 0;
1381 std::atomic<size_t> numberOfUnscheduled = 0;
1382 std::atomic<size_t> numberOfScheduled = 0;
1383 std::atomic<size_t> nextWarnAt = 1;
1384 };
1385 static SchedulingStats schedulingStats;
1386 O2_SIGNPOST_ID_GENERATE(sid, scheduling);
1387 if (enough) {
1388 stream.id = streamRef;
1389 stream.running = true;
1390 stream.registry = &mServiceRegistry;
// A successful scheduling resets the skip counter and the warning threshold.
1391 schedulingStats.lastScheduled = uv_now(state.loop);
1392 schedulingStats.numberOfScheduled++;
1393 schedulingStats.numberOfUnscheduledSinceLastScheduled = 0;
1394 schedulingStats.nextWarnAt = 1;
1395 O2_SIGNPOST_EVENT_EMIT(scheduling, sid, "Run", "Enough resources to schedule computation on stream %d", streamRef.index);
// Either queue the work on the libuv thread pool or run the callback and
// its completion inline on this thread.
1396 if (dplEnableMultithreding) [[unlikely]] {
1397 stream.task = &handle;
1398 uv_queue_work(state.loop, stream.task, run_callback, run_completion);
1399 } else {
1400 run_callback(&handle);
1401 run_completion(&handle, 0);
1402 }
1403 } else {
// A value of 0 for lastScheduled means that nothing was ever scheduled.
1404 auto const lastSched = schedulingStats.lastScheduled.load();
1405 auto const schedInfo = lastSched ? fmt::format(", last scheduled {} ms ago", uv_now(state.loop) - lastSched) : std::string(", never successfully scheduled");
// Describes every resource for which the accumulated offer is below the
// minimum required by the policy; falls back to the policy name when no
// such resource is found.
1406 auto const buildMissingInfo = [&]() {
1407 auto const& required = spec.resourcePolicy.minRequired;
1408 std::string missingInfo;
1409 if (required.sharedMemory > 0 && accumulated.sharedMemory < required.sharedMemory) {
1410 missingInfo += fmt::format(" shared memory (have {} MB, need {} MB)", accumulated.sharedMemory / 1000000, required.sharedMemory / 1000000);
1411 }
1412 if (required.timeslices > 0 && accumulated.timeslices < required.timeslices) {
1413 missingInfo += fmt::format(" timeslices (have {}, need {})", accumulated.timeslices, required.timeslices);
1414 }
1415 if (required.cpu > 0 && accumulated.cpu < required.cpu) {
1416 missingInfo += fmt::format(" CPU cores (have {}, need {})", accumulated.cpu, required.cpu);
1417 }
1418 if (required.memory > 0 && accumulated.memory < required.memory) {
1419 missingInfo += fmt::format(" memory (have {} MB, need {} MB)", accumulated.memory / 1000000, required.memory / 1000000);
1420 }
1421 return missingInfo.empty() ? std::string(" (policy: ") + spec.resourcePolicy.name + ")" : " -" + missingInfo;
1422 };
// The warning is emitted when the number of consecutive skips reaches
// nextWarnAt, which is then doubled; all the other skips emit the same
// message as a plain signpost event.
1423 if (schedulingStats.numberOfUnscheduledSinceLastScheduled >= schedulingStats.nextWarnAt) {
1424 auto const missingStr = buildMissingInfo();
1425 O2_SIGNPOST_EVENT_EMIT_WARN(scheduling, sid, "Run",
1426 "Not enough resources to schedule computation on stream %d. %zu consecutive skips%s. Missing:%s. Data is not lost and it will be scheduled again.",
1427 streamRef.index,
1428 schedulingStats.numberOfUnscheduledSinceLastScheduled.load(),
1429 schedInfo.c_str(),
1430 missingStr.c_str());
1431 schedulingStats.nextWarnAt = schedulingStats.nextWarnAt * 2;
1432 } else {
1433 auto const missingStr = buildMissingInfo();
1434 O2_SIGNPOST_EVENT_EMIT(scheduling, sid, "Run",
1435 "Not enough resources to schedule computation on stream %d. %zu consecutive skips%s. Missing:%s. Data is not lost and it will be scheduled again.",
1436 streamRef.index,
1437 schedulingStats.numberOfUnscheduledSinceLastScheduled.load(),
1438 schedInfo.c_str(),
1439 missingStr.c_str());
1440 }
1441 schedulingStats.numberOfUnscheduled++;
1442 schedulingStats.numberOfUnscheduledSinceLastScheduled++;
1443 auto ref = ServiceRegistryRef{mServiceRegistry};
1444 ref.get<ComputingQuotaEvaluator>().handleExpired(reportExpiredOffer);
1445 }
1446 }
1447 }
1448
1449 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", (int)state.transitionHandling);
1450 auto& spec = ref.get<DeviceSpec const>();
// Drop whatever parts are still attached to the input channels.
// NOTE(review): the loop is bounded by spec.inputChannels.size() but indexes
// state.inputChannelInfos - presumably the two have the same size; confirm.
1452 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
1453 auto& info = state.inputChannelInfos[ci];
1454 info.parts.fParts.clear();
1455 }
1456 state.transitionHandling = TransitionHandlingState::NoTransition;
1457}
1458
1462{
1463 auto& context = ref.get<DataProcessorContext>();
1464 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1465 O2_SIGNPOST_START(device, dpid, "do_prepare", "Starting DataProcessorContext::doPrepare.");
1466
1467 {
1468 ref.get<CallbackService>().call<CallbackService::Id::ClockTick>();
1469 }
1470 // Whether or not we had something to do.
1471
1472 // Initialise the value for context.allDone. It will possibly be updated
1473 // below if any of the channels is not done.
1474 //
1475 // Notice that fake input channels (InputChannelState::Pull) cannot possibly
1476 // expect to receive an EndOfStream signal. Thus we do not wait for these
1477 // to be completed. In the case of data source devices, as they do not have
1478 // real data input channels, they have to signal EndOfStream themselves.
1479 auto& state = ref.get<DeviceState>();
1480 auto& spec = ref.get<DeviceSpec const>();
1481 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.inputChannelInfos.data());
1482 O2_SIGNPOST_START(device, cid, "do_prepare", "Reported channel states.");
1483 context.allDone = std::any_of(state.inputChannelInfos.begin(), state.inputChannelInfos.end(), [cid](const auto& info) {
1484 if (info.channel) {
1485 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "Input channel %{public}s%{public}s has %zu parts left and is in state %d.",
1486 info.channel->GetName().c_str(), (info.id.value == ChannelIndex::INVALID ? " (non DPL)" : ""), info.parts.fParts.size(), (int)info.state);
1487 } else {
1488 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "External channel %d is in state %d.", info.id.value, (int)info.state);
1489 }
1490 return (info.parts.fParts.empty() == true && info.state != InputChannelState::Pull);
1491 });
1492 O2_SIGNPOST_END(device, cid, "do_prepare", "End report.");
1493 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Processing %zu input channels.", spec.inputChannels.size());
1496 static std::vector<int> pollOrder;
1497 pollOrder.resize(state.inputChannelInfos.size());
1498 std::iota(pollOrder.begin(), pollOrder.end(), 0);
1499 std::sort(pollOrder.begin(), pollOrder.end(), [&infos = state.inputChannelInfos](int a, int b) {
1500 return infos[a].oldestForChannel.value < infos[b].oldestForChannel.value;
1501 });
1502
1503 // Nothing to poll...
1504 if (pollOrder.empty()) {
1505 O2_SIGNPOST_END(device, dpid, "do_prepare", "Nothing to poll. Waiting for next iteration.");
1506 return;
1507 }
1508 auto currentOldest = state.inputChannelInfos[pollOrder.front()].oldestForChannel;
1509 auto currentNewest = state.inputChannelInfos[pollOrder.back()].oldestForChannel;
1510 auto delta = currentNewest.value - currentOldest.value;
1511 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Oldest possible timeframe range %" PRIu64 " => %" PRIu64 " delta %" PRIu64,
1512 (int64_t)currentOldest.value, (int64_t)currentNewest.value, (int64_t)delta);
1513 auto& infos = state.inputChannelInfos;
1514
1515 if (context.balancingInputs) {
1516 static int pipelineLength = DefaultsHelpers::pipelineLength(*ref.get<RawDeviceService>().device()->fConfig);
1517 static uint64_t ahead = getenv("DPL_MAX_CHANNEL_AHEAD") ? std::atoll(getenv("DPL_MAX_CHANNEL_AHEAD")) : std::max(8, std::min(pipelineLength - 48, pipelineLength / 2));
1518 auto newEnd = std::remove_if(pollOrder.begin(), pollOrder.end(), [&infos, limitNew = currentOldest.value + ahead](int a) -> bool {
1519 return infos[a].oldestForChannel.value > limitNew;
1520 });
1521 for (auto it = pollOrder.begin(); it < pollOrder.end(); it++) {
1522 const auto& channelInfo = state.inputChannelInfos[*it];
1523 if (channelInfo.pollerIndex != -1) {
1524 auto& poller = state.activeInputPollers[channelInfo.pollerIndex];
1525 auto& pollerContext = *(PollerContext*)(poller->data);
1526 if (pollerContext.pollerState == PollerContext::PollerState::Connected || pollerContext.pollerState == PollerContext::PollerState::Suspended) {
1527 bool running = pollerContext.pollerState == PollerContext::PollerState::Connected;
1528 bool shouldBeRunning = it < newEnd;
1529 if (running != shouldBeRunning) {
1530 uv_poll_start(poller, shouldBeRunning ? UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED : 0, &on_socket_polled);
1531 pollerContext.pollerState = shouldBeRunning ? PollerContext::PollerState::Connected : PollerContext::PollerState::Suspended;
1532 }
1533 }
1534 }
1535 }
1536 pollOrder.erase(newEnd, pollOrder.end());
1537 }
1538 O2_SIGNPOST_END(device, dpid, "do_prepare", "%zu channels pass the channel inbalance balance check.", pollOrder.size());
1539
1540 for (auto sci : pollOrder) {
1541 auto& info = state.inputChannelInfos[sci];
1542 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1543 O2_SIGNPOST_START(device, cid, "channels", "Processing channel %s", info.channel->GetName().c_str());
1544
1546 context.allDone = false;
1547 }
1548 if (info.state != InputChannelState::Running) {
1549 // Remember to flush data if we are not running
1550 // and there is some message pending.
1551 if (info.parts.Size()) {
1553 }
1554 O2_SIGNPOST_END(device, cid, "channels", "Flushing channel %s which is in state %d and has %zu parts still pending.",
1555 info.channel->GetName().c_str(), (int)info.state, info.parts.Size());
1556 continue;
1557 }
1558 if (info.channel == nullptr) {
1559 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is nullptr and has %zu parts still pending.",
1560 info.channel->GetName().c_str(), (int)info.state, info.parts.Size());
1561 continue;
1562 }
1563 // Only poll DPL channels for now.
1564 if (info.channelType != ChannelAccountingType::DPL) {
1565 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is not a DPL channel and has %zu parts still pending.",
1566 info.channel->GetName().c_str(), (int)info.state, info.parts.Size());
1567 continue;
1568 }
1569 auto& socket = info.channel->GetSocket();
1570 // If we have pending events from a previous iteration,
1571 // we do receive in any case.
1572 // Otherwise we check if there is any pending event and skip
1573 // this channel in case there is none.
1574 if (info.hasPendingEvents == 0) {
1575 socket.Events(&info.hasPendingEvents);
1576 // If we do not read, we can continue.
1577 if ((info.hasPendingEvents & 1) == 0 && (info.parts.Size() == 0)) {
1578 O2_SIGNPOST_END(device, cid, "channels", "No pending events and no remaining parts to process for channel %{public}s", info.channel->GetName().c_str());
1579 continue;
1580 }
1581 }
1582 // We can reset this, because it means we have seen at least 1
1583 // message after the UV_READABLE was raised.
1584 info.readPolled = false;
1585 // Notice that there seems to be a difference between the documentation
1586 // of zeromq and the observed behavior. The fact that ZMQ_POLLIN
1587 // is raised does not mean that a message is immediately available to
1588 // read, just that it will be available soon, so the receive can
1589 // still return -2. To avoid this we keep receiving on the socket until
1590 // we get a message. In order not to overflow the DPL queue we process
1591 // one message at a time and we keep track of whether there were more
1592 // to process.
1593 bool newMessages = false;
1594 while (true) {
1595 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Receiving loop called for channel %{public}s (%d) with oldest possible timeslice %zu",
1596 info.channel->GetName().c_str(), info.id.value, info.oldestForChannel.value);
1597 if (info.parts.Size() < 64) {
1598 fair::mq::Parts parts;
1599 info.channel->Receive(parts, 0);
1600 if (parts.Size()) {
1601 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Received %zu parts from channel %{public}s (%d).", parts.Size(), info.channel->GetName().c_str(), info.id.value);
1602 }
1603 for (auto&& part : parts) {
1604 info.parts.fParts.emplace_back(std::move(part));
1605 }
1606 newMessages |= true;
1607 }
1608
1609 if (info.parts.Size() >= 0) {
1611 // Receiving data counts as activity now, so that
1612 // We can make sure we process all the pending
1613 // messages without hanging on the uv_run.
1614 break;
1615 }
1616 }
1617 // We check once again for pending events, keeping track if this was the
1618 // case so that we can immediately repeat this loop and avoid remaining
1619 // stuck in uv_run. This is because we will not get notified on the socket
1620 // if more events are pending due to zeromq level triggered approach.
1621 socket.Events(&info.hasPendingEvents);
1622 if (info.hasPendingEvents) {
1623 info.readPolled = false;
1624 // In case there were messages, we consider it as activity
1625 if (newMessages) {
1626 state.lastActiveDataProcessor.store(&context);
1627 }
1628 }
1629 O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).",
1630 info.channel->GetName().c_str(), info.id.value);
1631 }
1632}
1633
1635{
1636 auto& context = ref.get<DataProcessorContext>();
1637 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1638 auto& state = ref.get<DeviceState>();
1639 auto& spec = ref.get<DeviceSpec const>();
1640
1641 if (state.streaming == StreamingState::Idle) {
1642 return;
1643 }
1644
1645 context.completed.clear();
1646 context.completed.reserve(16);
1647 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1648 state.lastActiveDataProcessor.store(&context);
1649 }
1650 DanglingContext danglingContext{*context.registry};
1651
1652 context.preDanglingCallbacks(danglingContext);
1653 if (state.lastActiveDataProcessor.load() == nullptr) {
1654 ref.get<CallbackService>().call<CallbackService::Id::Idle>();
1655 }
1656 auto activity = ref.get<DataRelayer>().processDanglingInputs(context.expirationHandlers, *context.registry, true);
1657 if (activity.expiredSlots > 0) {
1658 state.lastActiveDataProcessor = &context;
1659 }
1660
1661 context.completed.clear();
1662 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1663 state.lastActiveDataProcessor = &context;
1664 }
1665
1666 context.postDanglingCallbacks(danglingContext);
1667
1668 // If we got notified that all the sources are done, we call the EndOfStream
1669 // callback and return false. Notice that what happens next is actually
1670 // dependent on the callback, not something which is controlled by the
1671 // framework itself.
1672 if (context.allDone == true && state.streaming == StreamingState::Streaming) {
1674 state.lastActiveDataProcessor = &context;
1675 }
1676
1677 if (state.streaming == StreamingState::EndOfStreaming) {
1678 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "We are in EndOfStreaming. Flushing queues.");
1679 // We keep processing data until we are Idle.
1680 // FIXME: not sure this is the correct way to drain the queues, but
1681 // I guess we will see.
1684 auto& relayer = ref.get<DataRelayer>();
1685
1686 bool shouldProcess = DataProcessingHelpers::hasOnlyGenerated(spec) == false;
1687
1688 while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) {
1689 relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false);
1690 }
1691
1692 auto& timingInfo = ref.get<TimingInfo>();
1693 // We should keep the data generated at end of stream only for those
1694 // which are not sources.
1695 timingInfo.keepAtEndOfStream = shouldProcess;
1696 // Fill timinginfo with some reasonable values for data sent with endOfStream
1697 timingInfo.timeslice = relayer.getOldestPossibleOutput().timeslice.value;
1698 timingInfo.tfCounter = -1;
1699 timingInfo.firstTForbit = -1;
1700 // timingInfo.runNumber = ; // Not sure where to get this if not already set
1701 timingInfo.creation = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
1702 O2_SIGNPOST_EVENT_EMIT(calibration, dpid, "calibration", "TimingInfo.keepAtEndOfStream %d", timingInfo.keepAtEndOfStream);
1703
1704 EndOfStreamContext eosContext{*context.registry, ref.get<DataAllocator>()};
1705
1706 context.preEOSCallbacks(eosContext);
1707 auto& streamContext = ref.get<StreamContext>();
1708 streamContext.preEOSCallbacks(eosContext);
1709 ref.get<CallbackService>().call<CallbackService::Id::EndOfStream>(eosContext);
1710 streamContext.postEOSCallbacks(eosContext);
1711 context.postEOSCallbacks(eosContext);
1712
1713 for (auto& channel : spec.outputChannels) {
1714 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Sending end of stream to %{public}s.", channel.name.c_str());
1716 }
1717 // This is needed because the transport is deleted before the device.
1718 relayer.clear();
1720 // In case we should process, note the data processor responsible for it
1721 if (shouldProcess) {
1722 state.lastActiveDataProcessor = &context;
1723 }
1724 // On end of stream we shut down all output pollers.
1725 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1726 for (auto& poller : state.activeOutputPollers) {
1727 uv_poll_stop(poller);
1728 }
1729 return;
1730 }
1731
1732 if (state.streaming == StreamingState::Idle) {
1733 // On end of stream we shut down all output pollers.
1734 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1735 for (auto& poller : state.activeOutputPollers) {
1736 uv_poll_stop(poller);
1737 }
1738 }
1739
1740 return;
1741}
1742
1744{
1745 ServiceRegistryRef ref{mServiceRegistry};
1746 ref.get<DataRelayer>().clear();
1747 auto& deviceContext = ref.get<DeviceContext>();
1748 // If the signal handler is there, we should
1749 // hide the registry from it, so that we do not
1750 // end up calling the signal handler on something
1751 // which might not be there anymore.
1752 if (deviceContext.sigusr1Handle) {
1753 deviceContext.sigusr1Handle->data = nullptr;
1754 }
1755 // Makes sure we do not have a working context on
1756 // shutdown.
1757 for (auto& handle : ref.get<DeviceState>().activeSignals) {
1758 handle->data = nullptr;
1759 }
1760}
1761
1764 {
1765 }
1766};
1767
1768auto forwardOnInsertion(ServiceRegistryRef& ref, std::span<fair::mq::MessagePtr>& messages) -> void
1769{
1770 O2_SIGNPOST_ID_GENERATE(sid, forwarding);
1771
1772 auto& spec = ref.get<DeviceSpec const>();
1773 auto& context = ref.get<DataProcessorContext>();
1774 if (context.forwardPolicy == ForwardPolicy::AfterProcessing || spec.forwards.empty()) {
1775 O2_SIGNPOST_EVENT_EMIT(device, sid, "device", "Early forwardinding not enabled / needed.");
1776 return;
1777 }
1778
1779 O2_SIGNPOST_EVENT_EMIT(device, sid, "device", "Early forwardinding before injecting data into relayer.");
1780 auto& timesliceIndex = ref.get<TimesliceIndex>();
1781 auto oldestTimeslice = timesliceIndex.getOldestPossibleOutput();
1782
1783 auto& proxy = ref.get<FairMQDeviceProxy>();
1784
1785 O2_SIGNPOST_START(forwarding, sid, "forwardInputs",
1786 "Starting forwarding for incoming messages with oldestTimeslice %zu with copy",
1787 oldestTimeslice.timeslice.value);
1788 std::vector<fair::mq::Parts> forwardedParts(proxy.getNumForwardChannels());
1789 DataProcessingHelpers::routeForwardedMessages(proxy, messages, forwardedParts, true, false);
1790
1791 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
1792 if (forwardedParts[fi].Size() == 0) {
1793 continue;
1794 }
1795 ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi});
1796 auto& parts = forwardedParts[fi];
1797 if (info.policy == nullptr) {
1798 O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi);
1799 continue;
1800 }
1801 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
1802 info.policy->forward(parts, ChannelIndex{fi}, ref);
1803 }
1804 auto& asyncQueue = ref.get<AsyncQueue>();
1805 auto& decongestion = ref.get<DecongestionService>();
1806 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
1807 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
1808 AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate}
1809 .user<DecongestionContext>({.ref = ref, .oldestTimeslice = oldestTimeslice}));
1810 O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
1811};
1812
1818{
1819 using InputInfo = DataRelayer::InputInfo;
1821
1822 auto& context = ref.get<DataProcessorContext>();
1823 // This is the same id as the upper level function, so we get the events
1824 // associated with the same interval. We will simply use "handle_data" as
1825 // the category.
1826 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1827
1828 // This is how we validate inputs. I.e. we try to enforce the O2 Data model
1829 // and we do a few stats. We bind parts as a lambda captured variable, rather
1830 // than an input, because we do not want the outer loop actually be exposed
1831 // to the implementation details of the messaging layer.
1832 auto getInputTypes = [&info, &context]() -> std::optional<std::vector<InputInfo>> {
1833 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1834 auto ref = ServiceRegistryRef{*context.registry};
1835 auto& stats = ref.get<DataProcessingStats>();
1836 auto& state = ref.get<DeviceState>();
1837 auto& parts = info.parts;
1838 stats.updateStats({(int)ProcessingStatsId::TOTAL_INPUTS, DataProcessingStats::Op::Set, (int64_t)parts.Size()});
1839
1840 std::vector<InputInfo> results;
1841 // we can reserve the upper limit
1842 results.reserve(parts.Size() / 2);
1843 size_t nTotalPayloads = 0;
1844
1845 auto insertInputInfo = [&results, &nTotalPayloads](size_t position, size_t length, InputType type, ChannelIndex index) {
1846 results.emplace_back(position, length, type, index);
1847 if (type != InputType::Invalid && length > 1) {
1848 nTotalPayloads += length - 1;
1849 }
1850 };
1851
1852 for (size_t pi = 0; pi < parts.Size(); pi += 2) {
1853 auto* headerData = parts.At(pi)->GetData();
1854 auto sih = o2::header::get<SourceInfoHeader*>(headerData);
1855 auto dh = o2::header::get<DataHeader*>(headerData);
1856 if (sih) {
1857 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got SourceInfoHeader with state %d", (int)sih->state);
1858 info.state = sih->state;
1859 insertInputInfo(pi, 2, InputType::SourceInfo, info.id);
1860 state.lastActiveDataProcessor = &context;
1861 if (dh) {
1862 LOGP(error, "Found data attached to a SourceInfoHeader");
1863 }
1864 continue;
1865 }
1866 auto dih = o2::header::get<DomainInfoHeader*>(headerData);
1867 if (dih) {
1868 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got DomainInfoHeader with oldestPossibleTimeslice %d", (int)dih->oldestPossibleTimeslice);
1869 insertInputInfo(pi, 2, InputType::DomainInfo, info.id);
1870 state.lastActiveDataProcessor = &context;
1871 if (dh) {
1872 LOGP(error, "Found data attached to a DomainInfoHeader");
1873 }
1874 continue;
1875 }
1876 if (!dh) {
1877 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1878 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header is not a DataHeader?");
1879 continue;
1880 }
1881 if (dh->payloadSize > parts.At(pi + 1)->GetSize()) {
1882 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1883 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader payloadSize mismatch");
1884 continue;
1885 }
1886 auto dph = o2::header::get<DataProcessingHeader*>(headerData);
1887 // We only deal with the tracking of parts if the log is enabled.
1888 // This is because in principle we should track the size of each of
1889 // the parts and sum it up. Not for now.
1890 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, headerData);
1891 O2_SIGNPOST_START(parts, pid, "parts", "Processing DataHeader %{public}-4s/%{public}-16s/%d with splitPayloadParts %d and splitPayloadIndex %d",
1892 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->splitPayloadParts, dh->splitPayloadIndex);
1893 if (!dph) {
1894 insertInputInfo(pi, 2, InputType::Invalid, info.id);
1895 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header stack does not contain DataProcessingHeader");
1896 continue;
1897 }
1898 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
1899 // this is indicating a sequence of payloads following the header
1900 // FIXME: we will probably also set the DataHeader version
1901 insertInputInfo(pi, dh->splitPayloadParts + 1, InputType::Data, info.id);
1902 pi += dh->splitPayloadParts - 1;
1903 } else {
1904 // We can set the type for the next splitPayloadParts
1905 // because we are guaranteed they are all the same.
1906 // If splitPayloadParts = 0, we assume that means there is only one (header, payload)
1907 // pair.
1908 size_t finalSplitPayloadIndex = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
1909 if (finalSplitPayloadIndex > parts.Size()) {
1910 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader::splitPayloadParts invalid");
1911 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1912 continue;
1913 }
1914 insertInputInfo(pi, 2, InputType::Data, info.id);
1915 for (; pi + 2 < finalSplitPayloadIndex; pi += 2) {
1916 insertInputInfo(pi + 2, 2, InputType::Data, info.id);
1917 }
1918 }
1919 }
1920 if (results.size() + nTotalPayloads != parts.Size()) {
1921 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "inconsistent number of inputs extracted. %zu vs parts (%zu)", results.size() + nTotalPayloads, parts.Size());
1922 return std::nullopt;
1923 }
1924 return results;
1925 };
1926
1927 auto reportError = [ref](const char* message) {
1928 auto& stats = ref.get<DataProcessingStats>();
1930 };
1931
1932 auto handleValidMessages = [&info, ref, &reportError, &context](std::vector<InputInfo> const& inputInfos) {
1933 auto& relayer = ref.get<DataRelayer>();
1934 auto& state = ref.get<DeviceState>();
1935 static WaitBackpressurePolicy policy;
1936 auto& parts = info.parts;
1937 // We relay execution to make sure we have a complete set of parts
1938 // available.
1939 bool hasBackpressure = false;
1940 size_t minBackpressureTimeslice = -1;
1941 bool hasData = false;
1942 size_t oldestPossibleTimeslice = -1;
1943 static std::vector<int> ordering;
1944 // Same as inputInfos but with iota.
1945 ordering.resize(inputInfos.size());
1946 std::iota(ordering.begin(), ordering.end(), 0);
1947 // stable sort orderings by type and position
1948 std::stable_sort(ordering.begin(), ordering.end(), [&inputInfos](int const& a, int const& b) {
1949 auto const& ai = inputInfos[a];
1950 auto const& bi = inputInfos[b];
1951 if (ai.type != bi.type) {
1952 return ai.type < bi.type;
1953 }
1954 return ai.position < bi.position;
1955 });
1956 for (size_t ii = 0; ii < inputInfos.size(); ++ii) {
1957 auto const& input = inputInfos[ordering[ii]];
1958 switch (input.type) {
1959 case InputType::Data: {
1960 hasData = true;
1961 auto headerIndex = input.position;
1962 auto nMessages = 0;
1963 auto nPayloadsPerHeader = 0;
1964 if (input.size > 2) {
1965 // header and multiple payload sequence
1966 nMessages = input.size;
1967 nPayloadsPerHeader = nMessages - 1;
1968 } else {
1969 // multiple header-payload pairs
1970 auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
1971 nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2;
1972 nPayloadsPerHeader = 1;
1973 ii += (nMessages / 2) - 1;
1974 }
1975 auto onDrop = [ref](TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1976 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
1977 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "onDrop", "Dropping message from slot %zu. Forwarding as needed. Timeslice %zu",
1978 slot.index, oldestOutputInfo.timeslice.value);
1979 ref.get<AsyncQueue>();
1980 ref.get<DecongestionService>();
1981 ref.get<DataRelayer>();
1982 // Get the current timeslice for the slot.
1983 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
1985 forwardInputs(ref, slot, dropped, oldestOutputInfo, false, true);
1986 };
1987
1988 auto relayed = relayer.relay(parts.At(headerIndex)->GetData(),
1989 &parts.At(headerIndex),
1990 input,
1991 nMessages,
1992 nPayloadsPerHeader,
1993 context.forwardPolicy == ForwardPolicy::AtInjection ? forwardOnInsertion : nullptr,
1994 onDrop);
1995 switch (relayed.type) {
1997 if (info.normalOpsNotified == true && info.backpressureNotified == false) {
1998 LOGP(alarm, "Backpressure on channel {}. Waiting.", info.channel->GetName());
1999 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
2000 monitoring.send(o2::monitoring::Metric{1, fmt::format("backpressure_{}", info.channel->GetName())});
2001 info.backpressureNotified = true;
2002 info.normalOpsNotified = false;
2003 }
2004 policy.backpressure(info);
2005 hasBackpressure = true;
2006 minBackpressureTimeslice = std::min<size_t>(minBackpressureTimeslice, relayed.timeslice.value);
2007 break;
2011 if (info.normalOpsNotified == false && info.backpressureNotified == true) {
2012 LOGP(info, "Back to normal on channel {}.", info.channel->GetName());
2013 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
2014 monitoring.send(o2::monitoring::Metric{0, fmt::format("backpressure_{}", info.channel->GetName())});
2015 info.normalOpsNotified = true;
2016 info.backpressureNotified = false;
2017 }
2018 break;
2019 }
2020 } break;
2021 case InputType::SourceInfo: {
2022 LOGP(detail, "Received SourceInfo");
2023 auto& context = ref.get<DataProcessorContext>();
2024 state.lastActiveDataProcessor = &context;
2025 auto headerIndex = input.position;
2026 auto payloadIndex = input.position + 1;
2027 assert(payloadIndex < parts.Size());
2028 // FIXME: the message with the end of stream cannot contain
2029 // split parts.
2030 parts.At(headerIndex).reset(nullptr);
2031 parts.At(payloadIndex).reset(nullptr);
2032 // for (size_t i = 0; i < dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 - 1 : 1; ++i) {
2033 // parts.At(headerIndex + 1 + i).reset(nullptr);
2034 // }
2035 // pi += dh->splitPayloadParts > 0 ? dh->splitPayloadParts - 1 : 0;
2036
2037 } break;
2038 case InputType::DomainInfo: {
2041 auto& context = ref.get<DataProcessorContext>();
2042 state.lastActiveDataProcessor = &context;
2043 auto headerIndex = input.position;
2044 auto payloadIndex = input.position + 1;
2045 assert(payloadIndex < parts.Size());
2046 // FIXME: the message with the end of stream cannot contain
2047 // split parts.
2048
2049 auto dih = o2::header::get<DomainInfoHeader*>(parts.At(headerIndex)->GetData());
2050 if (hasBackpressure && dih->oldestPossibleTimeslice >= minBackpressureTimeslice) {
2051 break;
2052 }
2053 oldestPossibleTimeslice = std::min(oldestPossibleTimeslice, dih->oldestPossibleTimeslice);
2054 LOGP(debug, "Got DomainInfoHeader, new oldestPossibleTimeslice {} on channel {}", oldestPossibleTimeslice, info.id.value);
2055 parts.At(headerIndex).reset(nullptr);
2056 parts.At(payloadIndex).reset(nullptr);
2057 } break;
2058 case InputType::Invalid: {
2059 reportError("Invalid part found.");
2060 } break;
2061 }
2062 }
2065 if (oldestPossibleTimeslice != (size_t)-1) {
2066 info.oldestForChannel = {oldestPossibleTimeslice};
2067 auto& context = ref.get<DataProcessorContext>();
2068 context.domainInfoUpdatedCallback(*context.registry, oldestPossibleTimeslice, info.id);
2069 ref.get<CallbackService>().call<CallbackService::Id::DomainInfoUpdated>((ServiceRegistryRef)*context.registry, (size_t)oldestPossibleTimeslice, (ChannelIndex)info.id);
2070 state.lastActiveDataProcessor = &context;
2071 }
2072 auto it = std::remove_if(parts.fParts.begin(), parts.fParts.end(), [](auto& msg) -> bool { return msg.get() == nullptr; });
2073 parts.fParts.erase(it, parts.end());
2074 if (parts.fParts.size()) {
2075 LOG(debug) << parts.fParts.size() << " messages backpressured";
2076 }
2077 };
2078
2079 // Second part. This is the actual outer loop we want to obtain, with
2080 // implementation details which can be read. Notice how most of the state
2081 // is actually hidden. For example we do not expose what "input" is. This
2082 // will allow us to keep the same toplevel logic even if the actual meaning
2083 // of input is changed (for example we might move away from multipart
2084 // messages). Notice also that we need to act differently depending on the
2085 // actual CompletionOp we want to perform. In particular forwarding inputs
2086 // also gets rid of them from the cache.
2087 auto inputTypes = getInputTypes();
2088 if (bool(inputTypes) == false) {
2089 reportError("Parts should come in couples. Dropping it.");
2090 return;
2091 }
2092 handleValidMessages(*inputTypes);
2093 return;
2094}
2095
2096namespace
2097{
// Smallest and largest latency seen over a set of inputs.
// A default constructed value has minLatency at the largest uint64_t and
// maxLatency at zero, so it can be folded with std::min / std::max directly.
struct InputLatency {
  uint64_t minLatency{std::numeric_limits<uint64_t>::max()};
  uint64_t maxLatency{0};
};
2102
2103auto calculateInputRecordLatency(InputRecord const& record, uint64_t currentTime) -> InputLatency
2104{
2105 InputLatency result;
2106
2107 for (auto& item : record) {
2108 auto* header = o2::header::get<DataProcessingHeader*>(item.header);
2109 if (header == nullptr) {
2110 continue;
2111 }
2112 int64_t partLatency = (0x7fffffffffffffff & currentTime) - (0x7fffffffffffffff & header->creation);
2113 if (partLatency < 0) {
2114 partLatency = 0;
2115 }
2116 result.minLatency = std::min(result.minLatency, (uint64_t)partLatency);
2117 result.maxLatency = std::max(result.maxLatency, (uint64_t)partLatency);
2118 }
2119 return result;
2120};
2121
2122auto calculateTotalInputRecordSize(InputRecord const& record) -> int
2123{
2124 size_t totalInputSize = 0;
2125 for (auto& item : record) {
2126 auto* header = o2::header::get<DataHeader*>(item.header);
2127 if (header == nullptr) {
2128 continue;
2129 }
2130 totalInputSize += header->payloadSize;
2131 }
2132 return totalInputSize;
2133};
2134
// Raises @a maximum_value to @a value if the latter is larger.
//
// The stored value is only ever replaced by a larger one: the exchange is
// retried for as long as the value observed in the atomic is still smaller
// than @a value.
//
// @param maximum_value the atomic holding the running maximum
// @param value         the candidate for the new maximum
template <typename T>
void update_maximum(std::atomic<T>& maximum_value, T const& value) noexcept
{
  T observed = maximum_value.load();
  while (observed < value) {
    // On failure compare_exchange_weak refreshes `observed` with the
    // current content, so the loop condition is re-evaluated against it.
    if (maximum_value.compare_exchange_weak(observed, value)) {
      break;
    }
  }
}
2143} // namespace
2144
2145bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::vector<DataRelayer::RecordAction>& completed)
2146{
2147 auto& context = ref.get<DataProcessorContext>();
2148 LOGP(debug, "DataProcessingDevice::tryDispatchComputation");
2149 // This is the actual hidden state for the outer loop. In case we decide we
2150 // want to support multithreaded dispatching of operations, I can simply
2151 // move these to some thread local store and the rest of the lambdas
2152 // should work just fine.
2153 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
2154
2155 //
2156 auto getInputSpan = [ref, &currentSetOfInputs](TimesliceSlot slot, bool consume = true) {
2157 auto& relayer = ref.get<DataRelayer>();
2158 if (consume) {
2159 currentSetOfInputs = relayer.consumeAllInputsForTimeslice(slot);
2160 } else {
2161 currentSetOfInputs = relayer.consumeExistingInputsForTimeslice(slot);
2162 }
2163 // Convert raw message indices directly to a DataRef in O(1).
2164 // Used both by the sequential PartIterator and as the fallback for positional access.
2165 auto indicesGetter = [&currentSetOfInputs](size_t i, DataRefIndices indices) -> DataRef {
2166 auto const& msgs = currentSetOfInputs[i];
2167 if (msgs.size() <= indices.headerIdx) {
2168 return DataRef{};
2169 }
2170 auto const& headerMsg = msgs[indices.headerIdx];
2171 char const* payloadData = nullptr;
2172 size_t payloadSize = 0;
2173 if (msgs.size() > indices.payloadIdx && msgs[indices.payloadIdx]) {
2174 payloadData = static_cast<char const*>(msgs[indices.payloadIdx]->GetData());
2175 payloadSize = msgs[indices.payloadIdx]->GetSize();
2176 }
2177 return DataRef{nullptr,
2178 headerMsg ? static_cast<char const*>(headerMsg->GetData()) : nullptr,
2179 payloadData,
2180 payloadSize};
2181 };
2182 auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
2183 return (currentSetOfInputs[i] | count_payloads{});
2184 };
2185 auto refCountGetter = [&currentSetOfInputs](size_t idx) -> int {
2186 auto& header = static_cast<const fair::mq::shmem::Message&>(*(currentSetOfInputs[idx] | get_header{0}));
2187 return header.GetRefCount();
2188 };
2189 auto nextIndicesGetter = [&currentSetOfInputs](size_t i, DataRefIndices current) -> DataRefIndices {
2190 auto next = currentSetOfInputs[i] | get_next_pair{current};
2191 return next.headerIdx < currentSetOfInputs[i].size() ? next : DataRefIndices{size_t(-1), size_t(-1)};
2192 };
2193 return InputSpan{nofPartsGetter, refCountGetter, indicesGetter, nextIndicesGetter, currentSetOfInputs.size()};
2194 };
2195
2196 auto markInputsAsDone = [ref](TimesliceSlot slot) -> void {
2197 auto& relayer = ref.get<DataRelayer>();
2199 };
2200
2201 // I need a preparation step which gets the current timeslice id and
2202 // propagates it to the various contextes (i.e. the actual entities which
2203 // create messages) because the messages need to have the timeslice id into
2204 // it.
2205 auto prepareAllocatorForCurrentTimeSlice = [ref](TimesliceSlot i) -> void {
2206 auto& relayer = ref.get<DataRelayer>();
2207 auto& timingInfo = ref.get<TimingInfo>();
2208 auto timeslice = relayer.getTimesliceForSlot(i);
2209
2210 timingInfo.timeslice = timeslice.value;
2211 timingInfo.tfCounter = relayer.getFirstTFCounterForSlot(i);
2212 timingInfo.firstTForbit = relayer.getFirstTFOrbitForSlot(i);
2213 timingInfo.runNumber = relayer.getRunNumberForSlot(i);
2214 timingInfo.creation = relayer.getCreationTimeForSlot(i);
2215 };
2216 auto updateRunInformation = [ref](TimesliceSlot i) -> void {
2217 auto& dataProcessorContext = ref.get<DataProcessorContext>();
2218 auto& relayer = ref.get<DataRelayer>();
2219 auto& timingInfo = ref.get<TimingInfo>();
2220 auto timeslice = relayer.getTimesliceForSlot(i);
2221 // We report whether or not this timing info refers to a new Run.
2222 timingInfo.globalRunNumberChanged = !TimingInfo::timesliceIsTimer(timeslice.value) && dataProcessorContext.lastRunNumberProcessed != timingInfo.runNumber;
2223 // A switch to runNumber=0 should not appear and thus does not set globalRunNumberChanged, unless it is seen in the first processed timeslice
2224 timingInfo.globalRunNumberChanged &= (dataProcessorContext.lastRunNumberProcessed == -1 || timingInfo.runNumber != 0);
2225 // FIXME: for now there is only one stream, however we
2226 // should calculate this correctly once we finally get the
2227 // the StreamContext in.
2228 timingInfo.streamRunNumberChanged = timingInfo.globalRunNumberChanged;
2229 };
2230
2231 // When processing them, timers will have to be cleaned up
2232 // to avoid double counting them.
2233 // This was actually the easiest solution we could find for
2234 // O2-646.
2235 auto cleanTimers = [&currentSetOfInputs](TimesliceSlot slot, InputRecord& record) {
2236 assert(record.size() == currentSetOfInputs.size());
2237 for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) {
2238 // assuming that for timer inputs we do have exactly one PartRef object
2239 // in the MessageSet, multiple PartRef Objects are only possible for either
2240 // split payload messages of wildcard matchers, both for data inputs
2241 DataRef input = record.getByPos(ii);
2242 if (input.spec->lifetime != Lifetime::Timer) {
2243 continue;
2244 }
2245 if (input.header == nullptr) {
2246 continue;
2247 }
2248 // This will hopefully delete the message.
2249 currentSetOfInputs[ii].clear();
2250 }
2251 };
2252
2253 // Function to cleanup record. For the moment we
2254 // simply use it to keep track of input messages
2255 // which are not needed, to display them in the GUI.
2256 auto cleanupRecord = [](InputRecord& record) {
2257 if (O2_LOG_ENABLED(parts) == false) {
2258 return;
2259 }
2260 for (size_t pi = 0, pe = record.size(); pi < pe; ++pi) {
2261 DataRef input = record.getByPos(pi);
2262 if (input.header == nullptr) {
2263 continue;
2264 }
2265 auto sih = o2::header::get<SourceInfoHeader*>(input.header);
2266 if (sih) {
2267 continue;
2268 }
2269
2270 auto dh = o2::header::get<DataHeader*>(input.header);
2271 if (!dh) {
2272 continue;
2273 }
2274 // We use the address of the first header of a split payload
2275 // to identify the interval.
2276 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, dh);
2277 O2_SIGNPOST_END(parts, pid, "parts", "Cleaning up parts associated to %p", dh);
2278
2279 // No split parts, we simply skip the payload
2280 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
2281 // this is indicating a sequence of payloads following the header
2282 // FIXME: we will probably also set the DataHeader version
2283 pi += dh->splitPayloadParts - 1;
2284 } else {
2285 size_t pi = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
2286 }
2287 }
2288 };
2289
2290 ref.get<DataRelayer>().getReadyToProcess(completed);
2291 if (completed.empty() == true) {
2292 LOGP(debug, "No computations available for dispatching.");
2293 return false;
2294 }
2295
2296 int pipelineLength = DefaultsHelpers::pipelineLength(*ref.get<RawDeviceService>().device()->fConfig);
2297
2298 auto postUpdateStats = [ref, pipelineLength](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t tStart, uint64_t tStartMilli) {
2299 auto& stats = ref.get<DataProcessingStats>();
2300 auto& states = ref.get<DataProcessingStates>();
2301 std::atomic_thread_fence(std::memory_order_release);
2302 char relayerSlotState[1024];
2303 int written = snprintf(relayerSlotState, 1024, "%d ", pipelineLength);
2304 char* buffer = relayerSlotState + written;
2305 for (size_t ai = 0; ai != record.size(); ai++) {
2306 buffer[ai] = record.isValid(ai) ? '3' : '0';
2307 }
2308 buffer[record.size()] = 0;
2309 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index),
2310 .size = (int)(record.size() + buffer - relayerSlotState),
2311 .data = relayerSlotState});
2312 uint64_t tEnd = uv_hrtime();
2313 // tEnd and tStart are in nanoseconds according to https://docs.libuv.org/en/v1.x/misc.html#c.uv_hrtime
2314 int64_t wallTimeMs = (tEnd - tStart) / 1000000;
2316 // Sum up the total wall time, in milliseconds.
2318 // The time interval is in seconds while tEnd - tStart is in nanoseconds, so we divide by 1000000 to get the fraction in ms/s.
2320 stats.updateStats({(int)ProcessingStatsId::LAST_PROCESSED_SIZE, DataProcessingStats::Op::Set, calculateTotalInputRecordSize(record)});
2321 stats.updateStats({(int)ProcessingStatsId::TOTAL_PROCESSED_SIZE, DataProcessingStats::Op::Add, calculateTotalInputRecordSize(record)});
2322 auto latency = calculateInputRecordLatency(record, tStartMilli);
2323 stats.updateStats({(int)ProcessingStatsId::LAST_MIN_LATENCY, DataProcessingStats::Op::Set, (int)latency.minLatency});
2324 stats.updateStats({(int)ProcessingStatsId::LAST_MAX_LATENCY, DataProcessingStats::Op::Set, (int)latency.maxLatency});
2325 static int count = 0;
2327 count++;
2328 };
2329
2330 auto preUpdateStats = [ref, pipelineLength](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t) {
2331 auto& states = ref.get<DataProcessingStates>();
2332 std::atomic_thread_fence(std::memory_order_release);
2333 char relayerSlotState[1024];
2334 snprintf(relayerSlotState, 1024, "%d ", pipelineLength);
2335 char* buffer = strchr(relayerSlotState, ' ') + 1;
2336 for (size_t ai = 0; ai != record.size(); ai++) {
2337 buffer[ai] = record.isValid(ai) ? '2' : '0';
2338 }
2339 buffer[record.size()] = 0;
2340 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index), .size = (int)(record.size() + buffer - relayerSlotState), .data = relayerSlotState});
2341 };
2342
2343 // This is the main dispatching loop
2344 auto& state = ref.get<DeviceState>();
2345 auto& spec = ref.get<DeviceSpec const>();
2346
2347 auto& dpContext = ref.get<DataProcessorContext>();
2348 auto& streamContext = ref.get<StreamContext>();
2349 O2_SIGNPOST_ID_GENERATE(sid, device);
2350 O2_SIGNPOST_START(device, sid, "device", "Start processing ready actions");
2351
2352 auto& stats = ref.get<DataProcessingStats>();
2353 auto& relayer = ref.get<DataRelayer>();
2354 using namespace o2::framework;
2355 stats.updateStats({(int)ProcessingStatsId::PENDING_INPUTS, DataProcessingStats::Op::Set, static_cast<int64_t>(relayer.getParallelTimeslices() - completed.size())});
2356 stats.updateStats({(int)ProcessingStatsId::INCOMPLETE_INPUTS, DataProcessingStats::Op::Set, completed.empty() ? 1 : 0});
2357 switch (spec.completionPolicy.order) {
2359 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.timeslice.value < b.timeslice.value; });
2360 break;
2362 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.slot.index < b.slot.index; });
2363 break;
2365 default:
2366 break;
2367 }
2368
2369 for (auto action : completed) {
2370 O2_SIGNPOST_ID_GENERATE(aid, device);
2371 O2_SIGNPOST_START(device, aid, "device", "Processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2372 if (action.op == CompletionPolicy::CompletionOp::Wait) {
2373 O2_SIGNPOST_END(device, aid, "device", "Waiting for more data.");
2374 continue;
2375 }
2376
2377 bool shouldConsume = action.op == CompletionPolicy::CompletionOp::Consume ||
2379 prepareAllocatorForCurrentTimeSlice(TimesliceSlot{action.slot});
2380 if (action.op != CompletionPolicy::CompletionOp::Discard &&
2383 updateRunInformation(TimesliceSlot{action.slot});
2384 }
2385 InputSpan span = getInputSpan(action.slot, shouldConsume);
2386 auto& spec = ref.get<DeviceSpec const>();
2387 InputRecord record{spec.inputs,
2388 span,
2389 *context.registry};
2390 ProcessingContext processContext{record, ref, ref.get<DataAllocator>()};
2391 {
2392 // Notice this should be thread safe and reentrant
2393 // as it is called from many threads.
2394 streamContext.preProcessingCallbacks(processContext);
2395 dpContext.preProcessingCallbacks(processContext);
2396 }
2397 if (action.op == CompletionPolicy::CompletionOp::Discard) {
2398 context.postDispatchingCallbacks(processContext);
2399 if (spec.forwards.empty() == false) {
2400 auto& timesliceIndex = ref.get<TimesliceIndex>();
2401 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false);
2402 O2_SIGNPOST_END(device, aid, "device", "Forwarding inputs consume: %d.", false);
2403 continue;
2404 }
2405 }
2406 // If there are no optional inputs we can forward the messages
2407 // early, so that parallel processing can happen.
2408 // In this case we pass true to indicate that we want to
2409 // copy the messages to the subsequent data processor.
2410 bool hasForwards = spec.forwards.empty() == false;
2411 bool consumeSomething = action.op == CompletionPolicy::CompletionOp::Consume || action.op == CompletionPolicy::CompletionOp::ConsumeExisting;
2412
2413 if (context.forwardPolicy == ForwardPolicy::AtCompletionPolicySatisified && hasForwards && consumeSomething) {
2414 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwarding: %{public}s.", fmt::format("{}", action.op).c_str());
2415 auto& timesliceIndex = ref.get<TimesliceIndex>();
2416 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
2417 } else if (context.forwardPolicy == ForwardPolicy::AtInjection && hasForwards && consumeSomething) {
2418 // We used to do forwarding here, however we now do it much earlier.
2419 // We still need to clean the inputs which were already consumed
2420 // via ConsumeExisting and which still have an header to hold the slot.
2421 // FIXME: do we? This should really happen when we do the forwarding on
2422 // insertion, because otherwise we lose the relevant information on how to
2423 // navigate the set of headers. We could actually rely on the messageset index,
2424 // is that the right thing to do though?
2425 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "cleaning early forwarding: %{public}s.", fmt::format("{}", action.op).c_str());
2426 auto& timesliceIndex = ref.get<TimesliceIndex>();
2427 cleanEarlyForward(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
2428 }
2429
2430 markInputsAsDone(action.slot);
2431
2432 uint64_t tStart = uv_hrtime();
2433 uint64_t tStartMilli = TimingHelpers::getRealtimeSinceEpochStandalone();
2434 preUpdateStats(action, record, tStart);
2435
2436 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
2437
2438 auto runNoCatch = [&context, ref, &processContext](DataRelayer::RecordAction& action) mutable {
2439 auto& state = ref.get<DeviceState>();
2440 auto& spec = ref.get<DeviceSpec const>();
2441 auto& streamContext = ref.get<StreamContext>();
2442 auto& dpContext = ref.get<DataProcessorContext>();
2443 auto shouldProcess = [](DataRelayer::RecordAction& action) -> bool {
2444 switch (action.op) {
2449 return true;
2450 break;
2451 default:
2452 return false;
2453 }
2454 };
2455 if (state.quitRequested == false) {
2456 {
2457 // Callbacks from services
2458 dpContext.preProcessingCallbacks(processContext);
2459 streamContext.preProcessingCallbacks(processContext);
2460 dpContext.preProcessingCallbacks(processContext);
2461 // Callbacks from users
2462 ref.get<CallbackService>().call<CallbackService::Id::PreProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2463 }
2464 O2_SIGNPOST_ID_FROM_POINTER(pcid, device, &processContext);
2465 if (context.statefulProcess && shouldProcess(action)) {
2466 // This way, usercode can use the the same processing context to identify
2467 // its signposts and we can map user code to device iterations.
2468 O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
2469 (context.statefulProcess)(processContext);
2470 O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
2471 } else if (context.statelessProcess && shouldProcess(action)) {
2472 O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
2473 (context.statelessProcess)(processContext);
2474 O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
2475 } else if (context.statelessProcess || context.statefulProcess) {
2476 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding.");
2477 } else {
2478 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle");
2480 }
2481 if (shouldProcess(action)) {
2482 auto& timingInfo = ref.get<TimingInfo>();
2483 if (timingInfo.globalRunNumberChanged) {
2484 context.lastRunNumberProcessed = timingInfo.runNumber;
2485 }
2486 }
2487
2488 // Notify the sink we just consumed some timeframe data
2489 if (context.isSink && action.op == CompletionPolicy::CompletionOp::Consume) {
2490 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Sending dpl-summary");
2491 auto& allocator = ref.get<DataAllocator>();
2492 allocator.make<int>(OutputRef{"dpl-summary", runtime_hash(spec.name.c_str())}, 1);
2493 }
2494
2495 // Extra callback which allows a service to add extra outputs.
2496 // This is needed e.g. to ensure that injected CCDB outputs are added
2497 // before an end of stream.
2498 {
2499 ref.get<CallbackService>().call<CallbackService::Id::FinaliseOutputs>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2500 dpContext.finaliseOutputsCallbacks(processContext);
2501 streamContext.finaliseOutputsCallbacks(processContext);
2502 }
2503
2504 {
2505 ref.get<CallbackService>().call<CallbackService::Id::PostProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2506 dpContext.postProcessingCallbacks(processContext);
2507 streamContext.postProcessingCallbacks(processContext);
2508 }
2509 }
2510 };
2511
2512 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_USERCODE) != 0) {
2513 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
2514 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
2515 }
2516 if (noCatch) {
2517 try {
2518 runNoCatch(action);
2519 } catch (o2::framework::RuntimeErrorRef e) {
2520 (context.errorHandling)(e, record);
2521 }
2522 } else {
2523 try {
2524 runNoCatch(action);
2525 } catch (std::exception& ex) {
2529 auto e = runtime_error(ex.what());
2530 (context.errorHandling)(e, record);
2531 } catch (o2::framework::RuntimeErrorRef e) {
2532 (context.errorHandling)(e, record);
2533 }
2534 }
2535 if (state.severityStack.empty() == false) {
2536 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
2537 state.severityStack.pop_back();
2538 }
2539
2540 postUpdateStats(action, record, tStart, tStartMilli);
2541 // We forward inputs only when we consume them. If we simply Process them,
2542 // we keep them for next message arriving.
2543 if (action.op == CompletionPolicy::CompletionOp::Consume) {
2544 cleanupRecord(record);
2545 context.postDispatchingCallbacks(processContext);
2546 ref.get<CallbackService>().call<CallbackService::Id::DataConsumed>(o2::framework::ServiceRegistryRef{ref});
2547 }
2548 if ((context.forwardPolicy == ForwardPolicy::AfterProcessing) && hasForwards && consumeSomething) {
2549 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Late forwarding");
2550 auto& timesliceIndex = ref.get<TimesliceIndex>();
2551 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false, action.op == CompletionPolicy::CompletionOp::Consume);
2552 }
2553 context.postForwardingCallbacks(processContext);
2554 if (action.op == CompletionPolicy::CompletionOp::Process) {
2555 cleanTimers(action.slot, record);
2556 }
2557 O2_SIGNPOST_END(device, aid, "device", "Done processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2558 }
2559 O2_SIGNPOST_END(device, sid, "device", "Start processing ready actions");
2560
2561 // We now broadcast the end of stream if it was requested
2562 if (state.streaming == StreamingState::EndOfStreaming) {
2563 LOGP(detail, "Broadcasting end of stream");
2564 for (auto& channel : spec.outputChannels) {
2566 }
2568 }
2569
2570 return true;
2571}
2572
2574{
2575 LOG(error) << msg;
2576 ServiceRegistryRef ref{mServiceRegistry};
2577 auto& stats = ref.get<DataProcessingStats>();
2579}
2580
2581std::unique_ptr<ConfigParamStore> DeviceConfigurationHelpers::getConfiguration(ServiceRegistryRef registry, const char* name, std::vector<ConfigParamSpec> const& options)
2582{
2583
2584 if (registry.active<ConfigurationInterface>()) {
2585 auto& cfg = registry.get<ConfigurationInterface>();
2586 try {
2587 cfg.getRecursive(name);
2588 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
2589 retrievers.emplace_back(std::make_unique<ConfigurationOptionsRetriever>(&cfg, name));
2590 auto configStore = std::make_unique<ConfigParamStore>(options, std::move(retrievers));
2591 configStore->preload();
2592 configStore->activate();
2593 return configStore;
2594 } catch (...) {
2595 // No overrides...
2596 }
2597 }
2598 return {nullptr};
2599}
2600
2601} // namespace o2::framework
benchmark::State & state
struct uv_timer_s uv_timer_t
struct uv_signal_s uv_signal_t
struct uv_async_s uv_async_t
struct uv_poll_s uv_poll_t
struct uv_loop_s uv_loop_t
o2::monitoring::Metric Metric
uint64_t maxLatency
o2::configuration::ConfigurationInterface ConfigurationInterface
constexpr int DEFAULT_MAX_CHANNEL_AHEAD
uint64_t minLatency
std::ostringstream debug
int32_t i
std::enable_if_t< std::is_signed< T >::value, bool > hasData(const CalArray< T > &cal)
Definition Painter.cxx:600
uint16_t pid
Definition RawData.h:2
uint32_t res
Definition RawData.h:0
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
Definition Signpost.h:554
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:490
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:506
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:609
#define O2_LOG_ENABLED(log)
Definition Signpost.h:111
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:507
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
Definition Signpost.h:564
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:523
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:603
constexpr uint32_t runtime_hash(char const *str)
o2::monitoring::Monitoring Monitoring
StringRef key
@ DeviceStateChanged
Invoked the device undergoes a state change.
decltype(auto) make(const Output &spec, Args... args)
static void doRun(ServiceRegistryRef)
void fillContext(DataProcessorContext &context, DeviceContext &deviceContext)
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry &)
static void doPrepare(ServiceRegistryRef)
static bool tryDispatchComputation(ServiceRegistryRef ref, std::vector< DataRelayer::RecordAction > &completed)
static void handleData(ServiceRegistryRef, InputChannelInfo &)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associate to a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
std::vector< std::vector< fair::mq::MessagePtr > > consumeAllInputsForTimeslice(TimesliceSlot id)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associate to a given slot.
A service API to communicate with the driver.
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
size_t size() const
Number of elements in the InputSpan.
Definition InputSpan.h:93
virtual fair::mq::Device * device()=0
bool active() const
Check if service of type T is currently active.
OldestOutputInfo getOldestPossibleOutput() const
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLuint entry
Definition glcorearb.h:5735
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLsizei GLenum const void * indices
Definition glcorearb.h:400
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLuint GLuint stream
Definition glcorearb.h:1806
GLint ref
Definition glcorearb.h:291
GLuint * states
Definition glcorearb.h:4932
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
RuntimeErrorRef runtime_error(const char *)
ServiceKind
The kind of service we are asking for.
void on_idle_timer(uv_timer_t *handle)
@ DPL
The channel is a normal input channel.
void run_completion(uv_work_t *handle, int status)
void on_socket_polled(uv_poll_t *poller, int status, int events)
void run_callback(uv_work_t *handle)
auto forwardOnInsertion(ServiceRegistryRef &ref, std::span< fair::mq::MessagePtr > &messages) -> void
volatile int region_read_global_dummy_variable
void handleRegionCallbacks(ServiceRegistryRef registry, std::vector< fair::mq::RegionInfo > &infos)
Invoke the callbacks for the mPendingRegionInfos.
void on_out_of_band_polled(uv_poll_t *poller, int status, int events)
DeviceSpec const & getRunningDevice(RunningDeviceRef const &running, ServiceRegistryRef const &services)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Streaming
Data is being processed.
@ Idle
End of streaming notified.
void on_communication_requested(uv_async_t *s)
@ Expired
A transition needs to be fullfilled ASAP.
@ NoTransition
No pending transitions.
@ Requested
A transition was notified to be requested.
RuntimeError & error_from_ref(RuntimeErrorRef)
void on_awake_main_thread(uv_async_t *handle)
@ Completed
The channel was signaled it will not receive any data.
@ Running
The channel is actively receiving data.
void on_signal_callback(uv_signal_t *handle, int signum)
@ Me
Only quit this data processor.
constexpr const char * channelName(int channel)
Definition Constants.h:318
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static void run(AsyncQueue &queue, TimesliceId oldestPossibleTimeslice)
static void post(AsyncQueue &queue, AsyncTask const &task)
An actuatual task to be executed.
Definition AsyncQueue.h:32
static void demangled_backtrace_symbols(void **backtrace, unsigned int total, int fd)
static constexpr int INVALID
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
int64_t timeslices
How many timeslices it can process without giving back control.
int64_t sharedMemory
How much shared memory it can allocate.
Statistics on the offers consumed, expired.
static bool hasOnlyGenerated(DeviceSpec const &spec)
check if spec is a source devide
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const &ref, ProcessingPolicies const &policies)
starts the EoS timers and returns the new TransitionHandlingState in case as new state is requested
static void switchState(ServiceRegistryRef const &ref, StreamingState newState)
change the device StreamingState to newState
static void sendEndOfStream(ServiceRegistryRef const &ref, OutputChannelSpec const &channel)
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
static void cleanForwardedMessages(std::span< fair::mq::MessagePtr > &currentSetOfInputs, bool consume)
static std::vector< fair::mq::Parts > routeForwardedMessageSet(FairMQDeviceProxy &proxy, std::vector< std::vector< fair::mq::MessagePtr > > &currentSetOfInputs, bool copy, bool consume)
Helper to route messages for forwarding.
static void routeForwardedMessages(FairMQDeviceProxy &proxy, std::span< fair::mq::MessagePtr > &currentSetOfInputs, std::vector< fair::mq::Parts > &forwardedParts, bool copy, bool consume)
Helper to route messages for forwarding.
Helper struct to hold statistics about the data processing happening.
@ CumulativeRate
Set the value to the specified value if it is positive.
@ Add
Update the rate of the metric given the amount since the last time.
std::function< void(o2::framework::RuntimeErrorRef e, InputRecord &record)> errorHandling
ForwardPolicy forwardPolicy
Wether or not the associated DataProcessor can forward things early.
AlgorithmSpec::InitErrorCallback initError
void preLoopCallbacks(ServiceRegistryRef)
Invoke callbacks before we enter the event loop.
void postStopCallbacks(ServiceRegistryRef)
Invoke callbacks on stop.
void preProcessingCallbacks(ProcessingContext &)
Invoke callbacks to be executed before every process method invokation.
void preStartCallbacks(ServiceRegistryRef)
Invoke callbacks to be executed in PreRun(), before the User Start callbacks.
AlgorithmSpec::ProcessCallback statefulProcess
const char * header
Definition DataRef.h:28
const InputSpec * spec
Definition DataRef.h:27
static std::vector< size_t > createDistinctRouteIndex(std::vector< InputRoute > const &)
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
TimesliceIndex::OldestOutputInfo oldestTimeslice
static unsigned int pipelineLength(unsigned int minLength)
get max number of timeslices in the queue
static std::unique_ptr< ConfigParamStore > getConfiguration(ServiceRegistryRef registry, const char *name, std::vector< ConfigParamSpec > const &options)
ProcessingPolicies & processingPolicies
Running state information of a given device.
Definition DeviceState.h:34
std::atomic< int64_t > cleanupCount
Definition DeviceState.h:82
Forward channel information.
Definition ChannelInfo.h:88
fair::mq::Channel * channel
Definition ChannelInfo.h:51
enum Lifetime lifetime
Definition InputSpec.h:73
enum EarlyForwardPolicy earlyForward
Information about the running workflow.
static constexpr ServiceKind kind
static Salt streamSalt(short streamId, short dataProcessorId)
void lateBindStreamServices(DeviceState &state, fair::mq::ProgOptions &options, ServiceRegistry::Salt salt)
static Salt globalStreamSalt(short streamId)
void * get(ServiceTypeHash typeHash, Salt salt, ServiceKind kind, char const *name=nullptr) const
void finaliseOutputsCallbacks(ProcessingContext &)
Invoke callbacks to be executed after every process method invokation.
void preProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed before every process method invokation.
void preEOSCallbacks(EndOfStreamContext &eosContext)
Invoke callbacks to be executed before every EOS user callback invokation.
void postProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed after every process method invokation.
static int64_t getRealtimeSinceEpochStandalone()
bool keepAtEndOfStream
Wether this kind of data should be flushed during end of stream.
Definition TimingInfo.h:44
static bool timesliceIsTimer(size_t timeslice)
Definition TimingInfo.h:46
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
void backpressure(InputChannelInfo const &)
locked_execution(ServiceRegistryRef &ref_)
the main header struct
Definition DataHeader.h:620
constexpr size_t min
constexpr size_t max
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
vec clear()
const std::string str
uint64_t const void const *restrict const msg
Definition x9.h:153