Project
Loading...
Searching...
No Matches
DataProcessingDevice.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
13#include <atomic>
33#include "Framework/InputSpan.h"
34#if defined(__APPLE__) || defined(NDEBUG)
35#define O2_SIGNPOST_IMPLEMENTATION
36#endif
37#include "Framework/Signpost.h"
50
51#include "DecongestionService.h"
53#include "DataRelayerHelpers.h"
54#include "Headers/DataHeader.h"
56
57#include <Framework/Tracing.h>
58
59#include <fairmq/Parts.h>
60#include <fairmq/Socket.h>
61#include <fairmq/ProgOptions.h>
62#include <fairmq/shmem/Message.h>
63#include <Configuration/ConfigurationInterface.h>
64#include <Configuration/ConfigurationFactory.h>
65#include <Monitoring/Monitoring.h>
66#include <TMessage.h>
67#include <TClonesArray.h>
68
69#include <fmt/ostream.h>
70#include <algorithm>
71#include <vector>
72#include <numeric>
73#include <memory>
74#include <uv.h>
75#include <execinfo.h>
76#include <sstream>
77#include <boost/property_tree/json_parser.hpp>
78
79// Formatter to avoid having to rewrite the ostream operator for the enum
80namespace fmt
81{
82template <>
85} // namespace fmt
86
87// A log to use for general device logging
89// A log to use for general device logging
91// Special log to keep track of the lifetime of the parts
93// Stream which keeps track of the calibration lifetime logic
95// Special log to track the async queue behavior
97// Special log to track the forwarding requests
99// Special log to track CCDB related requests
101// Special log to track task scheduling
103
104using namespace o2::framework;
105using ConfigurationInterface = o2::configuration::ConfigurationInterface;
107
108constexpr int DEFAULT_MAX_CHANNEL_AHEAD = 128;
109
110namespace o2::framework
111{
112
113template <>
117
121{
122 auto* state = (DeviceState*)handle->data;
123 state->loopReason |= DeviceState::TIMER_EXPIRED;
124}
125
127{
128 auto* state = (DeviceState*)s->data;
130}
131
132DeviceSpec const& getRunningDevice(RunningDeviceRef const& running, ServiceRegistryRef const& services)
133{
134 auto& devices = services.get<o2::framework::RunningWorkflowInfo const>().devices;
135 return devices[running.index];
136}
137
143
145 : mRunningDevice{running},
146 mConfigRegistry{nullptr},
147 mServiceRegistry{registry}
148{
149 GetConfig()->Subscribe<std::string>("dpl", [&registry = mServiceRegistry](const std::string& key, std::string value) {
150 if (key == "cleanup") {
152 auto& deviceState = ref.get<DeviceState>();
153 int64_t cleanupCount = deviceState.cleanupCount.load();
154 int64_t newCleanupCount = std::stoll(value);
155 if (newCleanupCount <= cleanupCount) {
156 return;
157 }
158 deviceState.cleanupCount.store(newCleanupCount);
159 for (auto& info : deviceState.inputChannelInfos) {
160 fair::mq::Parts parts;
161 while (info.channel->Receive(parts, 0)) {
162 LOGP(debug, "Dropping {} parts", parts.Size());
163 if (parts.Size() == 0) {
164 break;
165 }
166 }
167 }
168 }
169 });
170
171 std::function<void(const fair::mq::State)> stateWatcher = [this, &registry = mServiceRegistry](const fair::mq::State state) -> void {
173 auto& deviceState = ref.get<DeviceState>();
174 auto& control = ref.get<ControlService>();
175 auto& callbacks = ref.get<CallbackService>();
176 control.notifyDeviceState(fair::mq::GetStateName(state));
178
179 if (deviceState.nextFairMQState.empty() == false) {
180 auto state = deviceState.nextFairMQState.back();
181 (void)this->ChangeState(state);
182 deviceState.nextFairMQState.pop_back();
183 }
184 };
185
186 // 99 is to execute DPL callbacks last
187 this->SubscribeToStateChange("99-dpl", stateWatcher);
188
189 // One task for now.
190 mStreams.resize(1);
191 mHandles.resize(1);
192
193 ServiceRegistryRef ref{mServiceRegistry};
194
195 mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t));
196 auto& state = ref.get<DeviceState>();
197 assert(state.loop);
198 int res = uv_async_init(state.loop, mAwakeHandle, on_communication_requested);
199 mAwakeHandle->data = &state;
200 if (res < 0) {
201 LOG(error) << "Unable to initialise subscription";
202 }
203
205 SubscribeToNewTransition("dpl", [wakeHandle = mAwakeHandle](fair::mq::Transition t) {
206 int res = uv_async_send(wakeHandle);
207 if (res < 0) {
208 LOG(error) << "Unable to notify subscription";
209 }
210 LOG(debug) << "State transition requested";
211 });
212}
213
214// Callback to execute the processing. Notice how the data is
215// a vector of DataProcessorContext so that we can index the correct
216// one with the thread id. For the moment we simply use the first one.
// Has the signature of a libuv work callback: `handle->data` is read back as
// the TaskStreamInfo of the stream being served.
// NOTE(review): source lines 225-226 are absent from this listing, so whatever
// runs between the signpost start and the signpost end is not visible here.
217void run_callback(uv_work_t* handle)
218{
219 auto* task = (TaskStreamInfo*)handle->data;
// Registry view salted with the stream index shifted by one.
220 auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
221 // We create a new signpost interval for this specific data processor. Same id, same data processor.
222 auto& dataProcessorContext = ref.get<DataProcessorContext>();
// The signpost id is derived from the address of the DataProcessorContext.
223 O2_SIGNPOST_ID_FROM_POINTER(sid, device, &dataProcessorContext);
224 O2_SIGNPOST_START(device, sid, "run_callback", "Starting run callback on stream %d", task->id.index);
227 O2_SIGNPOST_END(device, sid, "run_callback", "Done processing data for stream %d", task->id.index);
228}
229
230// Once the processing in a thread is done, this is executed on the main thread.
231void run_completion(uv_work_t* handle, int status)
232{
233 auto* task = (TaskStreamInfo*)handle->data;
234 // Notice that the completion, while running on the main thread, still
235 // has a salt which is associated to the actual stream which was doing the computation
236 auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
237 auto& state = ref.get<DeviceState>();
238 auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
239
240 using o2::monitoring::Metric;
241 using o2::monitoring::Monitoring;
242 using o2::monitoring::tags::Key;
243 using o2::monitoring::tags::Value;
244
245 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats&)> reportConsumedOffer = [ref](ComputingQuotaOffer const& accumulatedConsumed, ComputingQuotaStats& stats) {
246 auto& dpStats = ref.get<DataProcessingStats>();
247 stats.totalConsumedBytes += accumulatedConsumed.sharedMemory;
248 // For now we give back the offer if we did not use it completely.
249 // In principle we should try to run until the offer is fully consumed.
250 stats.totalConsumedTimeslices += std::min<int64_t>(accumulatedConsumed.timeslices, 1);
251
252 dpStats.updateStats({static_cast<short>(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedBytes});
253 dpStats.updateStats({static_cast<short>(ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedTimeslices});
254 dpStats.processCommandQueue();
255 assert(stats.totalConsumedBytes == dpStats.metrics[(short)ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED]);
256 assert(stats.totalConsumedTimeslices == dpStats.metrics[(short)ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED]);
257 };
258
259 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const&)> reportExpiredOffer = [ref](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
260 auto& dpStats = ref.get<DataProcessingStats>();
261 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
262 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
263 dpStats.updateStats({static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredTimeslices});
264 dpStats.processCommandQueue();
265 };
266
267 for (auto& consumer : state.offerConsumers) {
268 quotaEvaluator.consume(task->id.index, consumer, reportConsumedOffer);
269 }
270 state.offerConsumers.clear();
271 quotaEvaluator.handleExpired(reportExpiredOffer);
272 quotaEvaluator.dispose(task->id.index);
273 task->running = false;
274}
275
276// Context for polling
278 enum struct PollerState : char { Stopped,
280 Connected,
281 Suspended };
282 char const* name = nullptr;
283 uv_loop_t* loop = nullptr;
285 DeviceState* state = nullptr;
286 fair::mq::Socket* socket = nullptr;
288 int fd = -1;
289 bool read = true;
291};
292
293void on_socket_polled(uv_poll_t* poller, int status, int events)
294{
295 auto* context = (PollerContext*)poller->data;
296 assert(context);
297 O2_SIGNPOST_ID_FROM_POINTER(sid, sockets, poller);
298 context->state->loopReason |= DeviceState::DATA_SOCKET_POLLED;
299 switch (events) {
300 case UV_READABLE: {
301 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
302 context->state->loopReason |= DeviceState::DATA_INCOMING;
303 } break;
304 case UV_WRITABLE: {
305 O2_SIGNPOST_END(sockets, sid, "socket_state", "Socket connected for channel %{public}s", context->name);
306 if (context->read) {
307 O2_SIGNPOST_START(sockets, sid, "socket_state", "Socket connected for read in context %{public}s", context->name);
308 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
309 context->state->loopReason |= DeviceState::DATA_CONNECTED;
310 } else {
311 O2_SIGNPOST_START(sockets, sid, "socket_state", "Socket connected for write for channel %{public}s", context->name);
312 context->state->loopReason |= DeviceState::DATA_OUTGOING;
313 // If the socket is writable, fairmq will handle the rest, so we can stop polling and
314 // just wait for the disconnect.
315 uv_poll_start(poller, UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
316 }
317 context->pollerState = PollerContext::PollerState::Connected;
318 } break;
319 case UV_DISCONNECT: {
320 O2_SIGNPOST_END(sockets, sid, "socket_state", "Socket disconnected in context %{public}s", context->name);
321 } break;
322 case UV_PRIORITIZED: {
323 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Socket prioritized for context %{public}s", context->name);
324 } break;
325 }
326 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
327}
328
329void on_out_of_band_polled(uv_poll_t* poller, int status, int events)
330{
331 O2_SIGNPOST_ID_FROM_POINTER(sid, sockets, poller);
332 auto* context = (PollerContext*)poller->data;
333 context->state->loopReason |= DeviceState::OOB_ACTIVITY;
334 if (status < 0) {
335 LOGP(fatal, "Error while polling {}: {}", context->name, status);
336 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
337 }
338 switch (events) {
339 case UV_READABLE: {
340 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
341 context->state->loopReason |= DeviceState::DATA_INCOMING;
342 assert(context->channelInfo);
343 context->channelInfo->readPolled = true;
344 } break;
345 case UV_WRITABLE: {
346 O2_SIGNPOST_END(sockets, sid, "socket_state", "OOB socket connected for channel %{public}s", context->name);
347 if (context->read) {
348 O2_SIGNPOST_START(sockets, sid, "socket_state", "OOB socket connected for read in context %{public}s", context->name);
349 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_out_of_band_polled);
350 } else {
351 O2_SIGNPOST_START(sockets, sid, "socket_state", "OOB socket connected for write for channel %{public}s", context->name);
352 context->state->loopReason |= DeviceState::DATA_OUTGOING;
353 }
354 } break;
355 case UV_DISCONNECT: {
356 O2_SIGNPOST_END(sockets, sid, "socket_state", "OOB socket disconnected in context %{public}s", context->name);
357 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
358 } break;
359 case UV_PRIORITIZED: {
360 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "OOB socket prioritized for context %{public}s", context->name);
361 } break;
362 }
363 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
364}
365
374{
375 auto ref = ServiceRegistryRef{mServiceRegistry};
376 auto& context = ref.get<DataProcessorContext>();
377 auto& spec = getRunningDevice(mRunningDevice, ref);
378
379 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
380 O2_SIGNPOST_START(device, cid, "Init", "Entering Init callback.");
381 context.statelessProcess = spec.algorithm.onProcess;
382 context.statefulProcess = nullptr;
383 context.error = spec.algorithm.onError;
384 context.initError = spec.algorithm.onInitError;
385
386 auto configStore = DeviceConfigurationHelpers::getConfiguration(mServiceRegistry, spec.name.c_str(), spec.options);
387 if (configStore == nullptr) {
388 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
389 retrievers.emplace_back(std::make_unique<FairOptionsRetriever>(GetConfig()));
390 configStore = std::make_unique<ConfigParamStore>(spec.options, std::move(retrievers));
391 configStore->preload();
392 configStore->activate();
393 }
394
395 using boost::property_tree::ptree;
396
398 for (auto& entry : configStore->store()) {
399 std::stringstream ss;
400 std::string str;
401 if (entry.second.empty() == false) {
402 boost::property_tree::json_parser::write_json(ss, entry.second, false);
403 str = ss.str();
404 str.pop_back(); // remove EoL
405 } else {
406 str = entry.second.get_value<std::string>();
407 }
408 std::string configString = fmt::format("[CONFIG] {}={} 1 {}", entry.first, str, configStore->provenance(entry.first.c_str())).c_str();
409 mServiceRegistry.get<DriverClient>(ServiceRegistry::globalDeviceSalt()).tell(configString.c_str());
410 }
411
412 mConfigRegistry = std::make_unique<ConfigParamRegistry>(std::move(configStore));
413
414 // Setup the error handlers for init
415 if (context.initError) {
416 context.initErrorHandling = [&errorCallback = context.initError,
417 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
421 auto& context = ref.get<DataProcessorContext>();
422 auto& err = error_from_ref(e);
423 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
424 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Invoking errorCallback.", err.what);
425 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
426 auto& stats = ref.get<DataProcessingStats>();
428 InitErrorContext errorContext{ref, e};
429 errorCallback(errorContext);
430 };
431 } else {
432 context.initErrorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
433 auto& err = error_from_ref(e);
437 auto& context = ref.get<DataProcessorContext>();
438 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
439 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Exiting with 1.", err.what);
440 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
441 auto& stats = ref.get<DataProcessingStats>();
443 exit(1);
444 };
445 }
446
447 context.expirationHandlers.clear();
448 context.init = spec.algorithm.onInit;
449 if (context.init) {
450 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
451 InitContext initContext{*mConfigRegistry, mServiceRegistry};
452
453 if (noCatch) {
454 try {
455 context.statefulProcess = context.init(initContext);
457 if (context.initErrorHandling) {
458 (context.initErrorHandling)(e);
459 }
460 }
461 } else {
462 try {
463 context.statefulProcess = context.init(initContext);
464 } catch (std::exception& ex) {
468 auto e = runtime_error(ex.what());
469 (context.initErrorHandling)(e);
471 (context.initErrorHandling)(e);
472 }
473 }
474 }
475 auto& state = ref.get<DeviceState>();
476 state.inputChannelInfos.resize(spec.inputChannels.size());
480 int validChannelId = 0;
481 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
482 auto& name = spec.inputChannels[ci].name;
483 if (name.find(spec.channelPrefix + "from_internal-dpl-clock") == 0) {
484 state.inputChannelInfos[ci].state = InputChannelState::Pull;
485 state.inputChannelInfos[ci].id = {ChannelIndex::INVALID};
486 validChannelId++;
487 } else {
488 state.inputChannelInfos[ci].id = {validChannelId++};
489 }
490 }
491
492 // Invoke the callback policy for this device.
493 if (spec.callbacksPolicy.policy != nullptr) {
494 InitContext initContext{*mConfigRegistry, mServiceRegistry};
495 spec.callbacksPolicy.policy(mServiceRegistry.get<CallbackService>(ServiceRegistry::globalDeviceSalt()), initContext);
496 }
497
498 // Services which are stream should be initialised now
499 auto* options = GetConfig();
500 for (size_t si = 0; si < mStreams.size(); ++si) {
502 mServiceRegistry.lateBindStreamServices(state, *options, streamSalt);
503 }
504 O2_SIGNPOST_END(device, cid, "Init", "Exiting Init callback.");
505}
506
// Has the signature of a libuv signal callback. `handle->data` is read back
// as a ServiceRegistry pointer; when it is null the signal is ignored.
// Otherwise the offers of the ComputingQuotaEvaluator are inspected: if a
// valid offer already carries shared memory nothing is done, else the first
// invalid offer is turned into a valid offer with 1000000000 bytes of shared
// memory.
// NOTE(review): source lines 521 and 547 are absent from this listing, so the
// statements which sat there (presumably the users of `stats`) are not
// visible.
507void on_signal_callback(uv_signal_t* handle, int signum)
508{
509 O2_SIGNPOST_ID_FROM_POINTER(sid, device, handle);
510 O2_SIGNPOST_START(device, sid, "signal_state", "Signal %d received.", signum);
511
512 auto* registry = (ServiceRegistry*)handle->data;
513 if (!registry) {
514 O2_SIGNPOST_END(device, sid, "signal_state", "No registry active. Ignoring signal.");
515 return;
516 }
517 ServiceRegistryRef ref{*registry};
518 auto& state = ref.get<DeviceState>();
519 auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
520 auto& stats = ref.get<DataProcessingStats>();
// First pass: bail out as soon as one valid offer with shared memory exists.
522 size_t ri = 0;
523 while (ri != quotaEvaluator.mOffers.size()) {
524 auto& offer = quotaEvaluator.mOffers[ri];
525 // We were already offered some sharedMemory, so we
526 // do not consider the offer.
527 // FIXME: in principle this should account for memory
528 // available and being offered, however we
529 // want to get out of the woods for now.
530 if (offer.valid && offer.sharedMemory != 0) {
531 O2_SIGNPOST_END(device, sid, "signal_state", "Memory already offered.");
532 return;
533 }
534 ri++;
535 }
536 // Find the first empty offer and have 1GB of shared memory there
537 for (auto& offer : quotaEvaluator.mOffers) {
538 if (offer.valid == false) {
539 offer.cpu = 0;
540 offer.memory = 0;
541 offer.sharedMemory = 1000000000;
542 offer.valid = true;
543 offer.user = -1;
// Only one slot gets filled per signal.
544 break;
545 }
546 }
548 O2_SIGNPOST_END(device, sid, "signal_state", "Done processing signals.");
549}
550
// User payload attached to the AsyncTask posted by forwardInputs and read
// back in decongestionCallbackLate through its `ref` and `oldestTimeslice`
// members.
// NOTE(review): the member declarations (source lines 552-553) are absent
// from this listing.
551struct DecongestionContext {
554};
555
556auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
557 auto& oldestTimeslice = task.user<DecongestionContext>().oldestTimeslice;
558 auto& ref = task.user<DecongestionContext>().ref;
559
560 auto& decongestion = ref.get<DecongestionService>();
561 auto& proxy = ref.get<FairMQDeviceProxy>();
562 if (oldestTimeslice.timeslice.value <= decongestion.lastTimeslice) {
563 LOG(debug) << "Not sending already sent oldest possible timeslice " << oldestTimeslice.timeslice.value;
564 return;
565 }
566 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
567 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
568 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
569 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
570 // TODO: this we could cache in the proxy at the bind moment.
571 if (info.channelType != ChannelAccountingType::DPL) {
572 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Skipping channel %{public}s because it's not a DPL channel",
573 info.name.c_str());
574
575 continue;
576 }
577 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestTimeslice.timeslice.value)) {
578 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Forwarding to channel %{public}s oldest possible timeslice %zu, prio 20",
579 info.name.c_str(), oldestTimeslice.timeslice.value);
580 }
581 }
582};
583
584// This is how we do the forwarding, i.e. we push
585// the inputs which are shared between this device and others
586// to the next one in the daisy chain.
587// FIXME: do it in a smarter way than O(N^2)
588static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
589 TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
590 auto& proxy = registry.get<FairMQDeviceProxy>();
591
592 O2_SIGNPOST_ID_GENERATE(sid, forwarding);
593 O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
594 slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
595 auto forwardedParts = DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copy, consume);
596
597 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
598 if (forwardedParts[fi].Size() == 0) {
599 continue;
600 }
601 ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi});
602 auto& parts = forwardedParts[fi];
603 if (info.policy == nullptr) {
604 O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi);
605 continue;
606 }
607 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
608 info.policy->forward(parts, ChannelIndex{fi}, registry);
609 }
610
611 auto& asyncQueue = registry.get<AsyncQueue>();
612 auto& decongestion = registry.get<DecongestionService>();
613 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
614 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
615 AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate}
616 .user<DecongestionContext>({.ref = registry, .oldestTimeslice = oldestTimeslice}));
617 O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
618};
619
// Counterpart of forwardInputs (same parameter list) used when the inputs
// only need to be cleaned up: per its own comments nothing is meant to be
// sent, only the side effect of the consume is wanted.
// NOTE(review): source line 631 is absent from this listing, so the statement
// which actually uses `span` inside the loop is not visible.
620static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
621 TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
622 auto& proxy = registry.get<FairMQDeviceProxy>();
623
624 O2_SIGNPOST_ID_GENERATE(sid, forwarding);
625 O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Cleaning up slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
626 slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
627 // Always copy them, because we do not want to actually send them.
628 // We merely need the side effect of the consume, if applicable.
// One span per MessageSet, covering its `messages` member.
629 for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
630 auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii].messages);
632 }
633
634 O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Cleaning done");
635};
636
637extern volatile int region_read_global_dummy_variable;
639
641void handleRegionCallbacks(ServiceRegistryRef registry, std::vector<fair::mq::RegionInfo>& infos)
642{
643 if (infos.empty() == false) {
644 std::vector<fair::mq::RegionInfo> toBeNotified;
645 toBeNotified.swap(infos); // avoid any MT issue.
646 static bool dummyRead = getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS") && atoi(getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS"));
647 for (auto const& info : toBeNotified) {
648 if (dummyRead) {
649 for (size_t i = 0; i < info.size / sizeof(region_read_global_dummy_variable); i += 4096 / sizeof(region_read_global_dummy_variable)) {
650 region_read_global_dummy_variable = ((int*)info.ptr)[i];
651 }
652 }
653 registry.get<CallbackService>().call<CallbackService::Id::RegionInfoCallback>(info);
654 }
655 }
656}
657
658namespace
659{
661{
662 auto* state = (DeviceState*)handle->data;
664}
665} // namespace
666
667void DataProcessingDevice::initPollers()
668{
669 auto ref = ServiceRegistryRef{mServiceRegistry};
670 auto& deviceContext = ref.get<DeviceContext>();
671 auto& context = ref.get<DataProcessorContext>();
672 auto& spec = ref.get<DeviceSpec const>();
673 auto& state = ref.get<DeviceState>();
674 // We add a timer only in case a channel poller is not there.
675 if ((context.statefulProcess != nullptr) || (context.statelessProcess != nullptr)) {
676 for (auto& [channelName, channel] : GetChannels()) {
677 InputChannelInfo* channelInfo;
678 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
679 auto& channelSpec = spec.inputChannels[ci];
680 channelInfo = &state.inputChannelInfos[ci];
681 if (channelSpec.name != channelName) {
682 continue;
683 }
684 channelInfo->channel = &this->GetChannel(channelName, 0);
685 break;
686 }
687 if ((channelName.rfind("from_internal-dpl", 0) == 0) &&
688 (channelName.rfind("from_internal-dpl-aod", 0) != 0) &&
689 (channelName.rfind("from_internal-dpl-ccdb-backend", 0) != 0) &&
690 (channelName.rfind("from_internal-dpl-injected", 0)) != 0) {
691 LOGP(detail, "{} is an internal channel. Skipping as no input will come from there.", channelName);
692 continue;
693 }
694 // We only watch receiving sockets.
695 if (channelName.rfind("from_" + spec.name + "_", 0) == 0) {
696 LOGP(detail, "{} is to send data. Not polling.", channelName);
697 continue;
698 }
699
700 if (channelName.rfind("from_", 0) != 0) {
701 LOGP(detail, "{} is not a DPL socket. Not polling.", channelName);
702 continue;
703 }
704
705 // We assume there is always a ZeroMQ socket behind.
706 int zmq_fd = 0;
707 size_t zmq_fd_len = sizeof(zmq_fd);
708 // FIXME: I should probably save those somewhere... ;-)
709 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
710 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
711 if (zmq_fd == 0) {
712 LOG(error) << "Cannot get file descriptor for channel." << channelName;
713 continue;
714 }
715 LOGP(detail, "Polling socket for {}", channelName);
716 auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
717 pCtx->name = strdup(channelName.c_str());
718 pCtx->loop = state.loop;
719 pCtx->device = this;
720 pCtx->state = &state;
721 pCtx->fd = zmq_fd;
722 assert(channelInfo != nullptr);
723 pCtx->channelInfo = channelInfo;
724 pCtx->socket = &channel[0].GetSocket();
725 pCtx->read = true;
726 poller->data = pCtx;
727 uv_poll_init(state.loop, poller, zmq_fd);
728 if (channelName.rfind("from_", 0) != 0) {
729 LOGP(detail, "{} is an out of band channel.", channelName);
730 state.activeOutOfBandPollers.push_back(poller);
731 } else {
732 channelInfo->pollerIndex = state.activeInputPollers.size();
733 state.activeInputPollers.push_back(poller);
734 }
735 }
736 // In case we do not have any input channel and we do not have
737 // any timers or signal watchers we still wake up whenever we can send data to downstream
738 // devices to allow for enumerations.
739 if (state.activeInputPollers.empty() &&
740 state.activeOutOfBandPollers.empty() &&
741 state.activeTimers.empty() &&
742 state.activeSignals.empty()) {
743 // FIXME: this is to make sure we do not reset the output timer
744 // for readout proxies or similar. In principle this should go once
745 // we move to OutOfBand InputSpec.
746 if (state.inputChannelInfos.empty()) {
747 LOGP(detail, "No input channels. Setting exit transition timeout to 0.");
748 deviceContext.exitTransitionTimeout = 0;
749 }
750 for (auto& [channelName, channel] : GetChannels()) {
751 if (channelName.rfind(spec.channelPrefix + "from_internal-dpl", 0) == 0) {
752 LOGP(detail, "{} is an internal channel. Not polling.", channelName);
753 continue;
754 }
755 if (channelName.rfind(spec.channelPrefix + "from_" + spec.name + "_", 0) == 0) {
756 LOGP(detail, "{} is an out of band channel. Not polling for output.", channelName);
757 continue;
758 }
759 // We assume there is always a ZeroMQ socket behind.
760 int zmq_fd = 0;
761 size_t zmq_fd_len = sizeof(zmq_fd);
762 // FIXME: I should probably save those somewhere... ;-)
763 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
764 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
765 if (zmq_fd == 0) {
766 LOGP(error, "Cannot get file descriptor for channel {}", channelName);
767 continue;
768 }
769 LOG(detail) << "Polling socket for " << channel[0].GetName();
770 // FIXME: leak
771 auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
772 pCtx->name = strdup(channelName.c_str());
773 pCtx->loop = state.loop;
774 pCtx->device = this;
775 pCtx->state = &state;
776 pCtx->fd = zmq_fd;
777 pCtx->read = false;
778 poller->data = pCtx;
779 uv_poll_init(state.loop, poller, zmq_fd);
780 state.activeOutputPollers.push_back(poller);
781 }
782 }
783 } else {
784 LOGP(detail, "This is a fake device so we exit after the first iteration.");
785 deviceContext.exitTransitionTimeout = 0;
786 // This is a fake device, so we can request to exit immediately
787 ServiceRegistryRef ref{mServiceRegistry};
788 ref.get<ControlService>().readyToQuit(QuitRequest::Me);
789 // A two second timer to stop internal devices which do not want to
790 auto* timer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
791 uv_timer_init(state.loop, timer);
792 timer->data = &state;
793 uv_update_time(state.loop);
794 uv_timer_start(timer, on_idle_timer, 2000, 2000);
795 state.activeTimers.push_back(timer);
796 }
797}
798
799void DataProcessingDevice::startPollers()
800{
801 auto ref = ServiceRegistryRef{mServiceRegistry};
802 auto& deviceContext = ref.get<DeviceContext>();
803 auto& state = ref.get<DeviceState>();
804
805 for (auto* poller : state.activeInputPollers) {
806 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
807 O2_SIGNPOST_START(device, sid, "socket_state", "Input socket waiting for connection.");
808 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
809 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
810 }
811 for (auto& poller : state.activeOutOfBandPollers) {
812 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
813 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
814 }
815 for (auto* poller : state.activeOutputPollers) {
816 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
817 O2_SIGNPOST_START(device, sid, "socket_state", "Output socket waiting for connection.");
818 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
819 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
820 }
821
822 deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
823 deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
824 uv_timer_init(state.loop, deviceContext.gracePeriodTimer);
825
826 deviceContext.dataProcessingGracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
827 deviceContext.dataProcessingGracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
828 uv_timer_init(state.loop, deviceContext.dataProcessingGracePeriodTimer);
829}
830
831void DataProcessingDevice::stopPollers()
832{
833 auto ref = ServiceRegistryRef{mServiceRegistry};
834 auto& deviceContext = ref.get<DeviceContext>();
835 auto& state = ref.get<DeviceState>();
836 LOGP(detail, "Stopping {} input pollers", state.activeInputPollers.size());
837 for (auto* poller : state.activeInputPollers) {
838 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
839 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
840 uv_poll_stop(poller);
841 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
842 }
843 LOGP(detail, "Stopping {} out of band pollers", state.activeOutOfBandPollers.size());
844 for (auto* poller : state.activeOutOfBandPollers) {
845 uv_poll_stop(poller);
846 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
847 }
848 LOGP(detail, "Stopping {} output pollers", state.activeOutOfBandPollers.size());
849 for (auto* poller : state.activeOutputPollers) {
850 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
851 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
852 uv_poll_stop(poller);
853 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
854 }
855
856 uv_timer_stop(deviceContext.gracePeriodTimer);
857 delete (ServiceRegistryRef*)deviceContext.gracePeriodTimer->data;
858 free(deviceContext.gracePeriodTimer);
859 deviceContext.gracePeriodTimer = nullptr;
860
861 uv_timer_stop(deviceContext.dataProcessingGracePeriodTimer);
862 delete (ServiceRegistryRef*)deviceContext.dataProcessingGracePeriodTimer->data;
863 free(deviceContext.dataProcessingGracePeriodTimer);
864 deviceContext.dataProcessingGracePeriodTimer = nullptr;
865}
866
868{
// Body of the initialisation step labelled "InitTask" by the signposts below.
// In order it:
//  - creates one ExpirationHandler for each distinct input route which has a
//    configurator;
//  - makes sure the async handle used to wake up the main loop exists;
//  - reads a few timeouts / counters from the configuration;
//  - subscribes to the region events of every channel;
//  - installs the SIGUSR1 handler;
//  - fills the DataProcessorContext;
//  - runs the event loop until all the expected region callbacks were handled.
869 auto ref = ServiceRegistryRef{mServiceRegistry};
870 auto& deviceContext = ref.get<DeviceContext>();
871 auto& context = ref.get<DataProcessorContext>();
872
873 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
874 O2_SIGNPOST_START(device, cid, "InitTask", "Entering InitTask callback.");
875 auto& spec = getRunningDevice(mRunningDevice, mServiceRegistry);
876 auto distinct = DataRelayerHelpers::createDistinctRouteIndex(spec.inputs);
877 auto& state = ref.get<DeviceState>();
// `i` is the position inside `distinct`. It is incremented for every entry,
// including those skipped because they have no configurator, so that the
// RouteIndex of a handler matches the position of its route in `distinct`.
878 int i = 0;
879 for (auto& di : distinct) {
880 auto& route = spec.inputs[di];
881 if (route.configurator.has_value() == false) {
882 i++;
883 continue;
884 }
885 ExpirationHandler handler{
886 .name = route.configurator->name,
887 .routeIndex = RouteIndex{i++},
888 .lifetime = route.matcher.lifetime,
889 .creator = route.configurator->creatorConfigurator(state, mServiceRegistry, *mConfigRegistry),
890 .checker = route.configurator->danglingConfigurator(state, *mConfigRegistry),
891 .handler = route.configurator->expirationConfigurator(state, *mConfigRegistry)};
892 context.expirationHandlers.emplace_back(std::move(handler));
893 }
894
// The async handle is allocated only the first time: later invocations find
// a non null pointer and reuse it.
895 if (state.awakeMainThread == nullptr) {
896 state.awakeMainThread = (uv_async_t*)malloc(sizeof(uv_async_t));
897 state.awakeMainThread->data = &state;
898 uv_async_init(state.loop, state.awakeMainThread, on_awake_main_thread);
899 }
900
// These options are stored as strings and converted with std::stoi.
901 deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue<std::string>("expected-region-callbacks"));
902 deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue<std::string>("exit-transition-timeout"));
903 deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue<std::string>("data-processing-timeout"));
904
// The callback below only queues the event and decrements the number of
// expected callbacks while holding the mutex, then wakes up the main loop,
// where the queued events are handled.
905 for (auto& channel : GetChannels()) {
906 channel.second.at(0).Transport()->SubscribeToRegionEvents([&context = deviceContext,
907 &registry = mServiceRegistry,
908 &pendingRegionInfos = mPendingRegionInfos,
909 &regionInfoMutex = mRegionInfoMutex](fair::mq::RegionInfo info) {
910 std::lock_guard<std::mutex> lock(regionInfoMutex);
911 LOG(detail) << ">>> Region info event" << info.event;
912 LOG(detail) << "id: " << info.id;
913 LOG(detail) << "ptr: " << info.ptr;
914 LOG(detail) << "size: " << info.size;
915 LOG(detail) << "flags: " << info.flags;
916 // Now we check for pending events with the mutex,
917 // so the lines below are atomic.
918 pendingRegionInfos.push_back(info);
919 context.expectedRegionCallbacks -= 1;
920 // We always want to handle these on the main loop,
921 // so we awake it.
922 ServiceRegistryRef ref{registry};
923 uv_async_send(ref.get<DeviceState>().awakeMainThread);
924 });
925 }
926
927 // Add a signal manager for SIGUSR1 so that we can force
928 // an event from the outside, making sure that the event loop can
929 // be unblocked (e.g. by a quitting DPL driver) even when there
930 // is no data pending to be processed.
931 if (deviceContext.sigusr1Handle == nullptr) {
932 deviceContext.sigusr1Handle = (uv_signal_t*)malloc(sizeof(uv_signal_t));
933 deviceContext.sigusr1Handle->data = &mServiceRegistry;
934 uv_signal_init(state.loop, deviceContext.sigusr1Handle);
935 uv_signal_start(deviceContext.sigusr1Handle, on_signal_callback, SIGUSR1);
936 }
937 // If there is any signal, we want to make sure they are active
938 for (auto& handle : state.activeSignals) {
939 handle->data = &state;
940 }
941 // When we start, we must make sure that we do listen to the signal
// This also covers the case where the handle already existed and the
// creation branch above was skipped.
942 deviceContext.sigusr1Handle->data = &mServiceRegistry;
943
945 DataProcessingDevice::initPollers();
946
947 // Whenever we InitTask, we consider as if the previous iteration
948 // was successful, so that even if there is no timer or receiving
949 // channel, we can still start an enumeration.
// The exchange only succeeds if no DataProcessorContext is currently
// recorded (nullptr). In that case the marker value -1 is stored.
// NOTE(review): if the stored value is the -1 marker itself when we get here,
// the error branch dereferences it via ->spec. Confirm this cannot happen.
950 DataProcessorContext* initialContext = nullptr;
951 bool idle = state.lastActiveDataProcessor.compare_exchange_strong(initialContext, (DataProcessorContext*)-1);
952 if (!idle) {
953 LOG(error) << "DataProcessor " << state.lastActiveDataProcessor.load()->spec->name << " was unexpectedly active";
954 }
955
956 // We should be ready to run here. Therefore we copy all the
957 // required parts in the DataProcessorContext. Eventually we should
958 // do so on a per thread basis, with fine grained locks.
959 // FIXME: this should not use ServiceRegistry::threadSalt, but
960 // more a ServiceRegistry::globalDataProcessorSalt(N) where
961 // N is the number of the multiplexed data processor.
962 // We will get there.
963 this->fillContext(mServiceRegistry.get<DataProcessorContext>(ServiceRegistry::globalDeviceSalt()), deviceContext);
964
965 O2_SIGNPOST_END(device, cid, "InitTask", "Exiting InitTask callback waiting for the remaining region callbacks.");
966
// True while there are queued region events still to be handled or while
// some of the expected region callbacks did not arrive yet. The check is
// done holding the same mutex used by the region event callback.
967 auto hasPendingEvents = [&mutex = mRegionInfoMutex, &pendingRegionInfos = mPendingRegionInfos](DeviceContext& deviceContext) {
968 std::lock_guard<std::mutex> lock(mutex);
969 return (pendingRegionInfos.empty() == false) || deviceContext.expectedRegionCallbacks > 0;
970 };
971 O2_SIGNPOST_START(device, cid, "InitTask", "Waiting for registation events.");
976 while (hasPendingEvents(deviceContext)) {
977 // Wait for the callback to signal its done, so that we do not busy wait.
978 uv_run(state.loop, UV_RUN_ONCE);
979 // Handle callbacks if any
980 {
981 O2_SIGNPOST_EVENT_EMIT(device, cid, "InitTask", "Memory registration event received.");
982 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
983 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
984 }
985 }
986 O2_SIGNPOST_END(device, cid, "InitTask", "Done waiting for registration events.");
987}
988
990{
// Fills the per data processor context: sink flag, input balancing, the
// registry pointer, the error handling callback and the forwarding policy.
// NOTE(review): `context` and `deviceContext` are not declared in the lines
// visible here; presumably they are the two arguments passed by the caller
// (a DataProcessorContext and a DeviceContext) - confirm against the
// signature.
991 context.isSink = false;
992 // If nothing is a sink, the rate limiting simply does not trigger.
// Any non zero value of the option enables the rate limiting.
993 bool enableRateLimiting = std::stoi(fConfig->GetValue<std::string>("timeframes-rate-limit"));
994
995 auto ref = ServiceRegistryRef{mServiceRegistry};
996 auto& spec = ref.get<DeviceSpec const>();
997
998 // The policy is now allowed to state the default.
999 context.balancingInputs = spec.completionPolicy.balanceChannels;
1000 // This is needed because the internal injected dummy sink should not
1001 // try to balance inputs unless the rate limiting is requested.
1002 if (enableRateLimiting == false && spec.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) {
1003 context.balancingInputs = false;
1004 }
// With rate limiting enabled, the device is considered a sink as soon as
// one of its outputs has the "dpl-summary" binding.
// Notice that the loop variable shadows the outer `spec`.
1005 if (enableRateLimiting) {
1006 for (auto& spec : spec.outputs) {
1007 if (spec.matcher.binding.value == "dpl-summary") {
1008 context.isSink = true;
1009 break;
1010 }
1011 }
1012 }
1013
1014 context.registry = &mServiceRegistry;
// Two flavours of error handling: if a user error callback was provided
// (context.error) it gets invoked with an ErrorContext, otherwise the
// behavior is decided by deviceContext.processingPolicies.error.
// NOTE(review): both lambdas use a `ref` whose declaration is not visible in
// these lines; presumably it is built from the captured serviceRegistry -
// confirm.
1017 if (context.error != nullptr) {
1018 context.errorHandling = [&errorCallback = context.error,
1019 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1023 auto& err = error_from_ref(e);
1024 auto& context = ref.get<DataProcessorContext>();
1025 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1026 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Invoking callback.", err.what);
1027 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1028 auto& stats = ref.get<DataProcessingStats>();
1030 ErrorContext errorContext{record, ref, e};
1031 errorCallback(errorContext);
1032 };
1033 } else {
1034 context.errorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1035 auto& err = error_from_ref(e);
1039 auto& context = ref.get<DataProcessorContext>();
1040 auto& deviceContext = ref.get<DeviceContext>();
1041 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1042 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1043 auto& stats = ref.get<DataProcessingStats>();
// One policy value rethrows the error, every other value only reports it
// and carries on.
// NOTE(review): the case label selecting the rethrow is not visible here.
1045 switch (deviceContext.processingPolicies.error) {
1047 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Rethrowing.", err.what);
1048 throw e;
1049 default:
1050 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Skipping to next timeframe.", err.what);
1051 break;
1052 }
1053 };
1054 }
1055
// Computes the ForwardPolicy stored in context.forwardPolicy.
// The default is AtInjection, unless the DPL_OLD_EARLY_FORWARD environment
// variable is set or one of the forwards matches the TPC data types listed
// below, in which case the default is AtCompletionPolicySatisified.
1056 auto decideEarlyForward = [&context, &deviceContext, &spec, this]() -> ForwardPolicy {
1057 ForwardPolicy defaultEarlyForwardPolicy = getenv("DPL_OLD_EARLY_FORWARD") ? ForwardPolicy::AtCompletionPolicySatisified : ForwardPolicy::AtInjection;
1058 // FIXME: try again with the new policy by default.
1059 //
1060 // Make the new policy optional until we handle some of the corner cases
1061 // with custom policies which expect the early forward to happen only when
1062 // all the data is available, like in the TPC case.
1063 // ForwardPolicy defaultEarlyForwardPolicy = getenv("DPL_NEW_EARLY_FORWARD") ? ForwardPolicy::AtInjection : ForwardPolicy::AtCompletionPolicySatisified;
1064 for (auto& forward : spec.forwards) {
1065 if (DataSpecUtils::match(forward.matcher, ConcreteDataTypeMatcher{"TPC", "DIGITSMCTR"}) ||
1066 DataSpecUtils::match(forward.matcher, ConcreteDataTypeMatcher{"TPC", "CLNATIVEMCLBL"}) ||
1067 DataSpecUtils::match(forward.matcher, ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "DIGITS"}) ||
1068 DataSpecUtils::match(forward.matcher, ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "CLUSTERNATIVE"})) {
1069 defaultEarlyForwardPolicy = ForwardPolicy::AtCompletionPolicySatisified;
1070 break;
1071 }
1072 }
1073
// The user provided policy is only looked at when there is something to
// forward. One of its values selects AfterProcessing, the other two fall
// back to the default computed above.
// NOTE(review): the case labels of this switch are not visible here.
1076 ForwardPolicy forwardPolicy = defaultEarlyForwardPolicy;
1077 if (spec.forwards.empty() == false) {
1078 switch (deviceContext.processingPolicies.earlyForward) {
1080 forwardPolicy = ForwardPolicy::AfterProcessing;
1081 break;
1083 forwardPolicy = defaultEarlyForwardPolicy;
1084 break;
1086 forwardPolicy = defaultEarlyForwardPolicy;
1087 break;
1088 }
1089 }
// Some kinds of forwarded data force AfterProcessing and stop the scan at
// the first such route. If nothing forced it and every route scanned is a
// Condition, the default policy is used instead.
// NOTE(review): the condition guarding the RAWDATA branch is not visible
// here.
1090 bool onlyConditions = true;
1091 bool overriddenEarlyForward = false;
1092 for (auto& forwarded : spec.forwards) {
1093 if (forwarded.matcher.lifetime != Lifetime::Condition) {
1094 onlyConditions = false;
1095 }
1097 forwardPolicy = ForwardPolicy::AfterProcessing;
1098 overriddenEarlyForward = true;
1099 LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher);
1100 break;
1101 }
1102 if (forwarded.matcher.lifetime == Lifetime::Optional) {
1103 forwardPolicy = ForwardPolicy::AfterProcessing;
1104 overriddenEarlyForward = true;
1105 LOG(detail) << "Cannot forward early because of Optional input: " << DataSpecUtils::describe(forwarded.matcher);
1106 break;
1107 }
1108 }
1109 if (!overriddenEarlyForward && onlyConditions) {
1110 forwardPolicy = defaultEarlyForwardPolicy;
1111 LOG(detail) << "Enabling early forwarding because only conditions to be forwarded";
1112 }
1113 return forwardPolicy;
1114 };
1115 context.forwardPolicy = decideEarlyForward();
1116}
1117
1119{
// Body of the step labelled "PreRun" by the signposts below. It resets the
// device state, invokes the pre-start callbacks (device wide first, then one
// per stream), invokes the Start callback, starts the pollers and finally
// publishes the "device_state" metric with value 1.
1120 auto ref = ServiceRegistryRef{mServiceRegistry};
1121 auto& state = ref.get<DeviceState>();
1122
1123 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.loop);
1124 O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
1125 state.quitRequested = false;
1127 state.allowedProcessing = DeviceState::Any;
// Every input channel which is not in the Pull state is put back to Running.
1128 for (auto& info : state.inputChannelInfos) {
1129 if (info.state != InputChannelState::Pull) {
1130 info.state = InputChannelState::Running;
1131 }
1132 }
1133
1134 // Catch callbacks which fail before we start.
1135 // Notice that when running multiple dataprocessors
1136 // we should probably allow expendable ones to fail.
// All the handlers below close the "PreRun" signpost interval and rethrow:
// the exception is never swallowed here.
1137 try {
1138 auto& dpContext = ref.get<DataProcessorContext>();
1139 dpContext.preStartCallbacks(ref);
// The registry reference of stream `i` is obtained with the salt for i + 1.
1140 for (size_t i = 0; i < mStreams.size(); ++i) {
1141 auto streamRef = ServiceRegistryRef{mServiceRegistry, ServiceRegistry::globalStreamSalt(i + 1)};
1142 auto& context = streamRef.get<StreamContext>();
1143 context.preStartStreamCallbacks(streamRef);
1144 }
1145 } catch (std::exception& e) {
1146 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type std::exception caught in PreRun: %{public}s. Rethrowing.", e.what());
1147 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1148 throw;
1149 } catch (o2::framework::RuntimeErrorRef& e) {
1150 auto& err = error_from_ref(e);
1151 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type o2::framework::RuntimeErrorRef caught in PreRun: %{public}s. Rethrowing.", err.what);
1152 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1153 throw;
1154 } catch (...) {
1155 O2_SIGNPOST_END(device, cid, "PreRun", "Unknown exception being thrown. Rethrowing.");
1156 throw;
1157 }
1158
1159 ref.get<CallbackService>().call<CallbackService::Id::Start>();
1160 startPollers();
1161
1162 // Raise to 1 when we are ready to start processing
1163 using o2::monitoring::Metric;
1164 using o2::monitoring::Monitoring;
1165 using o2::monitoring::tags::Key;
1166 using o2::monitoring::tags::Value;
1167
1168 auto& monitoring = ref.get<Monitoring>();
1169 monitoring.send(Metric{(uint64_t)1, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1170 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun callback.");
1171}
1172
1174{
// Counterpart of the pre-run step: publishes the "device_state" metric with
// value 0, stops the pollers, invokes the Stop callback and finally the
// post-stop callbacks of the DataProcessorContext.
1175 ServiceRegistryRef ref{mServiceRegistry};
1176 // Lower the device_state metric to 0 now that processing is stopping
1177 using o2::monitoring::Metric;
1178 using o2::monitoring::Monitoring;
1179 using o2::monitoring::tags::Key;
1180 using o2::monitoring::tags::Value;
1181
1182 auto& monitoring = ref.get<Monitoring>();
1183 monitoring.send(Metric{(uint64_t)0, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1184
// The pollers are stopped before the Stop callback is invoked.
1185 stopPollers();
1186 ref.get<CallbackService>().call<CallbackService::Id::Stop>();
1187 auto& dpContext = ref.get<DataProcessorContext>();
1188 dpContext.postStopCallbacks(ref);
1189}
1190
1192{
// Only forwards the request to the CallbackService, invoking the callback
// registered for CallbackService::Id::Reset.
1193 ServiceRegistryRef ref{mServiceRegistry};
1194 ref.get<CallbackService>().call<CallbackService::Id::Reset>();
1195}
1196
1198{
// Main loop of the device. It keeps iterating until the state transition
// handling is marked as Expired. Each iteration:
//  - applies pending state changes and handles pending region callbacks;
//  - decides whether the event loop is allowed to block or not;
//  - prunes the relayer and runs the asynchronous queue;
//  - runs the libuv loop once (blocking or not);
//  - if a stream is free and the ComputingQuotaEvaluator grants resources,
//    runs (or queues) the processing callback for it.
// On exit the parts still attached to the input channels are dropped and the
// transition handling state is reset.
1199 ServiceRegistryRef ref{mServiceRegistry};
1200 auto& state = ref.get<DeviceState>();
1202 bool firstLoop = true;
1203 O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
1204 O2_SIGNPOST_START(device, lid, "device_state", "First iteration of the device loop");
1205
// Setting DPL_THREADPOOL_SIZE (to any value) selects the uv_queue_work based
// execution below. In that case UV_THREADPOOL_SIZE is forced to "1".
1206 bool dplEnableMultithreding = getenv("DPL_THREADPOOL_SIZE") != nullptr;
1207 if (dplEnableMultithreding) {
1208 setenv("UV_THREADPOOL_SIZE", "1", 1);
1209 }
1210
1211 while (state.transitionHandling != TransitionHandlingState::Expired) {
// Pending state changes are consumed one per iteration, from the back.
1212 if (state.nextFairMQState.empty() == false) {
1213 (void)this->ChangeState(state.nextFairMQState.back());
1214 state.nextFairMQState.pop_back();
1215 }
1216 // Notify on the main thread the new region callbacks, making sure
1217 // no callback is issued if there is something still processing.
1218 {
1219 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1220 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1221 }
1222 // This will block for the correct delay (or until we get data
1223 // on a socket). We also do not block on the first iteration
1224 // so that devices which do not have a timer can still start an
1225 // enumeration.
1226 {
1227 ServiceRegistryRef ref{mServiceRegistry};
1228 ref.get<DriverClient>().flushPending(mServiceRegistry);
1229 DataProcessorContext* lastActive = state.lastActiveDataProcessor.load();
1230 // Reset to zero unless some other DataProcessorContext completed in the meanwhile.
1231 // In such case we will take care of it at next iteration.
1232 state.lastActiveDataProcessor.compare_exchange_strong(lastActive, nullptr);
1233
// NOTE(review): the last operand of the expression below is not visible
// here.
1234 auto shouldNotWait = (lastActive != nullptr &&
1235 (state.streaming != StreamingState::Idle) && (state.activeSignals.empty())) ||
1237 if (firstLoop) {
1238 shouldNotWait = true;
1239 firstLoop = false;
1240 }
1241 if (lastActive != nullptr) {
1243 }
1244 if (NewStatePending()) {
1245 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "New state pending. Waiting for it to be handled.");
1246 shouldNotWait = true;
1248 }
1250 // If we are Idle, we can then consider the transition to be expired.
1251 if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
1252 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
1253 state.transitionHandling = TransitionHandlingState::Expired;
1254 }
// Restore the console severity which was saved when tracing got enabled.
1255 if (state.severityStack.empty() == false) {
1256 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1257 state.severityStack.pop_back();
1258 }
1259 // for (auto &info : mDeviceContext.state->inputChannelInfos) {
1260 // shouldNotWait |= info.readPolled;
1261 // }
1262 state.loopReason = DeviceState::NO_REASON;
1263 state.firedTimers.clear();
// When callback tracing is requested, save the current severity and switch
// the console to trace.
1264 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_CALLBACKS) != 0) {
1265 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1266 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1267 }
1268 // Run the asynchronous queue just before sleeping again, so that:
1269 // - we can trigger further events from the queue
1270 // - we can guarantee this is the last thing we do in the loop (
1271 // assuming no one else is adding to the queue before this point).
// Invoked by the relayer for each slot it prunes: the dropped messages are
// passed to forwardInputs.
// NOTE(review): the results of the three ref.get<>() calls are discarded and
// `variables` is not used in the lines visible here.
1272 auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1273 O2_SIGNPOST_START(device, lid, "run_loop", "Dropping message from slot %" PRIu64 ". Forwarding as needed.", (uint64_t)slot.index);
1274 ServiceRegistryRef ref{registry};
1275 ref.get<AsyncQueue>();
1276 ref.get<DecongestionService>();
1277 ref.get<DataRelayer>();
1278 // Get the current timeslice for the slot.
1279 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
1281 forwardInputs(registry, slot, dropped, oldestOutputInfo, false, true);
1282 };
1283 auto& relayer = ref.get<DataRelayer>();
1284 relayer.prunePending(onDrop);
1285 auto& queue = ref.get<AsyncQueue>();
1286 auto oldestPossibleTimeslice = relayer.getOldestPossibleOutput();
1287 AsyncQueueHelpers::run(queue, {oldestPossibleTimeslice.timeslice.value});
// The pre-loop callbacks are only invoked when we are about to block.
1288 if (shouldNotWait == false) {
1289 auto& dpContext = ref.get<DataProcessorContext>();
1290 dpContext.preLoopCallbacks(ref);
1291 }
// NOTE(review): "%{}s" below differs from the "%{public}s" specifier used
// elsewhere in this file; it looks like a typo - confirm.
1292 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. %{}s", shouldNotWait ? "Will immediately schedule a new one" : "Waiting for next event.");
1293 uv_run(state.loop, shouldNotWait ? UV_RUN_NOWAIT : UV_RUN_ONCE);
1294 O2_SIGNPOST_START(device, lid, "run_loop", "Run loop started. Loop reason %d.", state.loopReason);
// If the reason why the loop woke up is among those to be traced, switch the
// console to trace, otherwise restore the previously saved severity, if any.
1295 if ((state.loopReason & state.tracingFlags) != 0) {
1296 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1297 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1298 } else if (state.severityStack.empty() == false) {
1299 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1300 state.severityStack.pop_back();
1301 }
1302 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Loop reason mask %x & %x = %x", state.loopReason, state.tracingFlags, state.loopReason & state.tracingFlags);
1303
1304 if ((state.loopReason & DeviceState::LoopReason::OOB_ACTIVITY) != 0) {
1305 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Out of band activity detected. Rescanning everything.");
1306 relayer.rescan();
1307 }
1308
1309 if (!state.pendingOffers.empty()) {
1310 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Pending %" PRIu64 " offers. updating the ComputingQuotaEvaluator.", (uint64_t)state.pendingOffers.size());
1311 ref.get<ComputingQuotaEvaluator>().updateOffers(state.pendingOffers, uv_now(state.loop));
1312 }
1313 }
1314
1315 // Notify on the main thread the new region callbacks, making sure
1316 // no callback is issued if there is something still processing.
1317 // Notice that we still need to perform callbacks also after
1318 // the socket epolled, because otherwise we would end up serving
1319 // the callback after the first data arrives if the system is too
1320 // fast to transition from Init to Run.
1321 {
1322 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1323 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1324 }
1325
1326 assert(mStreams.size() == mHandles.size());
// Look for a stream which is not running. There is no break in the loop, so
// the one with the highest index wins. -1 means that none is available.
1328 TaskStreamRef streamRef{-1};
1329 for (size_t ti = 0; ti < mStreams.size(); ti++) {
1330 auto& taskInfo = mStreams[ti];
1331 if (taskInfo.running) {
1332 continue;
1333 }
1334 // Stream 0 is for when we run in
1335 streamRef.index = ti;
1336 }
1337 using o2::monitoring::Metric;
1338 using o2::monitoring::Monitoring;
1339 using o2::monitoring::tags::Key;
1340 using o2::monitoring::tags::Value;
1341 // We have an empty stream, let's check if we have enough
1342 // resources for it to run something
1343 if (streamRef.index != -1) {
1344 // Synchronous execution of the callbacks. This will be moved in the
1345 // on_socket_polled once we have threading in place.
1346 uv_work_t& handle = mHandles[streamRef.index];
1347 TaskStreamInfo& stream = mStreams[streamRef.index];
1348 handle.data = &mStreams[streamRef.index];
1349
// Publishes the totals about expired offers as statistics.
// NOTE(review): being a function-local static, the lambda is created only
// once and keeps referring to the mServiceRegistry it captured at that
// point - confirm this is fine if more than one device instance can exist.
1350 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const& stats)> reportExpiredOffer = [&registry = mServiceRegistry](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
1351 ServiceRegistryRef ref{registry};
1352 auto& dpStats = ref.get<DataProcessingStats>();
1353 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
1354 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
1355 dpStats.updateStats({static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredTimeslices});
1356 dpStats.processCommandQueue();
1357 };
1358 auto ref = ServiceRegistryRef{mServiceRegistry};
1359
1360 // Deciding whether to run or not can be done by passing a request to
1361 // the evaluator. In this case, the request is always satisfied and
1362 // we run on whatever resource is available.
1363 auto& spec = ref.get<DeviceSpec const>();
1364 bool enough = ref.get<ComputingQuotaEvaluator>().selectOffer(streamRef.index, spec.resourcePolicy.request, uv_now(state.loop));
1365
// Counters used only for the diagnostic messages below. The instance is
// static, so they persist across iterations of the loop.
1366 struct SchedulingStats {
1367 std::atomic<size_t> lastScheduled = 0;
1368 std::atomic<size_t> numberOfUnscheduledSinceLastScheduled = 0;
1369 std::atomic<size_t> numberOfUnscheduled = 0;
1370 std::atomic<size_t> numberOfScheduled = 0;
1371 };
1372 static SchedulingStats schedulingStats;
1373 O2_SIGNPOST_ID_GENERATE(sid, scheduling);
1374 if (enough) {
1375 stream.id = streamRef;
1376 stream.running = true;
1377 stream.registry = &mServiceRegistry;
1378 schedulingStats.lastScheduled = uv_now(state.loop);
1379 schedulingStats.numberOfScheduled++;
1380 schedulingStats.numberOfUnscheduledSinceLastScheduled = 0;
1381 O2_SIGNPOST_EVENT_EMIT(scheduling, sid, "Run", "Enough resources to schedule computation on stream %d", streamRef.index);
// Either queue the work via libuv or run callback and completion inline.
1382 if (dplEnableMultithreding) [[unlikely]] {
1383 stream.task = &handle;
1384 uv_queue_work(state.loop, stream.task, run_callback, run_completion);
1385 } else {
1386 run_callback(&handle);
1387 run_completion(&handle, 0);
1388 }
1389 } else {
// The message is escalated to a warning after more than 100 consecutive
// unscheduled attempts or when more than 30000 units of uv_now() passed
// since the last scheduling.
1390 if (schedulingStats.numberOfUnscheduledSinceLastScheduled > 100 ||
1391 (uv_now(state.loop) - schedulingStats.lastScheduled) > 30000) {
1392 O2_SIGNPOST_EVENT_EMIT_WARN(scheduling, sid, "Run",
1393 "Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu.",
1394 schedulingStats.numberOfUnscheduledSinceLastScheduled.load(),
1395 schedulingStats.lastScheduled.load());
1396 } else {
1397 O2_SIGNPOST_EVENT_EMIT(scheduling, sid, "Run",
1398 "Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu.",
1399 schedulingStats.numberOfUnscheduledSinceLastScheduled.load(),
1400 schedulingStats.lastScheduled.load());
1401 }
1402 schedulingStats.numberOfUnscheduled++;
1403 schedulingStats.numberOfUnscheduledSinceLastScheduled++;
1404 auto ref = ServiceRegistryRef{mServiceRegistry};
1405 ref.get<ComputingQuotaEvaluator>().handleExpired(reportExpiredOffer);
1406 }
1407 }
1408 }
1409
1410 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", (int)state.transitionHandling);
1411 auto& spec = ref.get<DeviceSpec const>();
// Drop whatever parts are still attached to the input channels.
1413 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
1414 auto& info = state.inputChannelInfos[ci];
1415 info.parts.fParts.clear();
1416 }
1417 state.transitionHandling = TransitionHandlingState::NoTransition;
1418}
1419
1423{
1424 auto& context = ref.get<DataProcessorContext>();
1425 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1426 O2_SIGNPOST_START(device, dpid, "do_prepare", "Starting DataProcessorContext::doPrepare.");
1427
1428 {
1429 ref.get<CallbackService>().call<CallbackService::Id::ClockTick>();
1430 }
1431 // Whether or not we had something to do.
1432
1433 // Initialise the value for context.allDone. It will possibly be updated
1434 // below if any of the channels is not done.
1435 //
1436 // Notice that fake input channels (InputChannelState::Pull) cannot possibly
1437 // expect to receive an EndOfStream signal. Thus we do not wait for these
1438 // to be completed. In the case of data source devices, as they do not have
1439 // real data input channels, they have to signal EndOfStream themselves.
1440 auto& state = ref.get<DeviceState>();
1441 auto& spec = ref.get<DeviceSpec const>();
1442 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.inputChannelInfos.data());
1443 O2_SIGNPOST_START(device, cid, "do_prepare", "Reported channel states.");
1444 context.allDone = std::any_of(state.inputChannelInfos.begin(), state.inputChannelInfos.end(), [cid](const auto& info) {
1445 if (info.channel) {
1446 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "Input channel %{public}s%{public}s has %zu parts left and is in state %d.",
1447 info.channel->GetName().c_str(), (info.id.value == ChannelIndex::INVALID ? " (non DPL)" : ""), info.parts.fParts.size(), (int)info.state);
1448 } else {
1449 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "External channel %d is in state %d.", info.id.value, (int)info.state);
1450 }
1451 return (info.parts.fParts.empty() == true && info.state != InputChannelState::Pull);
1452 });
1453 O2_SIGNPOST_END(device, cid, "do_prepare", "End report.");
1454 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Processing %zu input channels.", spec.inputChannels.size());
1457 static std::vector<int> pollOrder;
1458 pollOrder.resize(state.inputChannelInfos.size());
1459 std::iota(pollOrder.begin(), pollOrder.end(), 0);
1460 std::sort(pollOrder.begin(), pollOrder.end(), [&infos = state.inputChannelInfos](int a, int b) {
1461 return infos[a].oldestForChannel.value < infos[b].oldestForChannel.value;
1462 });
1463
1464 // Nothing to poll...
1465 if (pollOrder.empty()) {
1466 O2_SIGNPOST_END(device, dpid, "do_prepare", "Nothing to poll. Waiting for next iteration.");
1467 return;
1468 }
1469 auto currentOldest = state.inputChannelInfos[pollOrder.front()].oldestForChannel;
1470 auto currentNewest = state.inputChannelInfos[pollOrder.back()].oldestForChannel;
1471 auto delta = currentNewest.value - currentOldest.value;
1472 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Oldest possible timeframe range %" PRIu64 " => %" PRIu64 " delta %" PRIu64,
1473 (int64_t)currentOldest.value, (int64_t)currentNewest.value, (int64_t)delta);
1474 auto& infos = state.inputChannelInfos;
1475
1476 if (context.balancingInputs) {
1477 static int pipelineLength = DefaultsHelpers::pipelineLength();
1478 static uint64_t ahead = getenv("DPL_MAX_CHANNEL_AHEAD") ? std::atoll(getenv("DPL_MAX_CHANNEL_AHEAD")) : std::max(8, std::min(pipelineLength - 48, pipelineLength / 2));
1479 auto newEnd = std::remove_if(pollOrder.begin(), pollOrder.end(), [&infos, limitNew = currentOldest.value + ahead](int a) -> bool {
1480 return infos[a].oldestForChannel.value > limitNew;
1481 });
1482 for (auto it = pollOrder.begin(); it < pollOrder.end(); it++) {
1483 const auto& channelInfo = state.inputChannelInfos[*it];
1484 if (channelInfo.pollerIndex != -1) {
1485 auto& poller = state.activeInputPollers[channelInfo.pollerIndex];
1486 auto& pollerContext = *(PollerContext*)(poller->data);
1487 if (pollerContext.pollerState == PollerContext::PollerState::Connected || pollerContext.pollerState == PollerContext::PollerState::Suspended) {
1488 bool running = pollerContext.pollerState == PollerContext::PollerState::Connected;
1489 bool shouldBeRunning = it < newEnd;
1490 if (running != shouldBeRunning) {
1491 uv_poll_start(poller, shouldBeRunning ? UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED : 0, &on_socket_polled);
1492 pollerContext.pollerState = shouldBeRunning ? PollerContext::PollerState::Connected : PollerContext::PollerState::Suspended;
1493 }
1494 }
1495 }
1496 }
1497 pollOrder.erase(newEnd, pollOrder.end());
1498 }
1499 O2_SIGNPOST_END(device, dpid, "do_prepare", "%zu channels pass the channel inbalance balance check.", pollOrder.size());
1500
1501 for (auto sci : pollOrder) {
1502 auto& info = state.inputChannelInfos[sci];
1503 auto& channelSpec = spec.inputChannels[sci];
1504 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1505 O2_SIGNPOST_START(device, cid, "channels", "Processing channel %s", channelSpec.name.c_str());
1506
1507 if (info.state != InputChannelState::Completed && info.state != InputChannelState::Pull) {
1508 context.allDone = false;
1509 }
1510 if (info.state != InputChannelState::Running) {
1511 // Remember to flush data if we are not running
1512 // and there is some message pending.
1513 if (info.parts.Size()) {
1515 }
1516 O2_SIGNPOST_END(device, cid, "channels", "Flushing channel %s which is in state %d and has %zu parts still pending.",
1517 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1518 continue;
1519 }
1520 if (info.channel == nullptr) {
1521 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is nullptr and has %zu parts still pending.",
1522 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1523 continue;
1524 }
1525 // Only poll DPL channels for now.
1527 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is not a DPL channel and has %zu parts still pending.",
1528 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1529 continue;
1530 }
1531 auto& socket = info.channel->GetSocket();
1532 // If we have pending events from a previous iteration,
1533 // we do receive in any case.
1534 // Otherwise we check if there is any pending event and skip
1535 // this channel in case there is none.
1536 if (info.hasPendingEvents == 0) {
1537 socket.Events(&info.hasPendingEvents);
1538 // If we do not read, we can continue.
1539 if ((info.hasPendingEvents & 1) == 0 && (info.parts.Size() == 0)) {
1540 O2_SIGNPOST_END(device, cid, "channels", "No pending events and no remaining parts to process for channel %{public}s", channelSpec.name.c_str());
1541 continue;
1542 }
1543 }
1544 // We can reset this, because it means we have seen at least 1
1545 // message after the UV_READABLE was raised.
1546 info.readPolled = false;
1547 // Notice that there seems to be a difference between the documentation
1548 // of zeromq and the observed behavior. The fact that ZMQ_POLLIN
1549 // is raised does not mean that a message is immediately available to
1550 // read, just that it will be available soon, so the receive can
1551 // still return -2. To avoid this we keep receiving on the socket until
1552 // we get a message. In order not to overflow the DPL queue we process
1553 // one message at the time and we keep track of wether there were more
1554 // to process.
1555 bool newMessages = false;
1556 while (true) {
1557 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Receiving loop called for channel %{public}s (%d) with oldest possible timeslice %zu",
1558 channelSpec.name.c_str(), info.id.value, info.oldestForChannel.value);
1559 if (info.parts.Size() < 64) {
1560 fair::mq::Parts parts;
1561 info.channel->Receive(parts, 0);
1562 if (parts.Size()) {
1563 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Received %zu parts from channel %{public}s (%d).", parts.Size(), channelSpec.name.c_str(), info.id.value);
1564 }
1565 for (auto&& part : parts) {
1566 info.parts.fParts.emplace_back(std::move(part));
1567 }
1568 newMessages |= true;
1569 }
1570
1571 if (info.parts.Size() >= 0) {
1573 // Receiving data counts as activity now, so that
1574 // We can make sure we process all the pending
1575 // messages without hanging on the uv_run.
1576 break;
1577 }
1578 }
1579 // We check once again for pending events, keeping track if this was the
1580 // case so that we can immediately repeat this loop and avoid remaining
1581 // stuck in uv_run. This is because we will not get notified on the socket
1582 // if more events are pending due to zeromq level triggered approach.
1583 socket.Events(&info.hasPendingEvents);
1584 if (info.hasPendingEvents) {
1585 info.readPolled = false;
1586 // In case there were messages, we consider it as activity
1587 if (newMessages) {
1588 state.lastActiveDataProcessor.store(&context);
1589 }
1590 }
1591 O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).",
1592 channelSpec.name.c_str(), info.id.value);
1593 }
1594}
1595
1597{
1598 auto& context = ref.get<DataProcessorContext>();
1599 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1600 auto& state = ref.get<DeviceState>();
1601 auto& spec = ref.get<DeviceSpec const>();
1602
1603 if (state.streaming == StreamingState::Idle) {
1604 return;
1605 }
1606
1607 context.completed.clear();
1608 context.completed.reserve(16);
1609 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1610 state.lastActiveDataProcessor.store(&context);
1611 }
1612 DanglingContext danglingContext{*context.registry};
1613
1614 context.preDanglingCallbacks(danglingContext);
1615 if (state.lastActiveDataProcessor.load() == nullptr) {
1616 ref.get<CallbackService>().call<CallbackService::Id::Idle>();
1617 }
1618 auto activity = ref.get<DataRelayer>().processDanglingInputs(context.expirationHandlers, *context.registry, true);
1619 if (activity.expiredSlots > 0) {
1620 state.lastActiveDataProcessor = &context;
1621 }
1622
1623 context.completed.clear();
1624 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1625 state.lastActiveDataProcessor = &context;
1626 }
1627
1628 context.postDanglingCallbacks(danglingContext);
1629
1630 // If we got notified that all the sources are done, we call the EndOfStream
1631 // callback and return false. Notice that what happens next is actually
1632 // dependent on the callback, not something which is controlled by the
1633 // framework itself.
1634 if (context.allDone == true && state.streaming == StreamingState::Streaming) {
1636 state.lastActiveDataProcessor = &context;
1637 }
1638
1639 if (state.streaming == StreamingState::EndOfStreaming) {
1640 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "We are in EndOfStreaming. Flushing queues.");
1641 // We keep processing data until we are Idle.
1642 // FIXME: not sure this is the correct way to drain the queues, but
1643 // I guess we will see.
1646 auto& relayer = ref.get<DataRelayer>();
1647
1648 bool shouldProcess = DataProcessingHelpers::hasOnlyGenerated(spec) == false;
1649
1650 while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) {
1651 relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false);
1652 }
1653
1654 auto& timingInfo = ref.get<TimingInfo>();
1655 // We should keep the data generated at end of stream only for those
1656 // which are not sources.
1657 timingInfo.keepAtEndOfStream = shouldProcess;
1658 // Fill timinginfo with some reasonable values for data sent with endOfStream
1659 timingInfo.timeslice = relayer.getOldestPossibleOutput().timeslice.value;
1660 timingInfo.tfCounter = -1;
1661 timingInfo.firstTForbit = -1;
1662 // timingInfo.runNumber = ; // Not sure where to get this if not already set
1663 timingInfo.creation = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
1664 O2_SIGNPOST_EVENT_EMIT(calibration, dpid, "calibration", "TimingInfo.keepAtEndOfStream %d", timingInfo.keepAtEndOfStream);
1665
1666 EndOfStreamContext eosContext{*context.registry, ref.get<DataAllocator>()};
1667
1668 context.preEOSCallbacks(eosContext);
1669 auto& streamContext = ref.get<StreamContext>();
1670 streamContext.preEOSCallbacks(eosContext);
1671 ref.get<CallbackService>().call<CallbackService::Id::EndOfStream>(eosContext);
1672 streamContext.postEOSCallbacks(eosContext);
1673 context.postEOSCallbacks(eosContext);
1674
1675 for (auto& channel : spec.outputChannels) {
1676 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Sending end of stream to %{public}s.", channel.name.c_str());
1678 }
1679 // This is needed because the transport is deleted before the device.
1680 relayer.clear();
1682 // In case we should process, note the data processor responsible for it
1683 if (shouldProcess) {
1684 state.lastActiveDataProcessor = &context;
1685 }
1686 // On end of stream we shut down all output pollers.
1687 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1688 for (auto& poller : state.activeOutputPollers) {
1689 uv_poll_stop(poller);
1690 }
1691 return;
1692 }
1693
1694 if (state.streaming == StreamingState::Idle) {
1695 // On end of stream we shut down all output pollers.
1696 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1697 for (auto& poller : state.activeOutputPollers) {
1698 uv_poll_stop(poller);
1699 }
1700 }
1701
1702 return;
1703}
1704
1706{
1707 ServiceRegistryRef ref{mServiceRegistry};
1708 ref.get<DataRelayer>().clear();
1709 auto& deviceContext = ref.get<DeviceContext>();
1710 // If the signal handler is there, we should
1711 // hide the registry from it, so that we do not
1712 // end up calling the signal handler on something
1713 // which might not be there anymore.
1714 if (deviceContext.sigusr1Handle) {
1715 deviceContext.sigusr1Handle->data = nullptr;
1716 }
1717 // Makes sure we do not have a working context on
1718 // shutdown.
1719 for (auto& handle : ref.get<DeviceState>().activeSignals) {
1720 handle->data = nullptr;
1721 }
1722}
1723
1726 {
1727 }
1728};
1729
1730auto forwardOnInsertion(ServiceRegistryRef& ref, std::span<fair::mq::MessagePtr>& messages) -> void
1731{
1732 O2_SIGNPOST_ID_GENERATE(sid, forwarding);
1733
1734 auto& spec = ref.get<DeviceSpec const>();
1735 auto& context = ref.get<DataProcessorContext>();
1736 if (context.forwardPolicy == ForwardPolicy::AfterProcessing || spec.forwards.empty()) {
1737 O2_SIGNPOST_EVENT_EMIT(device, sid, "device", "Early forwardinding not enabled / needed.");
1738 return;
1739 }
1740
1741 O2_SIGNPOST_EVENT_EMIT(device, sid, "device", "Early forwardinding before injecting data into relayer.");
1742 auto& timesliceIndex = ref.get<TimesliceIndex>();
1743 auto oldestTimeslice = timesliceIndex.getOldestPossibleOutput();
1744
1745 auto& proxy = ref.get<FairMQDeviceProxy>();
1746
1747 O2_SIGNPOST_START(forwarding, sid, "forwardInputs",
1748 "Starting forwarding for incoming messages with oldestTimeslice %zu with copy",
1749 oldestTimeslice.timeslice.value);
1750 std::vector<fair::mq::Parts> forwardedParts(proxy.getNumForwardChannels());
1751 DataProcessingHelpers::routeForwardedMessages(proxy, messages, forwardedParts, true, false);
1752
1753 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
1754 if (forwardedParts[fi].Size() == 0) {
1755 continue;
1756 }
1757 ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi});
1758 auto& parts = forwardedParts[fi];
1759 if (info.policy == nullptr) {
1760 O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi);
1761 continue;
1762 }
1763 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
1764 info.policy->forward(parts, ChannelIndex{fi}, ref);
1765 }
1766 auto& asyncQueue = ref.get<AsyncQueue>();
1767 auto& decongestion = ref.get<DecongestionService>();
1768 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
1769 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
1770 AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate}
1771 .user<DecongestionContext>({.ref = ref, .oldestTimeslice = oldestTimeslice}));
1772 O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
1773};
1774
1780{
1781 using InputInfo = DataRelayer::InputInfo;
1783
1784 auto& context = ref.get<DataProcessorContext>();
1785 // This is the same id as the upper level function, so we get the events
1786 // associated with the same interval. We will simply use "handle_data" as
1787 // the category.
1788 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1789
1790 // This is how we validate inputs. I.e. we try to enforce the O2 Data model
1791 // and we do a few stats. We bind parts as a lambda captured variable, rather
1792 // than an input, because we do not want the outer loop actually be exposed
1793 // to the implementation details of the messaging layer.
1794 auto getInputTypes = [&info, &context]() -> std::optional<std::vector<InputInfo>> {
1795 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1796 auto ref = ServiceRegistryRef{*context.registry};
1797 auto& stats = ref.get<DataProcessingStats>();
1798 auto& state = ref.get<DeviceState>();
1799 auto& parts = info.parts;
1800 stats.updateStats({(int)ProcessingStatsId::TOTAL_INPUTS, DataProcessingStats::Op::Set, (int64_t)parts.Size()});
1801
1802 std::vector<InputInfo> results;
1803 // we can reserve the upper limit
1804 results.reserve(parts.Size() / 2);
1805 size_t nTotalPayloads = 0;
1806
1807 auto insertInputInfo = [&results, &nTotalPayloads](size_t position, size_t length, InputType type, ChannelIndex index) {
1808 results.emplace_back(position, length, type, index);
1809 if (type != InputType::Invalid && length > 1) {
1810 nTotalPayloads += length - 1;
1811 }
1812 };
1813
1814 for (size_t pi = 0; pi < parts.Size(); pi += 2) {
1815 auto* headerData = parts.At(pi)->GetData();
1816 auto sih = o2::header::get<SourceInfoHeader*>(headerData);
1817 auto dh = o2::header::get<DataHeader*>(headerData);
1818 if (sih) {
1819 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got SourceInfoHeader with state %d", (int)sih->state);
1820 info.state = sih->state;
1821 insertInputInfo(pi, 2, InputType::SourceInfo, info.id);
1822 state.lastActiveDataProcessor = &context;
1823 if (dh) {
1824 LOGP(error, "Found data attached to a SourceInfoHeader");
1825 }
1826 continue;
1827 }
1828 auto dih = o2::header::get<DomainInfoHeader*>(headerData);
1829 if (dih) {
1830 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got DomainInfoHeader with oldestPossibleTimeslice %d", (int)dih->oldestPossibleTimeslice);
1831 insertInputInfo(pi, 2, InputType::DomainInfo, info.id);
1832 state.lastActiveDataProcessor = &context;
1833 if (dh) {
1834 LOGP(error, "Found data attached to a DomainInfoHeader");
1835 }
1836 continue;
1837 }
1838 if (!dh) {
1839 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1840 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header is not a DataHeader?");
1841 continue;
1842 }
1843 if (dh->payloadSize > parts.At(pi + 1)->GetSize()) {
1844 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1845 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader payloadSize mismatch");
1846 continue;
1847 }
1848 auto dph = o2::header::get<DataProcessingHeader*>(headerData);
1849 // We only deal with the tracking of parts if the log is enabled.
1850 // This is because in principle we should track the size of each of
1851 // the parts and sum it up. Not for now.
1852 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, headerData);
1853 O2_SIGNPOST_START(parts, pid, "parts", "Processing DataHeader %{public}-4s/%{public}-16s/%d with splitPayloadParts %d and splitPayloadIndex %d",
1854 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->splitPayloadParts, dh->splitPayloadIndex);
1855 if (!dph) {
1856 insertInputInfo(pi, 2, InputType::Invalid, info.id);
1857 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header stack does not contain DataProcessingHeader");
1858 continue;
1859 }
1860 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
1861 // this is indicating a sequence of payloads following the header
1862 // FIXME: we will probably also set the DataHeader version
1863 insertInputInfo(pi, dh->splitPayloadParts + 1, InputType::Data, info.id);
1864 pi += dh->splitPayloadParts - 1;
1865 } else {
1866 // We can set the type for the next splitPayloadParts
1867 // because we are guaranteed they are all the same.
1868 // If splitPayloadParts = 0, we assume that means there is only one (header, payload)
1869 // pair.
1870 size_t finalSplitPayloadIndex = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
1871 if (finalSplitPayloadIndex > parts.Size()) {
1872 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader::splitPayloadParts invalid");
1873 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1874 continue;
1875 }
1876 insertInputInfo(pi, 2, InputType::Data, info.id);
1877 for (; pi + 2 < finalSplitPayloadIndex; pi += 2) {
1878 insertInputInfo(pi + 2, 2, InputType::Data, info.id);
1879 }
1880 }
1881 }
1882 if (results.size() + nTotalPayloads != parts.Size()) {
1883 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "inconsistent number of inputs extracted. %zu vs parts (%zu)", results.size() + nTotalPayloads, parts.Size());
1884 return std::nullopt;
1885 }
1886 return results;
1887 };
1888
1889 auto reportError = [ref](const char* message) {
1890 auto& stats = ref.get<DataProcessingStats>();
1892 };
1893
1894 auto handleValidMessages = [&info, ref, &reportError, &context](std::vector<InputInfo> const& inputInfos) {
1895 auto& relayer = ref.get<DataRelayer>();
1896 auto& state = ref.get<DeviceState>();
1897 static WaitBackpressurePolicy policy;
1898 auto& parts = info.parts;
1899 // We relay execution to make sure we have a complete set of parts
1900 // available.
1901 bool hasBackpressure = false;
1902 size_t minBackpressureTimeslice = -1;
1903 bool hasData = false;
1904 size_t oldestPossibleTimeslice = -1;
1905 static std::vector<int> ordering;
1906 // Same as inputInfos but with iota.
1907 ordering.resize(inputInfos.size());
1908 std::iota(ordering.begin(), ordering.end(), 0);
1909 // stable sort orderings by type and position
1910 std::stable_sort(ordering.begin(), ordering.end(), [&inputInfos](int const& a, int const& b) {
1911 auto const& ai = inputInfos[a];
1912 auto const& bi = inputInfos[b];
1913 if (ai.type != bi.type) {
1914 return ai.type < bi.type;
1915 }
1916 return ai.position < bi.position;
1917 });
1918 for (size_t ii = 0; ii < inputInfos.size(); ++ii) {
1919 auto const& input = inputInfos[ordering[ii]];
1920 switch (input.type) {
1921 case InputType::Data: {
1922 hasData = true;
1923 auto headerIndex = input.position;
1924 auto nMessages = 0;
1925 auto nPayloadsPerHeader = 0;
1926 if (input.size > 2) {
1927 // header and multiple payload sequence
1928 nMessages = input.size;
1929 nPayloadsPerHeader = nMessages - 1;
1930 } else {
1931 // multiple header-payload pairs
1932 auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
1933 nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2;
1934 nPayloadsPerHeader = 1;
1935 ii += (nMessages / 2) - 1;
1936 }
1937 auto onDrop = [ref](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1938 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
1939 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "onDrop", "Dropping message from slot %zu. Forwarding as needed. Timeslice %zu",
1940 slot.index, oldestOutputInfo.timeslice.value);
1941 ref.get<AsyncQueue>();
1942 ref.get<DecongestionService>();
1943 ref.get<DataRelayer>();
1944 // Get the current timeslice for the slot.
1945 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
1947 forwardInputs(ref, slot, dropped, oldestOutputInfo, false, true);
1948 };
1949
1950 auto relayed = relayer.relay(parts.At(headerIndex)->GetData(),
1951 &parts.At(headerIndex),
1952 input,
1953 nMessages,
1954 nPayloadsPerHeader,
1955 context.forwardPolicy == ForwardPolicy::AtInjection ? forwardOnInsertion : nullptr,
1956 onDrop);
1957 switch (relayed.type) {
1959 if (info.normalOpsNotified == true && info.backpressureNotified == false) {
1960 LOGP(alarm, "Backpressure on channel {}. Waiting.", info.channel->GetName());
1961 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
1962 monitoring.send(o2::monitoring::Metric{1, fmt::format("backpressure_{}", info.channel->GetName())});
1963 info.backpressureNotified = true;
1964 info.normalOpsNotified = false;
1965 }
1966 policy.backpressure(info);
1967 hasBackpressure = true;
1968 minBackpressureTimeslice = std::min<size_t>(minBackpressureTimeslice, relayed.timeslice.value);
1969 break;
1973 if (info.normalOpsNotified == false && info.backpressureNotified == true) {
1974 LOGP(info, "Back to normal on channel {}.", info.channel->GetName());
1975 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
1976 monitoring.send(o2::monitoring::Metric{0, fmt::format("backpressure_{}", info.channel->GetName())});
1977 info.normalOpsNotified = true;
1978 info.backpressureNotified = false;
1979 }
1980 break;
1981 }
1982 } break;
1983 case InputType::SourceInfo: {
1984 LOGP(detail, "Received SourceInfo");
1985 auto& context = ref.get<DataProcessorContext>();
1986 state.lastActiveDataProcessor = &context;
1987 auto headerIndex = input.position;
1988 auto payloadIndex = input.position + 1;
1989 assert(payloadIndex < parts.Size());
1990 // FIXME: the message with the end of stream cannot contain
1991 // split parts.
1992 parts.At(headerIndex).reset(nullptr);
1993 parts.At(payloadIndex).reset(nullptr);
1994 // for (size_t i = 0; i < dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 - 1 : 1; ++i) {
1995 // parts.At(headerIndex + 1 + i).reset(nullptr);
1996 // }
1997 // pi += dh->splitPayloadParts > 0 ? dh->splitPayloadParts - 1 : 0;
1998
1999 } break;
2000 case InputType::DomainInfo: {
2003 auto& context = ref.get<DataProcessorContext>();
2004 state.lastActiveDataProcessor = &context;
2005 auto headerIndex = input.position;
2006 auto payloadIndex = input.position + 1;
2007 assert(payloadIndex < parts.Size());
2008 // FIXME: the message with the end of stream cannot contain
2009 // split parts.
2010
2011 auto dih = o2::header::get<DomainInfoHeader*>(parts.At(headerIndex)->GetData());
2012 if (hasBackpressure && dih->oldestPossibleTimeslice >= minBackpressureTimeslice) {
2013 break;
2014 }
2015 oldestPossibleTimeslice = std::min(oldestPossibleTimeslice, dih->oldestPossibleTimeslice);
2016 LOGP(debug, "Got DomainInfoHeader, new oldestPossibleTimeslice {} on channel {}", oldestPossibleTimeslice, info.id.value);
2017 parts.At(headerIndex).reset(nullptr);
2018 parts.At(payloadIndex).reset(nullptr);
2019 } break;
2020 case InputType::Invalid: {
2021 reportError("Invalid part found.");
2022 } break;
2023 }
2024 }
2027 if (oldestPossibleTimeslice != (size_t)-1) {
2028 info.oldestForChannel = {oldestPossibleTimeslice};
2029 auto& context = ref.get<DataProcessorContext>();
2030 context.domainInfoUpdatedCallback(*context.registry, oldestPossibleTimeslice, info.id);
2031 ref.get<CallbackService>().call<CallbackService::Id::DomainInfoUpdated>((ServiceRegistryRef)*context.registry, (size_t)oldestPossibleTimeslice, (ChannelIndex)info.id);
2032 state.lastActiveDataProcessor = &context;
2033 }
2034 auto it = std::remove_if(parts.fParts.begin(), parts.fParts.end(), [](auto& msg) -> bool { return msg.get() == nullptr; });
2035 parts.fParts.erase(it, parts.end());
2036 if (parts.fParts.size()) {
2037 LOG(debug) << parts.fParts.size() << " messages backpressured";
2038 }
2039 };
2040
2041 // Second part. This is the actual outer loop we want to obtain, with
2042 // implementation details which can be read. Notice how most of the state
2043 // is actually hidden. For example we do not expose what "input" is. This
2044 // will allow us to keep the same toplevel logic even if the actual meaning
2045 // of input is changed (for example we might move away from multipart
2046 // messages). Notice also that we need to act diffently depending on the
2047 // actual CompletionOp we want to perform. In particular forwarding inputs
2048 // also gets rid of them from the cache.
2049 auto inputTypes = getInputTypes();
2050 if (bool(inputTypes) == false) {
2051 reportError("Parts should come in couples. Dropping it.");
2052 return;
2053 }
2054 handleValidMessages(*inputTypes);
2055 return;
2056}
2057
2058namespace
2059{
// Smallest and largest latency found while scanning an input record.
// The defaults are the neutral elements for std::min / std::max, so values
// can be folded into a freshly constructed instance without special cases.
struct InputLatency {
  uint64_t minLatency{std::numeric_limits<uint64_t>::max()};
  uint64_t maxLatency{std::numeric_limits<uint64_t>::min()};
};
2064
2065auto calculateInputRecordLatency(InputRecord const& record, uint64_t currentTime) -> InputLatency
2066{
2067 InputLatency result;
2068
2069 for (auto& item : record) {
2070 auto* header = o2::header::get<DataProcessingHeader*>(item.header);
2071 if (header == nullptr) {
2072 continue;
2073 }
2074 int64_t partLatency = (0x7fffffffffffffff & currentTime) - (0x7fffffffffffffff & header->creation);
2075 if (partLatency < 0) {
2076 partLatency = 0;
2077 }
2078 result.minLatency = std::min(result.minLatency, (uint64_t)partLatency);
2079 result.maxLatency = std::max(result.maxLatency, (uint64_t)partLatency);
2080 }
2081 return result;
2082};
2083
2084auto calculateTotalInputRecordSize(InputRecord const& record) -> int
2085{
2086 size_t totalInputSize = 0;
2087 for (auto& item : record) {
2088 auto* header = o2::header::get<DataHeader*>(item.header);
2089 if (header == nullptr) {
2090 continue;
2091 }
2092 totalInputSize += header->payloadSize;
2093 }
2094 return totalInputSize;
2095};
2096
// Raise maximum_value to value, unless it already holds something which is
// not smaller. The compare-exchange is retried until either the store
// succeeds or the stored value is observed to be no longer smaller than
// value; on failure compare_exchange_weak refreshes the observed value.
template <typename T>
void update_maximum(std::atomic<T>& maximum_value, T const& value) noexcept
{
  T observed = maximum_value.load();
  for (;;) {
    if (!(observed < value)) {
      // The stored maximum is already large enough.
      return;
    }
    if (maximum_value.compare_exchange_weak(observed, value)) {
      return;
    }
  }
}
2105} // namespace
2106
2107bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::vector<DataRelayer::RecordAction>& completed)
2108{
2109 auto& context = ref.get<DataProcessorContext>();
2110 LOGP(debug, "DataProcessingDevice::tryDispatchComputation");
2111 // This is the actual hidden state for the outer loop. In case we decide we
2112 // want to support multithreaded dispatching of operations, I can simply
2113 // move these to some thread local store and the rest of the lambdas
2114 // should work just fine.
2115 std::vector<MessageSet> currentSetOfInputs;
2116
2117 //
2118 auto getInputSpan = [ref, &currentSetOfInputs](TimesliceSlot slot, bool consume = true) {
2119 auto& relayer = ref.get<DataRelayer>();
2120 if (consume) {
2121 currentSetOfInputs = relayer.consumeAllInputsForTimeslice(slot);
2122 } else {
2123 currentSetOfInputs = relayer.consumeExistingInputsForTimeslice(slot);
2124 }
2125 auto getter = [&currentSetOfInputs](size_t i, size_t partindex) -> DataRef {
2126 if (currentSetOfInputs[i].getNumberOfPairs() > partindex) {
2127 const char* headerptr = nullptr;
2128 const char* payloadptr = nullptr;
2129 size_t payloadSize = 0;
2130 // - each input can have multiple parts
2131 // - "part" denotes a sequence of messages belonging together, the first message of the
2132 // sequence is the header message
2133 // - each part has one or more payload messages
2134 // - InputRecord provides all payloads as header-payload pair
2135 auto const& headerMsg = currentSetOfInputs[i].associatedHeader(partindex);
2136 auto const& payloadMsg = currentSetOfInputs[i].associatedPayload(partindex);
2137 headerptr = static_cast<char const*>(headerMsg->GetData());
2138 payloadptr = payloadMsg ? static_cast<char const*>(payloadMsg->GetData()) : nullptr;
2139 payloadSize = payloadMsg ? payloadMsg->GetSize() : 0;
2140 return DataRef{nullptr, headerptr, payloadptr, payloadSize};
2141 }
2142 return DataRef{};
2143 };
2144 auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
2145 return currentSetOfInputs[i].getNumberOfPairs();
2146 };
2147 auto refCountGetter = [&currentSetOfInputs](size_t idx) -> int {
2148 auto& header = static_cast<const fair::mq::shmem::Message&>(*currentSetOfInputs[idx].header(0));
2149 return header.GetRefCount();
2150 };
2151 return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.size()};
2152 };
2153
2154 auto markInputsAsDone = [ref](TimesliceSlot slot) -> void {
2155 auto& relayer = ref.get<DataRelayer>();
2157 };
2158
2159 // I need a preparation step which gets the current timeslice id and
2160 // propagates it to the various contextes (i.e. the actual entities which
2161 // create messages) because the messages need to have the timeslice id into
2162 // it.
2163 auto prepareAllocatorForCurrentTimeSlice = [ref](TimesliceSlot i) -> void {
2164 auto& relayer = ref.get<DataRelayer>();
2165 auto& timingInfo = ref.get<TimingInfo>();
2166 auto timeslice = relayer.getTimesliceForSlot(i);
2167
2168 timingInfo.timeslice = timeslice.value;
2169 timingInfo.tfCounter = relayer.getFirstTFCounterForSlot(i);
2170 timingInfo.firstTForbit = relayer.getFirstTFOrbitForSlot(i);
2171 timingInfo.runNumber = relayer.getRunNumberForSlot(i);
2172 timingInfo.creation = relayer.getCreationTimeForSlot(i);
2173 };
2174 auto updateRunInformation = [ref](TimesliceSlot i) -> void {
2175 auto& dataProcessorContext = ref.get<DataProcessorContext>();
2176 auto& relayer = ref.get<DataRelayer>();
2177 auto& timingInfo = ref.get<TimingInfo>();
2178 auto timeslice = relayer.getTimesliceForSlot(i);
2179 // We report wether or not this timing info refers to a new Run.
2180 timingInfo.globalRunNumberChanged = !TimingInfo::timesliceIsTimer(timeslice.value) && dataProcessorContext.lastRunNumberProcessed != timingInfo.runNumber;
2181 // A switch to runNumber=0 should not appear and thus does not set globalRunNumberChanged, unless it is seen in the first processed timeslice
2182 timingInfo.globalRunNumberChanged &= (dataProcessorContext.lastRunNumberProcessed == -1 || timingInfo.runNumber != 0);
2183 // FIXME: for now there is only one stream, however we
2184 // should calculate this correctly once we finally get the
2185 // the StreamContext in.
2186 timingInfo.streamRunNumberChanged = timingInfo.globalRunNumberChanged;
2187 };
2188
2189 // When processing them, timers will have to be cleaned up
2190 // to avoid double counting them.
2191 // This was actually the easiest solution we could find for
2192 // O2-646.
2193 auto cleanTimers = [&currentSetOfInputs](TimesliceSlot slot, InputRecord& record) {
2194 assert(record.size() == currentSetOfInputs.size());
2195 for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) {
2196 // assuming that for timer inputs we do have exactly one PartRef object
2197 // in the MessageSet, multiple PartRef Objects are only possible for either
2198 // split payload messages of wildcard matchers, both for data inputs
2199 DataRef input = record.getByPos(ii);
2200 if (input.spec->lifetime != Lifetime::Timer) {
2201 continue;
2202 }
2203 if (input.header == nullptr) {
2204 continue;
2205 }
2206 // This will hopefully delete the message.
2207 currentSetOfInputs[ii].clear();
2208 }
2209 };
2210
2211 // Function to cleanup record. For the moment we
2212 // simply use it to keep track of input messages
2213 // which are not needed, to display them in the GUI.
2214 auto cleanupRecord = [](InputRecord& record) {
2215 if (O2_LOG_ENABLED(parts) == false) {
2216 return;
2217 }
2218 for (size_t pi = 0, pe = record.size(); pi < pe; ++pi) {
2219 DataRef input = record.getByPos(pi);
2220 if (input.header == nullptr) {
2221 continue;
2222 }
2223 auto sih = o2::header::get<SourceInfoHeader*>(input.header);
2224 if (sih) {
2225 continue;
2226 }
2227
2228 auto dh = o2::header::get<DataHeader*>(input.header);
2229 if (!dh) {
2230 continue;
2231 }
2232 // We use the address of the first header of a split payload
2233 // to identify the interval.
2234 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, dh);
2235 O2_SIGNPOST_END(parts, pid, "parts", "Cleaning up parts associated to %p", dh);
2236
2237 // No split parts, we simply skip the payload
2238 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
2239 // this is indicating a sequence of payloads following the header
2240 // FIXME: we will probably also set the DataHeader version
2241 pi += dh->splitPayloadParts - 1;
2242 } else {
2243 size_t pi = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
2244 }
2245 }
2246 };
2247
2248 ref.get<DataRelayer>().getReadyToProcess(completed);
2249 if (completed.empty() == true) {
2250 LOGP(debug, "No computations available for dispatching.");
2251 return false;
2252 }
2253
2254 auto postUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t tStart, uint64_t tStartMilli) {
2255 auto& stats = ref.get<DataProcessingStats>();
2256 auto& states = ref.get<DataProcessingStates>();
2257 std::atomic_thread_fence(std::memory_order_release);
2258 char relayerSlotState[1024];
2259 int written = snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2260 char* buffer = relayerSlotState + written;
2261 for (size_t ai = 0; ai != record.size(); ai++) {
2262 buffer[ai] = record.isValid(ai) ? '3' : '0';
2263 }
2264 buffer[record.size()] = 0;
2265 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index),
2266 .size = (int)(record.size() + buffer - relayerSlotState),
2267 .data = relayerSlotState});
2268 uint64_t tEnd = uv_hrtime();
2269 // tEnd and tStart are in nanoseconds according to https://docs.libuv.org/en/v1.x/misc.html#c.uv_hrtime
2270 int64_t wallTimeMs = (tEnd - tStart) / 1000000;
2272 // Sum up the total wall time, in milliseconds.
2274 // The time interval is in seconds while tEnd - tStart is in nanoseconds, so we divide by 1000000 to get the fraction in ms/s.
2276 stats.updateStats({(int)ProcessingStatsId::LAST_PROCESSED_SIZE, DataProcessingStats::Op::Set, calculateTotalInputRecordSize(record)});
2277 stats.updateStats({(int)ProcessingStatsId::TOTAL_PROCESSED_SIZE, DataProcessingStats::Op::Add, calculateTotalInputRecordSize(record)});
2278 auto latency = calculateInputRecordLatency(record, tStartMilli);
2279 stats.updateStats({(int)ProcessingStatsId::LAST_MIN_LATENCY, DataProcessingStats::Op::Set, (int)latency.minLatency});
2280 stats.updateStats({(int)ProcessingStatsId::LAST_MAX_LATENCY, DataProcessingStats::Op::Set, (int)latency.maxLatency});
2281 static int count = 0;
2283 count++;
2284 };
2285
2286 auto preUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t) {
2287 auto& states = ref.get<DataProcessingStates>();
2288 std::atomic_thread_fence(std::memory_order_release);
2289 char relayerSlotState[1024];
2290 snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2291 char* buffer = strchr(relayerSlotState, ' ') + 1;
2292 for (size_t ai = 0; ai != record.size(); ai++) {
2293 buffer[ai] = record.isValid(ai) ? '2' : '0';
2294 }
2295 buffer[record.size()] = 0;
2296 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index), .size = (int)(record.size() + buffer - relayerSlotState), .data = relayerSlotState});
2297 };
2298
2299 // This is the main dispatching loop
2300 auto& state = ref.get<DeviceState>();
2301 auto& spec = ref.get<DeviceSpec const>();
2302
2303 auto& dpContext = ref.get<DataProcessorContext>();
2304 auto& streamContext = ref.get<StreamContext>();
2305 O2_SIGNPOST_ID_GENERATE(sid, device);
2306 O2_SIGNPOST_START(device, sid, "device", "Start processing ready actions");
2307
2308 auto& stats = ref.get<DataProcessingStats>();
2309 auto& relayer = ref.get<DataRelayer>();
2310 using namespace o2::framework;
2311 stats.updateStats({(int)ProcessingStatsId::PENDING_INPUTS, DataProcessingStats::Op::Set, static_cast<int64_t>(relayer.getParallelTimeslices() - completed.size())});
2312 stats.updateStats({(int)ProcessingStatsId::INCOMPLETE_INPUTS, DataProcessingStats::Op::Set, completed.empty() ? 1 : 0});
2313 switch (spec.completionPolicy.order) {
2315 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.timeslice.value < b.timeslice.value; });
2316 break;
2318 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.slot.index < b.slot.index; });
2319 break;
2321 default:
2322 break;
2323 }
2324
2325 for (auto action : completed) {
2326 O2_SIGNPOST_ID_GENERATE(aid, device);
2327 O2_SIGNPOST_START(device, aid, "device", "Processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2329 O2_SIGNPOST_END(device, aid, "device", "Waiting for more data.");
2330 continue;
2331 }
2332
2333 bool shouldConsume = action.op == CompletionPolicy::CompletionOp::Consume ||
2335 prepareAllocatorForCurrentTimeSlice(TimesliceSlot{action.slot});
2339 updateRunInformation(TimesliceSlot{action.slot});
2340 }
2341 InputSpan span = getInputSpan(action.slot, shouldConsume);
2342 auto& spec = ref.get<DeviceSpec const>();
2343 InputRecord record{spec.inputs,
2344 span,
2345 *context.registry};
2346 ProcessingContext processContext{record, ref, ref.get<DataAllocator>()};
2347 {
2348 // Notice this should be thread safe and reentrant
2349 // as it is called from many threads.
2350 streamContext.preProcessingCallbacks(processContext);
2351 dpContext.preProcessingCallbacks(processContext);
2352 }
2354 context.postDispatchingCallbacks(processContext);
2355 if (spec.forwards.empty() == false) {
2356 auto& timesliceIndex = ref.get<TimesliceIndex>();
2357 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false);
2358 O2_SIGNPOST_END(device, aid, "device", "Forwarding inputs consume: %d.", false);
2359 continue;
2360 }
2361 }
2362 // If there is no optional inputs we canForwardEarly
2363 // the messages to that parallel processing can happen.
2364 // In this case we pass true to indicate that we want to
2365 // copy the messages to the subsequent data processor.
2366 bool hasForwards = spec.forwards.empty() == false;
2368
2369 if (context.forwardPolicy == ForwardPolicy::AtCompletionPolicySatisified && hasForwards && consumeSomething) {
2370 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwarding: %{public}s.", fmt::format("{}", action.op).c_str());
2371 auto& timesliceIndex = ref.get<TimesliceIndex>();
2372 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
2373 } else if (context.forwardPolicy == ForwardPolicy::AtInjection && hasForwards && consumeSomething) {
2374 // We used to do fowarding here, however we now do it much earlier.
2375 // We still need to clean the inputs which were already consumed
2376 // via ConsumeExisting and which still have an header to hold the slot.
2377 // FIXME: do we? This should really happen when we do the forwarding on
2378 // insertion, because otherwise we lose the relevant information on how to
2379 // navigate the set of headers. We could actually rely on the messageset index,
2380 // is that the right thing to do though?
2381 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "cleaning early forwarding: %{public}s.", fmt::format("{}", action.op).c_str());
2382 auto& timesliceIndex = ref.get<TimesliceIndex>();
2383 cleanEarlyForward(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
2384 }
2385
2386 markInputsAsDone(action.slot);
2387
2388 uint64_t tStart = uv_hrtime();
2389 uint64_t tStartMilli = TimingHelpers::getRealtimeSinceEpochStandalone();
2390 preUpdateStats(action, record, tStart);
2391
2392 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
2393
2394 auto runNoCatch = [&context, ref, &processContext](DataRelayer::RecordAction& action) mutable {
2395 auto& state = ref.get<DeviceState>();
2396 auto& spec = ref.get<DeviceSpec const>();
2397 auto& streamContext = ref.get<StreamContext>();
2398 auto& dpContext = ref.get<DataProcessorContext>();
2399 auto shouldProcess = [](DataRelayer::RecordAction& action) -> bool {
2400 switch (action.op) {
2405 return true;
2406 break;
2407 default:
2408 return false;
2409 }
2410 };
2411 if (state.quitRequested == false) {
2412 {
2413 // Callbacks from services
2414 dpContext.preProcessingCallbacks(processContext);
2415 streamContext.preProcessingCallbacks(processContext);
2416 dpContext.preProcessingCallbacks(processContext);
2417 // Callbacks from users
2418 ref.get<CallbackService>().call<CallbackService::Id::PreProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2419 }
2420 O2_SIGNPOST_ID_FROM_POINTER(pcid, device, &processContext);
2421 if (context.statefulProcess && shouldProcess(action)) {
2422 // This way, usercode can use the the same processing context to identify
2423 // its signposts and we can map user code to device iterations.
2424 O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
2425 (context.statefulProcess)(processContext);
2426 O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
2427 } else if (context.statelessProcess && shouldProcess(action)) {
2428 O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
2429 (context.statelessProcess)(processContext);
2430 O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
2431 } else if (context.statelessProcess || context.statefulProcess) {
2432 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding.");
2433 } else {
2434 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle");
2436 }
2437 if (shouldProcess(action)) {
2438 auto& timingInfo = ref.get<TimingInfo>();
2439 if (timingInfo.globalRunNumberChanged) {
2440 context.lastRunNumberProcessed = timingInfo.runNumber;
2441 }
2442 }
2443
2444 // Notify the sink we just consumed some timeframe data
2445 if (context.isSink && action.op == CompletionPolicy::CompletionOp::Consume) {
2446 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Sending dpl-summary");
2447 auto& allocator = ref.get<DataAllocator>();
2448 allocator.make<int>(OutputRef{"dpl-summary", runtime_hash(spec.name.c_str())}, 1);
2449 }
2450
2451 // Extra callback which allows a service to add extra outputs.
2452 // This is needed e.g. to ensure that injected CCDB outputs are added
2453 // before an end of stream.
2454 {
2455 ref.get<CallbackService>().call<CallbackService::Id::FinaliseOutputs>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2456 dpContext.finaliseOutputsCallbacks(processContext);
2457 streamContext.finaliseOutputsCallbacks(processContext);
2458 }
2459
2460 {
2461 ref.get<CallbackService>().call<CallbackService::Id::PostProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2462 dpContext.postProcessingCallbacks(processContext);
2463 streamContext.postProcessingCallbacks(processContext);
2464 }
2465 }
2466 };
2467
2468 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_USERCODE) != 0) {
2469 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
2470 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
2471 }
2472 if (noCatch) {
2473 try {
2474 runNoCatch(action);
2475 } catch (o2::framework::RuntimeErrorRef e) {
2476 (context.errorHandling)(e, record);
2477 }
2478 } else {
2479 try {
2480 runNoCatch(action);
2481 } catch (std::exception& ex) {
2485 auto e = runtime_error(ex.what());
2486 (context.errorHandling)(e, record);
2487 } catch (o2::framework::RuntimeErrorRef e) {
2488 (context.errorHandling)(e, record);
2489 }
2490 }
2491 if (state.severityStack.empty() == false) {
2492 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
2493 state.severityStack.pop_back();
2494 }
2495
2496 postUpdateStats(action, record, tStart, tStartMilli);
2497 // We forward inputs only when we consume them. If we simply Process them,
2498 // we keep them for next message arriving.
2500 cleanupRecord(record);
2501 context.postDispatchingCallbacks(processContext);
2502 ref.get<CallbackService>().call<CallbackService::Id::DataConsumed>(o2::framework::ServiceRegistryRef{ref});
2503 }
2504 if ((context.forwardPolicy == ForwardPolicy::AfterProcessing) && hasForwards && consumeSomething) {
2505 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Late forwarding");
2506 auto& timesliceIndex = ref.get<TimesliceIndex>();
2507 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false, action.op == CompletionPolicy::CompletionOp::Consume);
2508 }
2509 context.postForwardingCallbacks(processContext);
2511 cleanTimers(action.slot, record);
2512 }
2513 O2_SIGNPOST_END(device, aid, "device", "Done processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2514 }
2515 O2_SIGNPOST_END(device, sid, "device", "Start processing ready actions");
2516
2517 // We now broadcast the end of stream if it was requested
2518 if (state.streaming == StreamingState::EndOfStreaming) {
2519 LOGP(detail, "Broadcasting end of stream");
2520 for (auto& channel : spec.outputChannels) {
2522 }
2524 }
2525
2526 return true;
2527}
2528
2530{
2531 LOG(error) << msg;
2532 ServiceRegistryRef ref{mServiceRegistry};
2533 auto& stats = ref.get<DataProcessingStats>();
2535}
2536
2537std::unique_ptr<ConfigParamStore> DeviceConfigurationHelpers::getConfiguration(ServiceRegistryRef registry, const char* name, std::vector<ConfigParamSpec> const& options)
2538{
2539
2540 if (registry.active<ConfigurationInterface>()) {
2541 auto& cfg = registry.get<ConfigurationInterface>();
2542 try {
2543 cfg.getRecursive(name);
2544 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
2545 retrievers.emplace_back(std::make_unique<ConfigurationOptionsRetriever>(&cfg, name));
2546 auto configStore = std::make_unique<ConfigParamStore>(options, std::move(retrievers));
2547 configStore->preload();
2548 configStore->activate();
2549 return configStore;
2550 } catch (...) {
2551 // No overrides...
2552 }
2553 }
2554 return {nullptr};
2555}
2556
2557} // namespace o2::framework
benchmark::State & state
struct uv_timer_s uv_timer_t
struct uv_signal_s uv_signal_t
struct uv_async_s uv_async_t
struct uv_poll_s uv_poll_t
struct uv_loop_s uv_loop_t
o2::monitoring::Metric Metric
uint64_t maxLatency
o2::configuration::ConfigurationInterface ConfigurationInterface
constexpr int DEFAULT_MAX_CHANNEL_AHEAD
uint64_t minLatency
std::ostringstream debug
int32_t i
std::enable_if_t< std::is_signed< T >::value, bool > hasData(const CalArray< T > &cal)
Definition Painter.cxx:600
uint16_t pid
Definition RawData.h:2
uint32_t res
Definition RawData.h:0
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
Definition Signpost.h:553
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_LOG_ENABLED(log)
Definition Signpost.h:110
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
Definition Signpost.h:563
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
constexpr uint32_t runtime_hash(char const *str)
o2::monitoring::Monitoring Monitoring
StringRef key
@ DeviceStateChanged
Invoked when the device undergoes a state change.
decltype(auto) make(const Output &spec, Args... args)
static void doRun(ServiceRegistryRef)
void fillContext(DataProcessorContext &context, DeviceContext &deviceContext)
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry &)
static void doPrepare(ServiceRegistryRef)
static bool tryDispatchComputation(ServiceRegistryRef ref, std::vector< DataRelayer::RecordAction > &completed)
static void handleData(ServiceRegistryRef, InputChannelInfo &)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associated with a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associated with a given slot.
A service API to communicate with the driver.
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
size_t size() const
Number of elements in the InputSpan.
Definition InputSpan.h:82
bool active() const
Check if service of type T is currently active.
OldestOutputInfo getOldestPossibleOutput() const
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLuint entry
Definition glcorearb.h:5735
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLuint GLuint stream
Definition glcorearb.h:1806
GLint ref
Definition glcorearb.h:291
GLuint * states
Definition glcorearb.h:4932
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
ServiceKind
The kind of service we are asking for.
void on_idle_timer(uv_timer_t *handle)
@ DPL
The channel is a normal input channel.
void run_completion(uv_work_t *handle, int status)
void on_socket_polled(uv_poll_t *poller, int status, int events)
void run_callback(uv_work_t *handle)
auto forwardOnInsertion(ServiceRegistryRef &ref, std::span< fair::mq::MessagePtr > &messages) -> void
volatile int region_read_global_dummy_variable
void handleRegionCallbacks(ServiceRegistryRef registry, std::vector< fair::mq::RegionInfo > &infos)
Invoke the callbacks for the mPendingRegionInfos.
void on_out_of_band_polled(uv_poll_t *poller, int status, int events)
DeviceSpec const & getRunningDevice(RunningDeviceRef const &running, ServiceRegistryRef const &services)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Streaming
Data is being processed.
@ Idle
End of streaming notified.
void on_communication_requested(uv_async_t *s)
@ Expired
A transition needs to be fulfilled ASAP.
@ NoTransition
No pending transitions.
@ Requested
A transition was notified to be requested.
RuntimeError & error_from_ref(RuntimeErrorRef)
void on_awake_main_thread(uv_async_t *handle)
@ Completed
The channel was signaled it will not receive any data.
@ Running
The channel is actively receiving data.
void on_signal_callback(uv_signal_t *handle, int signum)
@ Me
Only quit this data processor.
constexpr const char * channelName(int channel)
Definition Constants.h:318
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
Defining DataPointCompositeObject explicitly as copiable.
static void run(AsyncQueue &queue, TimesliceId oldestPossibleTimeslice)
static void post(AsyncQueue &queue, AsyncTask const &task)
An actual task to be executed.
Definition AsyncQueue.h:32
static void demangled_backtrace_symbols(void **backtrace, unsigned int total, int fd)
static constexpr int INVALID
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
int64_t timeslices
How many timeslices it can process without giving back control.
int64_t sharedMemory
How much shared memory it can allocate.
Statistics on the offers consumed, expired.
static bool hasOnlyGenerated(DeviceSpec const &spec)
check if spec is a source device
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const &ref, ProcessingPolicies const &policies)
starts the EoS timers and returns the new TransitionHandlingState in case a new state is requested
static std::vector< fair::mq::Parts > routeForwardedMessageSet(FairMQDeviceProxy &proxy, std::vector< MessageSet > &currentSetOfInputs, bool copy, bool consume)
Helper to route messages for forwarding.
static void switchState(ServiceRegistryRef const &ref, StreamingState newState)
change the device StreamingState to newState
static void sendEndOfStream(ServiceRegistryRef const &ref, OutputChannelSpec const &channel)
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
static void cleanForwardedMessages(std::span< fair::mq::MessagePtr > &currentSetOfInputs, bool consume)
static void routeForwardedMessages(FairMQDeviceProxy &proxy, std::span< fair::mq::MessagePtr > &currentSetOfInputs, std::vector< fair::mq::Parts > &forwardedParts, bool copy, bool consume)
Helper to route messages for forwarding.
Helper struct to hold statistics about the data processing happening.
@ CumulativeRate
Set the value to the specified value if it is positive.
@ Add
Update the rate of the metric given the amount since the last time.
std::function< void(o2::framework::RuntimeErrorRef e, InputRecord &record)> errorHandling
ForwardPolicy forwardPolicy
Whether or not the associated DataProcessor can forward things early.
AlgorithmSpec::InitErrorCallback initError
void preLoopCallbacks(ServiceRegistryRef)
Invoke callbacks before we enter the event loop.
void postStopCallbacks(ServiceRegistryRef)
Invoke callbacks on stop.
void preProcessingCallbacks(ProcessingContext &)
Invoke callbacks to be executed before every process method invocation.
void preStartCallbacks(ServiceRegistryRef)
Invoke callbacks to be executed in PreRun(), before the User Start callbacks.
AlgorithmSpec::ProcessCallback statefulProcess
const char * header
Definition DataRef.h:27
const InputSpec * spec
Definition DataRef.h:26
static std::vector< size_t > createDistinctRouteIndex(std::vector< InputRoute > const &)
CompletionPolicy::CompletionOp op
Definition DataRelayer.h:81
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
TimesliceIndex::OldestOutputInfo oldestTimeslice
static unsigned int pipelineLength()
get max number of timeslices in the queue
static std::unique_ptr< ConfigParamStore > getConfiguration(ServiceRegistryRef registry, const char *name, std::vector< ConfigParamSpec > const &options)
ProcessingPolicies & processingPolicies
Running state information of a given device.
Definition DeviceState.h:34
std::atomic< int64_t > cleanupCount
Definition DeviceState.h:82
Forward channel information.
Definition ChannelInfo.h:88
ChannelAccountingType channelType
Whether or not it's a DPL internal channel.
Definition ChannelInfo.h:92
std::string name
The name of the channel.
Definition ChannelInfo.h:90
ForwardingPolicy const * policy
Definition ChannelInfo.h:94
fair::mq::Channel * channel
Definition ChannelInfo.h:51
TimesliceId oldestForChannel
Oldest possible timeslice for the given channel.
Definition ChannelInfo.h:65
enum Lifetime lifetime
Definition InputSpec.h:73
enum EarlyForwardPolicy earlyForward
Information about the running workflow.
static constexpr ServiceKind kind
static Salt streamSalt(short streamId, short dataProcessorId)
void lateBindStreamServices(DeviceState &state, fair::mq::ProgOptions &options, ServiceRegistry::Salt salt)
static Salt globalStreamSalt(short streamId)
void * get(ServiceTypeHash typeHash, Salt salt, ServiceKind kind, char const *name=nullptr) const
void finaliseOutputsCallbacks(ProcessingContext &)
Invoke callbacks to be executed after every process method invocation.
void preProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed before every process method invocation.
void preEOSCallbacks(EndOfStreamContext &eosContext)
Invoke callbacks to be executed before every EOS user callback invocation.
void postProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed after every process method invocation.
static int64_t getRealtimeSinceEpochStandalone()
bool keepAtEndOfStream
Whether this kind of data should be flushed during end of stream.
Definition TimingInfo.h:44
static bool timesliceIsTimer(size_t timeslice)
Definition TimingInfo.h:46
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
void backpressure(InputChannelInfo const &)
locked_execution(ServiceRegistryRef &ref_)
the main header struct
Definition DataHeader.h:619
constexpr size_t min
constexpr size_t max
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
vec clear()
const std::string str
uint64_t const void const *restrict const msg
Definition x9.h:153