DataProcessingDevice.cxx
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
13#include <atomic>
33#include "Framework/InputSpan.h"
34#if defined(__APPLE__) || defined(NDEBUG)
35#define O2_SIGNPOST_IMPLEMENTATION
36#endif
37#include "Framework/Signpost.h"
50
51#include "DecongestionService.h"
53#include "DataRelayerHelpers.h"
54#include "Headers/DataHeader.h"
56
57#include <Framework/Tracing.h>
58
59#include <fairmq/Parts.h>
60#include <fairmq/Socket.h>
61#include <fairmq/ProgOptions.h>
62#include <fairmq/shmem/Message.h>
63#include <Configuration/ConfigurationInterface.h>
64#include <Configuration/ConfigurationFactory.h>
65#include <Monitoring/Monitoring.h>
66#include <TMessage.h>
67#include <TClonesArray.h>
68
69#include <fmt/ostream.h>
70#include <algorithm>
71#include <vector>
72#include <numeric>
73#include <memory>
74#include <uv.h>
75#include <execinfo.h>
76#include <sstream>
77#include <boost/property_tree/json_parser.hpp>
78
79// Formatter to avoid having to rewrite the ostream operator for the enum
80namespace fmt
81{
82template <>
85} // namespace fmt
86
87// A log to use for general device logging
89// A log to use for general device logging
91// Special log to keep track of the lifetime of the parts
93// Stream which keeps track of the calibration lifetime logic
95// Special log to track the async queue behavior
97// Special log to track the forwarding requests
99// Special log to track CCDB related requests
101// Special log to track task scheduling
103
104using namespace o2::framework;
105using ConfigurationInterface = o2::configuration::ConfigurationInterface;
107
108constexpr int DEFAULT_MAX_CHANNEL_AHEAD = 128;
109
110namespace o2::framework
111{
112
113template <>
117
120void on_idle_timer(uv_timer_t* handle)
121{
122 auto* state = (DeviceState*)handle->data;
123 state->loopReason |= DeviceState::TIMER_EXPIRED;
124}
125
126void on_communication_requested(uv_async_t* s)
127{
128 auto* state = (DeviceState*)s->data;
130}
131
132DeviceSpec const& getRunningDevice(RunningDeviceRef const& running, ServiceRegistryRef const& services)
133{
134 auto& devices = services.get<o2::framework::RunningWorkflowInfo const>().devices;
135 return devices[running.index];
136}
137
143
145 : mRunningDevice{running},
146 mConfigRegistry{nullptr},
147 mServiceRegistry{registry}
148{
149 GetConfig()->Subscribe<std::string>("dpl", [&registry = mServiceRegistry](const std::string& key, std::string value) {
150 if (key == "cleanup") {
151 auto ref = ServiceRegistryRef{registry};
152 auto& deviceState = ref.get<DeviceState>();
153 int64_t cleanupCount = deviceState.cleanupCount.load();
154 int64_t newCleanupCount = std::stoll(value);
155 if (newCleanupCount <= cleanupCount) {
156 return;
157 }
158 deviceState.cleanupCount.store(newCleanupCount);
159 for (auto& info : deviceState.inputChannelInfos) {
160 fair::mq::Parts parts;
161 while (info.channel->Receive(parts, 0)) {
162 LOGP(debug, "Dropping {} parts", parts.Size());
163 if (parts.Size() == 0) {
164 break;
165 }
166 }
167 }
168 }
169 });
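  // Note: the "cleanup" property is expected to carry a monotonically increasing
  // counter. Updates whose value is not larger than the last one seen are ignored;
  // a larger value drains every pending part from all input channels.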
170
171 std::function<void(const fair::mq::State)> stateWatcher = [this, &registry = mServiceRegistry](const fair::mq::State state) -> void {
172 auto ref = ServiceRegistryRef{registry};
173 auto& deviceState = ref.get<DeviceState>();
174 auto& control = ref.get<ControlService>();
175 auto& callbacks = ref.get<CallbackService>();
176 control.notifyDeviceState(fair::mq::GetStateName(state));
178
179 if (deviceState.nextFairMQState.empty() == false) {
180 auto state = deviceState.nextFairMQState.back();
181 (void)this->ChangeState(state);
182 deviceState.nextFairMQState.pop_back();
183 }
184 };
185
186 // 99 is to execute DPL callbacks last
187 this->SubscribeToStateChange("99-dpl", stateWatcher);
188
189 // One task for now.
190 mStreams.resize(1);
191 mHandles.resize(1);
192
193 ServiceRegistryRef ref{mServiceRegistry};
194
195 mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t));
196 auto& state = ref.get<DeviceState>();
197 assert(state.loop);
198 int res = uv_async_init(state.loop, mAwakeHandle, on_communication_requested);
199 mAwakeHandle->data = &state;
200 if (res < 0) {
201 LOG(error) << "Unable to initialise subscription";
202 }
203
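  // Whenever a new FairMQ transition is requested, wake up the libuv loop via the
  // async handle, so that the main loop notices the pending state change even if it
  // is currently blocked waiting for data.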
205 SubscribeToNewTransition("dpl", [wakeHandle = mAwakeHandle](fair::mq::Transition t) {
206 int res = uv_async_send(wakeHandle);
207 if (res < 0) {
208 LOG(error) << "Unable to notify subscription";
209 }
210 LOG(debug) << "State transition requested";
211 });
212}
213
214// Callback to execute the processing. Notice how the data
215// is a vector of DataProcessorContext so that we can index the correct
216// one with the thread id. For the moment we simply use the first one.
217void run_callback(uv_work_t* handle)
218{
219 auto* task = (TaskStreamInfo*)handle->data;
220 auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
221 // We create a new signpost interval for this specific data processor. Same id, same data processor.
222 auto& dataProcessorContext = ref.get<DataProcessorContext>();
223 O2_SIGNPOST_ID_FROM_POINTER(sid, device, &dataProcessorContext);
224 O2_SIGNPOST_START(device, sid, "run_callback", "Starting run callback on stream %d", task->id.index);
227 O2_SIGNPOST_END(device, sid, "run_callback", "Done processing data for stream %d", task->id.index);
228}
229
230// Once the processing in a thread is done, this is executed on the main thread.
231void run_completion(uv_work_t* handle, int status)
232{
233 auto* task = (TaskStreamInfo*)handle->data;
234 // Notice that the completion, while running on the main thread, still
235 // has a salt which is associated with the actual stream that did the computation.
236 auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
237 auto& state = ref.get<DeviceState>();
238 auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
239
240 using o2::monitoring::Metric;
241 using o2::monitoring::Monitoring;
242 using o2::monitoring::tags::Key;
243 using o2::monitoring::tags::Value;
244
245 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats&)> reportConsumedOffer = [ref](ComputingQuotaOffer const& accumulatedConsumed, ComputingQuotaStats& stats) {
246 auto& dpStats = ref.get<DataProcessingStats>();
247 stats.totalConsumedBytes += accumulatedConsumed.sharedMemory;
248 // For now we give back the offer if we did not use it completely.
249 // In principle we should try to run until the offer is fully consumed.
250 stats.totalConsumedTimeslices += std::min<int64_t>(accumulatedConsumed.timeslices, 1);
251
252 dpStats.updateStats({static_cast<short>(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedBytes});
253 dpStats.updateStats({static_cast<short>(ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedTimeslices});
254 dpStats.processCommandQueue();
255 assert(stats.totalConsumedBytes == dpStats.metrics[(short)ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED]);
256 assert(stats.totalConsumedTimeslices == dpStats.metrics[(short)ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED]);
257 };
258
259 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const&)> reportExpiredOffer = [ref](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
260 auto& dpStats = ref.get<DataProcessingStats>();
261 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
262 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
263 dpStats.updateStats({static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredTimeslices});
264 dpStats.processCommandQueue();
265 };
266
267 for (auto& consumer : state.offerConsumers) {
268 quotaEvaluator.consume(task->id.index, consumer, reportConsumedOffer);
269 }
270 state.offerConsumers.clear();
271 quotaEvaluator.handleExpired(reportExpiredOffer);
272 quotaEvaluator.dispose(task->id.index);
273 task->running = false;
274}
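// In short: run_completion applies all the offer consumers registered during the
// computation, reports expired offers, releases the offers held by this stream and
// finally marks the stream as free again so it can be scheduled for the next task.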
275
276// Context for polling
277struct PollerContext {
278 enum struct PollerState : char { Stopped,
279 Disconnected,
280 Connected,
281 Suspended };
282 char const* name = nullptr;
283 uv_loop_t* loop = nullptr;
284 DataProcessingDevice* device = nullptr;
285 DeviceState* state = nullptr;
286 fair::mq::Socket* socket = nullptr;
287 InputChannelInfo* channelInfo = nullptr;
288 int fd = -1;
289 bool read = true;
290 PollerState pollerState = PollerState::Stopped;
291};
292
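// Poller state machine (as used by startPollers / on_socket_polled): sockets are
// initially polled for UV_WRITABLE to detect that the connection is established.
// Once writable, read sockets switch to UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED,
// while write sockets only keep watching UV_DISCONNECT, since fairmq handles the
// actual writing.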
293void on_socket_polled(uv_poll_t* poller, int status, int events)
294{
295 auto* context = (PollerContext*)poller->data;
296 assert(context);
297 O2_SIGNPOST_ID_FROM_POINTER(sid, sockets, poller);
298 context->state->loopReason |= DeviceState::DATA_SOCKET_POLLED;
299 switch (events) {
300 case UV_READABLE: {
301 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
302 context->state->loopReason |= DeviceState::DATA_INCOMING;
303 } break;
304 case UV_WRITABLE: {
305 O2_SIGNPOST_END(sockets, sid, "socket_state", "Socket connected for channel %{public}s", context->name);
306 if (context->read) {
307 O2_SIGNPOST_START(sockets, sid, "socket_state", "Socket connected for read in context %{public}s", context->name);
308 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
309 context->state->loopReason |= DeviceState::DATA_CONNECTED;
310 } else {
311 O2_SIGNPOST_START(sockets, sid, "socket_state", "Socket connected for write for channel %{public}s", context->name);
312 context->state->loopReason |= DeviceState::DATA_OUTGOING;
313 // If the socket is writable, fairmq will handle the rest, so we can stop polling and
314 // just wait for the disconnect.
315 uv_poll_start(poller, UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
316 }
317 context->pollerState = PollerContext::PollerState::Connected;
318 } break;
319 case UV_DISCONNECT: {
320 O2_SIGNPOST_END(sockets, sid, "socket_state", "Socket disconnected in context %{public}s", context->name);
321 } break;
322 case UV_PRIORITIZED: {
323 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Socket prioritized for context %{public}s", context->name);
324 } break;
325 }
326 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
327}
328
329void on_out_of_band_polled(uv_poll_t* poller, int status, int events)
330{
331 O2_SIGNPOST_ID_FROM_POINTER(sid, sockets, poller);
332 auto* context = (PollerContext*)poller->data;
333 context->state->loopReason |= DeviceState::OOB_ACTIVITY;
334 if (status < 0) {
335 LOGP(fatal, "Error while polling {}: {}", context->name, status);
336 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
337 }
338 switch (events) {
339 case UV_READABLE: {
340 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
341 context->state->loopReason |= DeviceState::DATA_INCOMING;
342 assert(context->channelInfo);
343 context->channelInfo->readPolled = true;
344 } break;
345 case UV_WRITABLE: {
346 O2_SIGNPOST_END(sockets, sid, "socket_state", "OOB socket connected for channel %{public}s", context->name);
347 if (context->read) {
348 O2_SIGNPOST_START(sockets, sid, "socket_state", "OOB socket connected for read in context %{public}s", context->name);
349 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_out_of_band_polled);
350 } else {
351 O2_SIGNPOST_START(sockets, sid, "socket_state", "OOB socket connected for write for channel %{public}s", context->name);
352 context->state->loopReason |= DeviceState::DATA_OUTGOING;
353 }
354 } break;
355 case UV_DISCONNECT: {
356 O2_SIGNPOST_END(sockets, sid, "socket_state", "OOB socket disconnected in context %{public}s", context->name);
357 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
358 } break;
359 case UV_PRIORITIZED: {
360 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "OOB socket prioritized for context %{public}s", context->name);
361 } break;
362 }
363 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
364}
365
373void DataProcessingDevice::Init()
374{
375 auto ref = ServiceRegistryRef{mServiceRegistry};
376 auto& context = ref.get<DataProcessorContext>();
377 auto& spec = getRunningDevice(mRunningDevice, ref);
378
379 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
380 O2_SIGNPOST_START(device, cid, "Init", "Entering Init callback.");
381 context.statelessProcess = spec.algorithm.onProcess;
382 context.statefulProcess = nullptr;
383 context.error = spec.algorithm.onError;
384 context.initError = spec.algorithm.onInitError;
385
386 auto configStore = DeviceConfigurationHelpers::getConfiguration(mServiceRegistry, spec.name.c_str(), spec.options);
387 if (configStore == nullptr) {
388 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
389 retrievers.emplace_back(std::make_unique<FairOptionsRetriever>(GetConfig()));
390 configStore = std::make_unique<ConfigParamStore>(spec.options, std::move(retrievers));
391 configStore->preload();
392 configStore->activate();
393 }
394
395 using boost::property_tree::ptree;
396
398 for (auto& entry : configStore->store()) {
399 std::stringstream ss;
400 std::string str;
401 if (entry.second.empty() == false) {
402 boost::property_tree::json_parser::write_json(ss, entry.second, false);
403 str = ss.str();
404 str.pop_back(); // remove EoL
405 } else {
406 str = entry.second.get_value<std::string>();
407 }
408 std::string configString = fmt::format("[CONFIG] {}={} 1 {}", entry.first, str, configStore->provenance(entry.first.c_str())).c_str();
409 mServiceRegistry.get<DriverClient>(ServiceRegistry::globalDeviceSalt()).tell(configString.c_str());
410 }
411
412 mConfigRegistry = std::make_unique<ConfigParamRegistry>(std::move(configStore));
413
414 // Setup the error handlers for init
415 if (context.initError) {
416 context.initErrorHandling = [&errorCallback = context.initError,
417 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
421 auto& context = ref.get<DataProcessorContext>();
422 auto& err = error_from_ref(e);
423 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
424 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Invoking errorCallback.", err.what);
425 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
426 auto& stats = ref.get<DataProcessingStats>();
428 InitErrorContext errorContext{ref, e};
429 errorCallback(errorContext);
430 };
431 } else {
432 context.initErrorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
433 auto& err = error_from_ref(e);
437 auto& context = ref.get<DataProcessorContext>();
438 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
439 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Exiting with 1.", err.what);
440 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
441 auto& stats = ref.get<DataProcessingStats>();
443 exit(1);
444 };
445 }
446
447 context.expirationHandlers.clear();
448 context.init = spec.algorithm.onInit;
449 if (context.init) {
450 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
451 InitContext initContext{*mConfigRegistry, mServiceRegistry};
452
453 if (noCatch) {
454 try {
455 context.statefulProcess = context.init(initContext);
456 } catch (o2::framework::RuntimeErrorRef e) {
457 if (context.initErrorHandling) {
458 (context.initErrorHandling)(e);
459 }
460 }
461 } else {
462 try {
463 context.statefulProcess = context.init(initContext);
464 } catch (std::exception& ex) {
468 auto e = runtime_error(ex.what());
469 (context.initErrorHandling)(e);
470 } catch (o2::framework::RuntimeErrorRef e) {
471 (context.initErrorHandling)(e);
472 }
473 }
474 }
475 auto& state = ref.get<DeviceState>();
476 state.inputChannelInfos.resize(spec.inputChannels.size());
480 int validChannelId = 0;
481 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
482 auto& name = spec.inputChannels[ci].name;
483 if (name.find(spec.channelPrefix + "from_internal-dpl-clock") == 0) {
484 state.inputChannelInfos[ci].state = InputChannelState::Pull;
485 state.inputChannelInfos[ci].id = {ChannelIndex::INVALID};
486 validChannelId++;
487 } else {
488 state.inputChannelInfos[ci].id = {validChannelId++};
489 }
490 }
491
492 // Invoke the callback policy for this device.
493 if (spec.callbacksPolicy.policy != nullptr) {
494 InitContext initContext{*mConfigRegistry, mServiceRegistry};
495 spec.callbacksPolicy.policy(mServiceRegistry.get<CallbackService>(ServiceRegistry::globalDeviceSalt()), initContext);
496 }
497
498 // Stream services should be initialised now
499 auto* options = GetConfig();
500 for (size_t si = 0; si < mStreams.size(); ++si) {
502 mServiceRegistry.lateBindStreamServices(state, *options, streamSalt);
503 }
504 O2_SIGNPOST_END(device, cid, "Init", "Exiting Init callback.");
505}
506
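// SIGUSR1 handler: if no valid shared memory offer is currently outstanding, inject
// a synthetic 1GB shared memory offer into the first free slot, so that a device
// which is waiting for resources can resume processing. Presumably the signal is
// sent by the driver when more shared memory becomes available.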
507void on_signal_callback(uv_signal_t* handle, int signum)
508{
509 O2_SIGNPOST_ID_FROM_POINTER(sid, device, handle);
510 O2_SIGNPOST_START(device, sid, "signal_state", "Signal %d received.", signum);
511
512 auto* registry = (ServiceRegistry*)handle->data;
513 if (!registry) {
514 O2_SIGNPOST_END(device, sid, "signal_state", "No registry active. Ignoring signal.");
515 return;
516 }
517 ServiceRegistryRef ref{*registry};
518 auto& state = ref.get<DeviceState>();
519 auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
520 auto& stats = ref.get<DataProcessingStats>();
522 size_t ri = 0;
523 while (ri != quotaEvaluator.mOffers.size()) {
524 auto& offer = quotaEvaluator.mOffers[ri];
525 // We were already offered some sharedMemory, so we
526 // do not consider the offer.
527 // FIXME: in principle this should account for memory
528 // available and being offered, however we
529 // want to get out of the woods for now.
530 if (offer.valid && offer.sharedMemory != 0) {
531 O2_SIGNPOST_END(device, sid, "signal_state", "Memory already offered.");
532 return;
533 }
534 ri++;
535 }
536 // Find the first empty offer slot and place a 1GB shared memory offer there
537 for (auto& offer : quotaEvaluator.mOffers) {
538 if (offer.valid == false) {
539 offer.cpu = 0;
540 offer.memory = 0;
541 offer.sharedMemory = 1000000000;
542 offer.valid = true;
543 offer.user = -1;
544 break;
545 }
546 }
548 O2_SIGNPOST_END(device, sid, "signal_state", "Done processing signals.");
549}
550
551struct DecongestionContext {
552 ServiceRegistryRef ref;
553 TimesliceIndex::OldestOutputInfo oldestTimeslice;
554};
555
556auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
557 auto& oldestTimeslice = task.user<DecongestionContext>().oldestTimeslice;
558 auto& ref = task.user<DecongestionContext>().ref;
559
560 auto& decongestion = ref.get<DecongestionService>();
561 auto& proxy = ref.get<FairMQDeviceProxy>();
562 if (oldestTimeslice.timeslice.value <= decongestion.lastTimeslice) {
563 LOG(debug) << "Not sending already sent oldest possible timeslice " << oldestTimeslice.timeslice.value;
564 return;
565 }
566 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
567 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
568 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
569 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
570 // TODO: this we could cache in the proxy at the bind moment.
571 if (info.channelType != ChannelAccountingType::DPL) {
572 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Skipping channel %{public}s because it's not a DPL channel",
573 info.name.c_str());
574
575 continue;
576 }
577 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestTimeslice.timeslice.value)) {
578 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Forwarding to channel %{public}s oldest possible timeslice %zu, prio 20",
579 info.name.c_str(), oldestTimeslice.timeslice.value);
580 }
581 }
582};
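// Note: this callback runs from the async queue and propagates the oldest possible
// timeslice to every DPL forward channel, skipping non-DPL channels and avoiding
// re-sending a value which was already advertised (decongestion.lastTimeslice).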
583
584// This is how we do the forwarding, i.e. we push
585// the inputs which are shared between this device and others
586// to the next one in the daisy chain.
587// FIXME: do it in a smarter way than O(N^2)
588static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
589 TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
590 auto& proxy = registry.get<FairMQDeviceProxy>();
591 auto forwardedParts = DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copy, consume);
592
593 O2_SIGNPOST_ID_GENERATE(sid, forwarding);
594 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %zu messages", forwardedParts.size());
595 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
596 if (forwardedParts[fi].Size() == 0) {
597 continue;
598 }
599 ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi});
600 auto& parts = forwardedParts[fi];
601 if (info.policy == nullptr) {
602 O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi);
603 continue;
604 }
605 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
606 info.policy->forward(parts, ChannelIndex{fi}, registry);
607 }
608
609 auto& asyncQueue = registry.get<AsyncQueue>();
610 auto& decongestion = registry.get<DecongestionService>();
611 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
612 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
613 AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate}
614 .user<DecongestionContext>({.ref = registry, .oldestTimeslice = oldestTimeslice}));
615 O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
616};
617
618extern volatile int region_read_global_dummy_variable;
620
622void handleRegionCallbacks(ServiceRegistryRef registry, std::vector<fair::mq::RegionInfo>& infos)
623{
624 if (infos.empty() == false) {
625 std::vector<fair::mq::RegionInfo> toBeNotified;
626 toBeNotified.swap(infos); // avoid any MT issue.
627 static bool dummyRead = getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS") && atoi(getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS"));
628 for (auto const& info : toBeNotified) {
629 if (dummyRead) {
630 for (size_t i = 0; i < info.size / sizeof(region_read_global_dummy_variable); i += 4096 / sizeof(region_read_global_dummy_variable)) {
631 region_read_global_dummy_variable = ((int*)info.ptr)[i];
632 }
633 }
634 registry.get<CallbackService>().call<CallbackService::Id::RegionInfoCallback>(info);
635 }
636 }
637}
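// When DPL_DEBUG_MAP_ALL_SHM_REGIONS is set, the loop above touches the region once
// every 4096 bytes (reading one int per page into a volatile), presumably to force
// the whole shared memory region to be mapped before the RegionInfoCallback is invoked.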
638
639namespace
640{
641void on_awake_main_thread(uv_async_t* handle)
642{
643 auto* state = (DeviceState*)handle->data;
645}
646} // namespace
647
648void DataProcessingDevice::initPollers()
649{
650 auto ref = ServiceRegistryRef{mServiceRegistry};
651 auto& deviceContext = ref.get<DeviceContext>();
652 auto& context = ref.get<DataProcessorContext>();
653 auto& spec = ref.get<DeviceSpec const>();
654 auto& state = ref.get<DeviceState>();
655 // We add a timer only in case a channel poller is not there.
656 if ((context.statefulProcess != nullptr) || (context.statelessProcess != nullptr)) {
657 for (auto& [channelName, channel] : GetChannels()) {
658 InputChannelInfo* channelInfo;
659 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
660 auto& channelSpec = spec.inputChannels[ci];
661 channelInfo = &state.inputChannelInfos[ci];
662 if (channelSpec.name != channelName) {
663 continue;
664 }
665 channelInfo->channel = &this->GetChannel(channelName, 0);
666 break;
667 }
668 if ((channelName.rfind("from_internal-dpl", 0) == 0) &&
669 (channelName.rfind("from_internal-dpl-aod", 0) != 0) &&
670 (channelName.rfind("from_internal-dpl-ccdb-backend", 0) != 0) &&
671 (channelName.rfind("from_internal-dpl-injected", 0)) != 0) {
672 LOGP(detail, "{} is an internal channel. Skipping as no input will come from there.", channelName);
673 continue;
674 }
675 // We only watch receiving sockets.
676 if (channelName.rfind("from_" + spec.name + "_", 0) == 0) {
677 LOGP(detail, "{} is to send data. Not polling.", channelName);
678 continue;
679 }
680
681 if (channelName.rfind("from_", 0) != 0) {
682 LOGP(detail, "{} is not a DPL socket. Not polling.", channelName);
683 continue;
684 }
685
686 // We assume there is always a ZeroMQ socket behind.
687 int zmq_fd = 0;
688 size_t zmq_fd_len = sizeof(zmq_fd);
689 // FIXME: I should probably save those somewhere... ;-)
690 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
691 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
692 if (zmq_fd == 0) {
693 LOG(error) << "Cannot get file descriptor for channel " << channelName;
694 continue;
695 }
696 LOGP(detail, "Polling socket for {}", channelName);
697 auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
698 pCtx->name = strdup(channelName.c_str());
699 pCtx->loop = state.loop;
700 pCtx->device = this;
701 pCtx->state = &state;
702 pCtx->fd = zmq_fd;
703 assert(channelInfo != nullptr);
704 pCtx->channelInfo = channelInfo;
705 pCtx->socket = &channel[0].GetSocket();
706 pCtx->read = true;
707 poller->data = pCtx;
708 uv_poll_init(state.loop, poller, zmq_fd);
709 if (channelName.rfind("from_", 0) != 0) {
710 LOGP(detail, "{} is an out of band channel.", channelName);
711 state.activeOutOfBandPollers.push_back(poller);
712 } else {
713 channelInfo->pollerIndex = state.activeInputPollers.size();
714 state.activeInputPollers.push_back(poller);
715 }
716 }
717 // In case we do not have any input channels and we do not have
718 // any timers or signal watchers, we still wake up whenever we can send data to
719 // downstream devices, to allow for enumerations.
720 if (state.activeInputPollers.empty() &&
721 state.activeOutOfBandPollers.empty() &&
722 state.activeTimers.empty() &&
723 state.activeSignals.empty()) {
724 // FIXME: this is to make sure we do not reset the output timer
725 // for readout proxies or similar. In principle this should go once
726 // we move to OutOfBand InputSpec.
727 if (state.inputChannelInfos.empty()) {
728 LOGP(detail, "No input channels. Setting exit transition timeout to 0.");
729 deviceContext.exitTransitionTimeout = 0;
730 }
731 for (auto& [channelName, channel] : GetChannels()) {
732 if (channelName.rfind(spec.channelPrefix + "from_internal-dpl", 0) == 0) {
733 LOGP(detail, "{} is an internal channel. Not polling.", channelName);
734 continue;
735 }
736 if (channelName.rfind(spec.channelPrefix + "from_" + spec.name + "_", 0) == 0) {
737 LOGP(detail, "{} is an out of band channel. Not polling for output.", channelName);
738 continue;
739 }
740 // We assume there is always a ZeroMQ socket behind.
741 int zmq_fd = 0;
742 size_t zmq_fd_len = sizeof(zmq_fd);
743 // FIXME: I should probably save those somewhere... ;-)
744 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
745 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
746 if (zmq_fd == 0) {
747 LOGP(error, "Cannot get file descriptor for channel {}", channelName);
748 continue;
749 }
750 LOG(detail) << "Polling socket for " << channel[0].GetName();
751 // FIXME: leak
752 auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
753 pCtx->name = strdup(channelName.c_str());
754 pCtx->loop = state.loop;
755 pCtx->device = this;
756 pCtx->state = &state;
757 pCtx->fd = zmq_fd;
758 pCtx->read = false;
759 poller->data = pCtx;
760 uv_poll_init(state.loop, poller, zmq_fd);
761 state.activeOutputPollers.push_back(poller);
762 }
763 }
764 } else {
765 LOGP(detail, "This is a fake device so we exit after the first iteration.");
766 deviceContext.exitTransitionTimeout = 0;
767 // This is a fake device, so we can request to exit immediately
768 ServiceRegistryRef ref{mServiceRegistry};
769 ref.get<ControlService>().readyToQuit(QuitRequest::Me);
770 // A two second timer to stop internal devices which do not want to
771 auto* timer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
772 uv_timer_init(state.loop, timer);
773 timer->data = &state;
774 uv_update_time(state.loop);
775 uv_timer_start(timer, on_idle_timer, 2000, 2000);
776 state.activeTimers.push_back(timer);
777 }
778}
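// To summarise the poller setup above: only receiving "from_*" channels which are not
// internal DPL channels get an input poller. If a device ends up with no input pollers,
// timers or signal watchers, its output channels are polled instead, so that pure data
// sources still wake up when they can send. Devices without any processing callback are
// treated as fake devices: they ask to quit immediately and only keep a 2 second timer.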
779
780void DataProcessingDevice::startPollers()
781{
782 auto ref = ServiceRegistryRef{mServiceRegistry};
783 auto& deviceContext = ref.get<DeviceContext>();
784 auto& state = ref.get<DeviceState>();
785
786 for (auto* poller : state.activeInputPollers) {
787 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
788 O2_SIGNPOST_START(device, sid, "socket_state", "Input socket waiting for connection.");
789 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
790 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
791 }
792 for (auto& poller : state.activeOutOfBandPollers) {
793 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
794 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
795 }
796 for (auto* poller : state.activeOutputPollers) {
797 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
798 O2_SIGNPOST_START(device, sid, "socket_state", "Output socket waiting for connection.");
799 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
800 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
801 }
802
803 deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
804 deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
805 uv_timer_init(state.loop, deviceContext.gracePeriodTimer);
806
807 deviceContext.dataProcessingGracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
808 deviceContext.dataProcessingGracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
809 uv_timer_init(state.loop, deviceContext.dataProcessingGracePeriodTimer);
810}
811
812void DataProcessingDevice::stopPollers()
813{
814 auto ref = ServiceRegistryRef{mServiceRegistry};
815 auto& deviceContext = ref.get<DeviceContext>();
816 auto& state = ref.get<DeviceState>();
817 LOGP(detail, "Stopping {} input pollers", state.activeInputPollers.size());
818 for (auto* poller : state.activeInputPollers) {
819 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
820 O2_SIGNPOST_END(device, sid, "socket_state", "Input socket closed.");
821 uv_poll_stop(poller);
822 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
823 }
824 LOGP(detail, "Stopping {} out of band pollers", state.activeOutOfBandPollers.size());
825 for (auto* poller : state.activeOutOfBandPollers) {
826 uv_poll_stop(poller);
827 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
828 }
829 LOGP(detail, "Stopping {} output pollers", state.activeOutputPollers.size());
830 for (auto* poller : state.activeOutputPollers) {
831 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
832 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
833 uv_poll_stop(poller);
834 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
835 }
836
837 uv_timer_stop(deviceContext.gracePeriodTimer);
838 delete (ServiceRegistryRef*)deviceContext.gracePeriodTimer->data;
839 free(deviceContext.gracePeriodTimer);
840 deviceContext.gracePeriodTimer = nullptr;
841
842 uv_timer_stop(deviceContext.dataProcessingGracePeriodTimer);
843 delete (ServiceRegistryRef*)deviceContext.dataProcessingGracePeriodTimer->data;
844 free(deviceContext.dataProcessingGracePeriodTimer);
845 deviceContext.dataProcessingGracePeriodTimer = nullptr;
846}
847
848void DataProcessingDevice::InitTask()
849{
850 auto ref = ServiceRegistryRef{mServiceRegistry};
851 auto& deviceContext = ref.get<DeviceContext>();
852 auto& context = ref.get<DataProcessorContext>();
853
854 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
855 O2_SIGNPOST_START(device, cid, "InitTask", "Entering InitTask callback.");
856 auto& spec = getRunningDevice(mRunningDevice, mServiceRegistry);
857 auto distinct = DataRelayerHelpers::createDistinctRouteIndex(spec.inputs);
858 auto& state = ref.get<DeviceState>();
859 int i = 0;
860 for (auto& di : distinct) {
861 auto& route = spec.inputs[di];
862 if (route.configurator.has_value() == false) {
863 i++;
864 continue;
865 }
866 ExpirationHandler handler{
867 .name = route.configurator->name,
868 .routeIndex = RouteIndex{i++},
869 .lifetime = route.matcher.lifetime,
870 .creator = route.configurator->creatorConfigurator(state, mServiceRegistry, *mConfigRegistry),
871 .checker = route.configurator->danglingConfigurator(state, *mConfigRegistry),
872 .handler = route.configurator->expirationConfigurator(state, *mConfigRegistry)};
873 context.expirationHandlers.emplace_back(std::move(handler));
874 }
875
876 if (state.awakeMainThread == nullptr) {
877 state.awakeMainThread = (uv_async_t*)malloc(sizeof(uv_async_t));
878 state.awakeMainThread->data = &state;
879 uv_async_init(state.loop, state.awakeMainThread, on_awake_main_thread);
880 }
881
882 deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue<std::string>("expected-region-callbacks"));
883 deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue<std::string>("exit-transition-timeout"));
884 deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue<std::string>("data-processing-timeout"));
885
886 for (auto& channel : GetChannels()) {
887 channel.second.at(0).Transport()->SubscribeToRegionEvents([&context = deviceContext,
888 &registry = mServiceRegistry,
889 &pendingRegionInfos = mPendingRegionInfos,
890 &regionInfoMutex = mRegionInfoMutex](fair::mq::RegionInfo info) {
891 std::lock_guard<std::mutex> lock(regionInfoMutex);
892 LOG(detail) << ">>> Region info event" << info.event;
893 LOG(detail) << "id: " << info.id;
894 LOG(detail) << "ptr: " << info.ptr;
895 LOG(detail) << "size: " << info.size;
896 LOG(detail) << "flags: " << info.flags;
897 // Now we check for pending events with the mutex,
898 // so the lines below are atomic.
899 pendingRegionInfos.push_back(info);
900 context.expectedRegionCallbacks -= 1;
901 // We always want to handle these on the main loop,
902 // so we awake it.
903 ServiceRegistryRef ref{registry};
904 uv_async_send(ref.get<DeviceState>().awakeMainThread);
905 });
906 }
907
908 // Add a signal manager for SIGUSR1 so that we can force
909 // an event from the outside, making sure that the event loop can
910 // be unblocked (e.g. by a quitting DPL driver) even when there
911 // is no data pending to be processed.
912 if (deviceContext.sigusr1Handle == nullptr) {
913 deviceContext.sigusr1Handle = (uv_signal_t*)malloc(sizeof(uv_signal_t));
914 deviceContext.sigusr1Handle->data = &mServiceRegistry;
915 uv_signal_init(state.loop, deviceContext.sigusr1Handle);
916 uv_signal_start(deviceContext.sigusr1Handle, on_signal_callback, SIGUSR1);
917 }
918 // If there is any signal, we want to make sure they are active
919 for (auto& handle : state.activeSignals) {
920 handle->data = &state;
921 }
922 // When we start, we must make sure that we do listen to the signal
923 deviceContext.sigusr1Handle->data = &mServiceRegistry;
924
926 DataProcessingDevice::initPollers();
927
928 // Whenever we InitTask, we consider as if the previous iteration
929 // was successful, so that even if there is no timer or receiving
930 // channel, we can still start an enumeration.
931 DataProcessorContext* initialContext = nullptr;
932 bool idle = state.lastActiveDataProcessor.compare_exchange_strong(initialContext, (DataProcessorContext*)-1);
933 if (!idle) {
934 LOG(error) << "DataProcessor " << state.lastActiveDataProcessor.load()->spec->name << " was unexpectedly active";
935 }
936
937 // We should be ready to run here. Therefore we copy all the
938 // required parts in the DataProcessorContext. Eventually we should
939 // do so on a per thread basis, with fine grained locks.
940 // FIXME: this should not use ServiceRegistry::threadSalt, but
941 // more a ServiceRegistry::globalDataProcessorSalt(N) where
942 // N is the number of the multiplexed data processor.
943 // We will get there.
944 this->fillContext(mServiceRegistry.get<DataProcessorContext>(ServiceRegistry::globalDeviceSalt()), deviceContext);
945
946 O2_SIGNPOST_END(device, cid, "InitTask", "Exiting InitTask callback waiting for the remaining region callbacks.");
947
948 auto hasPendingEvents = [&mutex = mRegionInfoMutex, &pendingRegionInfos = mPendingRegionInfos](DeviceContext& deviceContext) {
949 std::lock_guard<std::mutex> lock(mutex);
950 return (pendingRegionInfos.empty() == false) || deviceContext.expectedRegionCallbacks > 0;
951 };
952 O2_SIGNPOST_START(device, cid, "InitTask", "Waiting for registration events.");
957 while (hasPendingEvents(deviceContext)) {
958 // Wait for the callback to signal its done, so that we do not busy wait.
959 uv_run(state.loop, UV_RUN_ONCE);
960 // Handle callbacks if any
961 {
962 O2_SIGNPOST_EVENT_EMIT(device, cid, "InitTask", "Memory registration event received.");
963 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
964 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
965 }
966 }
967 O2_SIGNPOST_END(device, cid, "InitTask", "Done waiting for registration events.");
968}
969
970void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceContext& deviceContext)
971{
972 context.isSink = false;
973 // If nothing is a sink, the rate limiting simply does not trigger.
974 bool enableRateLimiting = std::stoi(fConfig->GetValue<std::string>("timeframes-rate-limit"));
975
976 auto ref = ServiceRegistryRef{mServiceRegistry};
977 auto& spec = ref.get<DeviceSpec const>();
978
979 // The policy is now allowed to state the default.
980 context.balancingInputs = spec.completionPolicy.balanceChannels;
981 // This is needed because the internal injected dummy sink should not
982 // try to balance inputs unless the rate limiting is requested.
983 if (enableRateLimiting == false && spec.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) {
984 context.balancingInputs = false;
985 }
986 if (enableRateLimiting) {
987 for (auto& spec : spec.outputs) {
988 if (spec.matcher.binding.value == "dpl-summary") {
989 context.isSink = true;
990 break;
991 }
992 }
993 }
994
995 context.registry = &mServiceRegistry;
998 if (context.error != nullptr) {
999 context.errorHandling = [&errorCallback = context.error,
1000 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1004 auto& err = error_from_ref(e);
1005 auto& context = ref.get<DataProcessorContext>();
1006 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1007 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Invoking callback.", err.what);
1008 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1009 auto& stats = ref.get<DataProcessingStats>();
1011 ErrorContext errorContext{record, ref, e};
1012 errorCallback(errorContext);
1013 };
1014 } else {
1015 context.errorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1016 auto& err = error_from_ref(e);
1020 auto& context = ref.get<DataProcessorContext>();
1021 auto& deviceContext = ref.get<DeviceContext>();
1022 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1023 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1024 auto& stats = ref.get<DataProcessingStats>();
1026 switch (deviceContext.processingPolicies.error) {
1028 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Rethrowing.", err.what);
1029 throw e;
1030 default:
1031 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Skipping to next timeframe.", err.what);
1032 break;
1033 }
1034 };
1035 }
1036
1037 auto decideEarlyForward = [&context, &deviceContext, &spec, this]() -> bool {
1040 bool canForwardEarly = (spec.forwards.empty() == false) && deviceContext.processingPolicies.earlyForward != EarlyForwardPolicy::NEVER;
1041 bool onlyConditions = true;
1042 bool overriddenEarlyForward = false;
1043 for (auto& forwarded : spec.forwards) {
1044 if (forwarded.matcher.lifetime != Lifetime::Condition) {
1045 onlyConditions = false;
1046 }
1048 context.canForwardEarly = false;
1049 overriddenEarlyForward = true;
1050 LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher);
1051 break;
1052 }
1053 if (forwarded.matcher.lifetime == Lifetime::Optional) {
1054 context.canForwardEarly = false;
1055 overriddenEarlyForward = true;
1056 LOG(detail) << "Cannot forward early because of Optional input: " << DataSpecUtils::describe(forwarded.matcher);
1057 break;
1058 }
1059 }
1060 if (!overriddenEarlyForward && onlyConditions) {
1061 context.canForwardEarly = true;
1062 LOG(detail) << "Enabling early forwarding because only conditions to be forwarded";
1063 }
1064 return canForwardEarly;
1065 };
1066 context.canForwardEarly = decideEarlyForward();
1067}
1068
1069void DataProcessingDevice::PreRun()
1070{
1071 auto ref = ServiceRegistryRef{mServiceRegistry};
1072 auto& state = ref.get<DeviceState>();
1073
1074 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.loop);
1075 O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
1076 state.quitRequested = false;
1078 state.allowedProcessing = DeviceState::Any;
1079 for (auto& info : state.inputChannelInfos) {
1080 if (info.state != InputChannelState::Pull) {
1081 info.state = InputChannelState::Running;
1082 }
1083 }
1084
1085 // Catch callbacks which fail before we start.
1086 // Notice that when running multiple dataprocessors
1087 // we should probably allow expendable ones to fail.
1088 try {
1089 auto& dpContext = ref.get<DataProcessorContext>();
1090 dpContext.preStartCallbacks(ref);
1091 for (size_t i = 0; i < mStreams.size(); ++i) {
1092 auto streamRef = ServiceRegistryRef{mServiceRegistry, ServiceRegistry::globalStreamSalt(i + 1)};
1093 auto& context = streamRef.get<StreamContext>();
1094 context.preStartStreamCallbacks(streamRef);
1095 }
1096 } catch (std::exception& e) {
1097 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type std::exception caught in PreRun: %{public}s. Rethrowing.", e.what());
1098 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1099 throw;
1100 } catch (o2::framework::RuntimeErrorRef& e) {
1101 auto& err = error_from_ref(e);
1102 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type o2::framework::RuntimeErrorRef caught in PreRun: %{public}s. Rethrowing.", err.what);
1103 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1104 throw;
1105 } catch (...) {
1106 O2_SIGNPOST_END(device, cid, "PreRun", "Unknown exception being thrown. Rethrowing.");
1107 throw;
1108 }
1109
1110 ref.get<CallbackService>().call<CallbackService::Id::Start>();
1111 startPollers();
1112
1113 // Raise to 1 when we are ready to start processing
1114 using o2::monitoring::Metric;
1115 using o2::monitoring::Monitoring;
1116 using o2::monitoring::tags::Key;
1117 using o2::monitoring::tags::Value;
1118
1119 auto& monitoring = ref.get<Monitoring>();
1120 monitoring.send(Metric{(uint64_t)1, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1121 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun callback.");
1122}
1123
1124void DataProcessingDevice::PostRun()
1125{
1126 ServiceRegistryRef ref{mServiceRegistry};
1127 // Drop the device_state metric to 0 now that we are done processing
1128 using o2::monitoring::Metric;
1129 using o2::monitoring::Monitoring;
1130 using o2::monitoring::tags::Key;
1131 using o2::monitoring::tags::Value;
1132
1133 auto& monitoring = ref.get<Monitoring>();
1134 monitoring.send(Metric{(uint64_t)0, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1135
1136 stopPollers();
1137 ref.get<CallbackService>().call<CallbackService::Id::Stop>();
1138 auto& dpContext = ref.get<DataProcessorContext>();
1139 dpContext.postStopCallbacks(ref);
1140}
1141
1143{
1144 ServiceRegistryRef ref{mServiceRegistry};
1145 ref.get<CallbackService>().call<CallbackService::Id::Reset>();
1146}
1147
1148void DataProcessingDevice::Run()
1149{
1150 ServiceRegistryRef ref{mServiceRegistry};
1151 auto& state = ref.get<DeviceState>();
1153 bool firstLoop = true;
1154 O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
1155 O2_SIGNPOST_START(device, lid, "device_state", "First iteration of the device loop");
1156
1157 bool dplEnableMultithreding = getenv("DPL_THREADPOOL_SIZE") != nullptr;
1158 if (dplEnableMultithreding) {
1159 setenv("UV_THREADPOOL_SIZE", "1", 1);
1160 }
1161
1162 while (state.transitionHandling != TransitionHandlingState::Expired) {
1163 if (state.nextFairMQState.empty() == false) {
1164 (void)this->ChangeState(state.nextFairMQState.back());
1165 state.nextFairMQState.pop_back();
1166 }
1167 // Notify on the main thread the new region callbacks, making sure
1168 // no callback is issued if there is something still processing.
1169 {
1170 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1171 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1172 }
1173 // This will block for the correct delay (or until we get data
1174 // on a socket). We also do not block on the first iteration
1175 // so that devices which do not have a timer can still start an
1176 // enumeration.
1177 {
1178 ServiceRegistryRef ref{mServiceRegistry};
1179 ref.get<DriverClient>().flushPending(mServiceRegistry);
1180 DataProcessorContext* lastActive = state.lastActiveDataProcessor.load();
1181 // Reset to zero unless some other DataProcessorContext completed in the meanwhile.
1182 // In that case we will take care of it at the next iteration.
1183 state.lastActiveDataProcessor.compare_exchange_strong(lastActive, nullptr);
1184
1185 auto shouldNotWait = (lastActive != nullptr &&
1186 (state.streaming != StreamingState::Idle) && (state.activeSignals.empty())) ||
1188 if (firstLoop) {
1189 shouldNotWait = true;
1190 firstLoop = false;
1191 }
1192 if (lastActive != nullptr) {
1194 }
1195 if (NewStatePending()) {
1196 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "New state pending. Waiting for it to be handled.");
1197 shouldNotWait = true;
1199 }
1201 // If we are Idle, we can then consider the transition to be expired.
1202 if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
1203 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
1204 state.transitionHandling = TransitionHandlingState::Expired;
1205 }
1206 if (state.severityStack.empty() == false) {
1207 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1208 state.severityStack.pop_back();
1209 }
1210 // for (auto &info : mDeviceContext.state->inputChannelInfos) {
1211 // shouldNotWait |= info.readPolled;
1212 // }
1213 state.loopReason = DeviceState::NO_REASON;
1214 state.firedTimers.clear();
1215 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_CALLBACKS) != 0) {
1216 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1217 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1218 }
1219 // Run the asynchronous queue just before sleeping again, so that:
1220 // - we can trigger further events from the queue
1221 // - we can guarantee this is the last thing we do in the loop (
1222 // assuming no one else is adding to the queue before this point).
1223 auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1224 O2_SIGNPOST_START(device, lid, "run_loop", "Dropping message from slot %" PRIu64 ". Forwarding as needed.", (uint64_t)slot.index);
1225 ServiceRegistryRef ref{registry};
1226 ref.get<AsyncQueue>();
1227 ref.get<DecongestionService>();
1228 ref.get<DataRelayer>();
1229 // Get the current timeslice for the slot.
1230 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
1232 forwardInputs(registry, slot, dropped, oldestOutputInfo, false, true);
1233 };
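      // Dropped messages are still routed through forwardInputs (without copying, while
      // consuming them), so that downstream devices and the oldest-possible-timeslice
      // bookkeeping stay consistent even when a slot is pruned.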
1234 auto& relayer = ref.get<DataRelayer>();
1235 relayer.prunePending(onDrop);
1236 auto& queue = ref.get<AsyncQueue>();
1237 auto oldestPossibleTimeslice = relayer.getOldestPossibleOutput();
1238 AsyncQueueHelpers::run(queue, {oldestPossibleTimeslice.timeslice.value});
1239 if (shouldNotWait == false) {
1240 auto& dpContext = ref.get<DataProcessorContext>();
1241 dpContext.preLoopCallbacks(ref);
1242 }
1243 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. %{public}s", shouldNotWait ? "Will immediately schedule a new one" : "Waiting for next event.");
1244 uv_run(state.loop, shouldNotWait ? UV_RUN_NOWAIT : UV_RUN_ONCE);
1245 O2_SIGNPOST_START(device, lid, "run_loop", "Run loop started. Loop reason %d.", state.loopReason);
1246 if ((state.loopReason & state.tracingFlags) != 0) {
1247 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1248 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1249 } else if (state.severityStack.empty() == false) {
1250 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1251 state.severityStack.pop_back();
1252 }
1253 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Loop reason mask %x & %x = %x", state.loopReason, state.tracingFlags, state.loopReason & state.tracingFlags);
1254
1255 if ((state.loopReason & DeviceState::LoopReason::OOB_ACTIVITY) != 0) {
1256 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Out of band activity detected. Rescanning everything.");
1257 relayer.rescan();
1258 }
1259
1260 if (!state.pendingOffers.empty()) {
1261 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Pending %" PRIu64 " offers. updating the ComputingQuotaEvaluator.", (uint64_t)state.pendingOffers.size());
1262 ref.get<ComputingQuotaEvaluator>().updateOffers(state.pendingOffers, uv_now(state.loop));
1263 }
1264 }
1265
1266 // Notify on the main thread the new region callbacks, making sure
1267 // no callback is issued if there is something still processing.
1268 // Notice that we still need to perform callbacks also after
1269 // the socket has been polled, because otherwise we would end up serving
1270 // the callback after the first data arrives, if the system is too
1271 // fast to transition from Init to Run.
1272 {
1273 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1274 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1275 }
1276
1277 assert(mStreams.size() == mHandles.size());
1279 TaskStreamRef streamRef{-1};
1280 for (size_t ti = 0; ti < mStreams.size(); ti++) {
1281 auto& taskInfo = mStreams[ti];
1282 if (taskInfo.running) {
1283 continue;
1284 }
1285 // Stream 0 is for when we run in
1286 streamRef.index = ti;
1287 }
1288 using o2::monitoring::Metric;
1289 using o2::monitoring::Monitoring;
1290 using o2::monitoring::tags::Key;
1291 using o2::monitoring::tags::Value;
1292 // We have an empty stream, let's check if we have enough
1293 // resources for it to run something
1294 if (streamRef.index != -1) {
1295 // Synchronous execution of the callbacks. This will be moved
1296 // into on_socket_polled once we have threading in place.
1297 uv_work_t& handle = mHandles[streamRef.index];
1298 TaskStreamInfo& stream = mStreams[streamRef.index];
1299 handle.data = &mStreams[streamRef.index];
1300
1301 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const& stats)> reportExpiredOffer = [&registry = mServiceRegistry](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
1302 ServiceRegistryRef ref{registry};
1303 auto& dpStats = ref.get<DataProcessingStats>();
1304 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
1305 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
1306 dpStats.updateStats({static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredTimeslices});
1307 dpStats.processCommandQueue();
1308 };
1309 auto ref = ServiceRegistryRef{mServiceRegistry};
1310
1311 // Deciding whether to run or not can be done by passing a request to
1312 // the evaluator. In this case, the request is always satisfied and
1313 // we run on whatever resource is available.
1314 auto& spec = ref.get<DeviceSpec const>();
1315 bool enough = ref.get<ComputingQuotaEvaluator>().selectOffer(streamRef.index, spec.resourcePolicy.request, uv_now(state.loop));
1316
1317 struct SchedulingStats {
1318 std::atomic<size_t> lastScheduled = 0;
1319 std::atomic<size_t> numberOfUnscheduledSinceLastScheduled = 0;
1320 std::atomic<size_t> numberOfUnscheduled = 0;
1321 std::atomic<size_t> numberOfScheduled = 0;
1322 };
1323 static SchedulingStats schedulingStats;
1324 O2_SIGNPOST_ID_GENERATE(sid, scheduling);
1325 if (enough) {
1326 stream.id = streamRef;
1327 stream.running = true;
1328 stream.registry = &mServiceRegistry;
1329 schedulingStats.lastScheduled = uv_now(state.loop);
1330 schedulingStats.numberOfScheduled++;
1331 schedulingStats.numberOfUnscheduledSinceLastScheduled = 0;
1332 O2_SIGNPOST_EVENT_EMIT(scheduling, sid, "Run", "Enough resources to schedule computation on stream %d", streamRef.index);
1333 if (dplEnableMultithreding) [[unlikely]] {
1334 stream.task = &handle;
1335 uv_queue_work(state.loop, stream.task, run_callback, run_completion);
1336 } else {
1337 run_callback(&handle);
1338 run_completion(&handle, 0);
1339 }
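      // When DPL_THREADPOOL_SIZE is not set, the work and its completion run
      // synchronously on the main thread; otherwise the work item is handed to the
      // libuv thread pool and run_completion is invoked back on the loop thread.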
1340 } else {
1341 if (schedulingStats.numberOfUnscheduledSinceLastScheduled > 100 ||
1342 (uv_now(state.loop) - schedulingStats.lastScheduled) > 30000) {
1343 O2_SIGNPOST_EVENT_EMIT_WARN(scheduling, sid, "Run",
1344 "Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu.",
1345 schedulingStats.numberOfUnscheduledSinceLastScheduled.load(),
1346 schedulingStats.lastScheduled.load());
1347 } else {
1348 O2_SIGNPOST_EVENT_EMIT(scheduling, sid, "Run",
1349 "Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu.",
1350 schedulingStats.numberOfUnscheduledSinceLastScheduled.load(),
1351 schedulingStats.lastScheduled.load());
1352 }
1353 schedulingStats.numberOfUnscheduled++;
1354 schedulingStats.numberOfUnscheduledSinceLastScheduled++;
1355 auto ref = ServiceRegistryRef{mServiceRegistry};
1356 ref.get<ComputingQuotaEvaluator>().handleExpired(reportExpiredOffer);
1357 }
1358 }
1359 }
1360
1361 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", (int)state.transitionHandling);
1362 auto& spec = ref.get<DeviceSpec const>();
1364 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
1365 auto& info = state.inputChannelInfos[ci];
1366 info.parts.fParts.clear();
1367 }
1368 state.transitionHandling = TransitionHandlingState::NoTransition;
1369}
1370
1374{
1375 auto& context = ref.get<DataProcessorContext>();
1376 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1377 O2_SIGNPOST_START(device, dpid, "do_prepare", "Starting DataProcessorContext::doPrepare.");
1378
1379 {
1380 ref.get<CallbackService>().call<CallbackService::Id::ClockTick>();
1381 }
1382 // Whether or not we had something to do.
1383
1384 // Initialise the value for context.allDone. It will possibly be updated
1385 // below if any of the channels is not done.
1386 //
1387 // Notice that fake input channels (InputChannelState::Pull) cannot possibly
1388 // expect to receive an EndOfStream signal. Thus we do not wait for these
1389 // to be completed. In the case of data source devices, as they do not have
1390 // real data input channels, they have to signal EndOfStream themselves.
1391 auto& state = ref.get<DeviceState>();
1392 auto& spec = ref.get<DeviceSpec const>();
1393 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.inputChannelInfos.data());
1394 O2_SIGNPOST_START(device, cid, "do_prepare", "Reported channel states.");
1395 context.allDone = std::any_of(state.inputChannelInfos.begin(), state.inputChannelInfos.end(), [cid](const auto& info) {
1396 if (info.channel) {
1397 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "Input channel %{public}s%{public}s has %zu parts left and is in state %d.",
1398 info.channel->GetName().c_str(), (info.id.value == ChannelIndex::INVALID ? " (non DPL)" : ""), info.parts.fParts.size(), (int)info.state);
1399 } else {
1400 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "External channel %d is in state %d.", info.id.value, (int)info.state);
1401 }
1402 return (info.parts.fParts.empty() == true && info.state != InputChannelState::Pull);
1403 });
1404 O2_SIGNPOST_END(device, cid, "do_prepare", "End report.");
1405 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Processing %zu input channels.", spec.inputChannels.size());
1408 static std::vector<int> pollOrder;
1409 pollOrder.resize(state.inputChannelInfos.size());
1410 std::iota(pollOrder.begin(), pollOrder.end(), 0);
1411 std::sort(pollOrder.begin(), pollOrder.end(), [&infos = state.inputChannelInfos](int a, int b) {
1412 return infos[a].oldestForChannel.value < infos[b].oldestForChannel.value;
1413 });
1414
1415 // Nothing to poll...
1416 if (pollOrder.empty()) {
1417 O2_SIGNPOST_END(device, dpid, "do_prepare", "Nothing to poll. Waiting for next iteration.");
1418 return;
1419 }
1420 auto currentOldest = state.inputChannelInfos[pollOrder.front()].oldestForChannel;
1421 auto currentNewest = state.inputChannelInfos[pollOrder.back()].oldestForChannel;
1422 auto delta = currentNewest.value - currentOldest.value;
1423 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Oldest possible timeframe range %" PRIu64 " => %" PRIu64 " delta %" PRIu64,
1424 (int64_t)currentOldest.value, (int64_t)currentNewest.value, (int64_t)delta);
1425 auto& infos = state.inputChannelInfos;
1426
1427 if (context.balancingInputs) {
1428 static int pipelineLength = DefaultsHelpers::pipelineLength();
1429 static uint64_t ahead = getenv("DPL_MAX_CHANNEL_AHEAD") ? std::atoll(getenv("DPL_MAX_CHANNEL_AHEAD")) : std::max(8, std::min(pipelineLength - 48, pipelineLength / 2));
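    // Channels whose oldest possible timeslice is more than `ahead` timeslices
    // beyond the globally oldest one are dropped from the poll order below and their
    // pollers are suspended until the slower channels catch up. With the default
    // formula the limit is, e.g., max(8, min(64 - 48, 64 / 2)) = 16 for a pipeline
    // length of 64, while short pipelines fall back to the floor of 8 because
    // pipelineLength - 48 goes negative.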
1430 auto newEnd = std::remove_if(pollOrder.begin(), pollOrder.end(), [&infos, limitNew = currentOldest.value + ahead](int a) -> bool {
1431 return infos[a].oldestForChannel.value > limitNew;
1432 });
1433 for (auto it = pollOrder.begin(); it < pollOrder.end(); it++) {
1434 const auto& channelInfo = state.inputChannelInfos[*it];
1435 if (channelInfo.pollerIndex != -1) {
1436 auto& poller = state.activeInputPollers[channelInfo.pollerIndex];
1437 auto& pollerContext = *(PollerContext*)(poller->data);
1438 if (pollerContext.pollerState == PollerContext::PollerState::Connected || pollerContext.pollerState == PollerContext::PollerState::Suspended) {
1439 bool running = pollerContext.pollerState == PollerContext::PollerState::Connected;
1440 bool shouldBeRunning = it < newEnd;
1441 if (running != shouldBeRunning) {
1442 uv_poll_start(poller, shouldBeRunning ? UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED : 0, &on_socket_polled);
1443 pollerContext.pollerState = shouldBeRunning ? PollerContext::PollerState::Connected : PollerContext::PollerState::Suspended;
1444 }
1445 }
1446 }
1447 }
1448 pollOrder.erase(newEnd, pollOrder.end());
1449 }
1450  O2_SIGNPOST_END(device, dpid, "do_prepare", "%zu channels pass the channel imbalance check.", pollOrder.size());
1451
1452 for (auto sci : pollOrder) {
1453 auto& info = state.inputChannelInfos[sci];
1454 auto& channelSpec = spec.inputChannels[sci];
1455 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1456 O2_SIGNPOST_START(device, cid, "channels", "Processing channel %s", channelSpec.name.c_str());
1457
1458 if (info.state != InputChannelState::Completed && info.state != InputChannelState::Pull) {
1459 context.allDone = false;
1460 }
1461 if (info.state != InputChannelState::Running) {
1462 // Remember to flush data if we are not running
1463 // and there is some message pending.
1464 if (info.parts.Size()) {
1466 }
1467 O2_SIGNPOST_END(device, cid, "channels", "Flushing channel %s which is in state %d and has %zu parts still pending.",
1468 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1469 continue;
1470 }
1471 if (info.channel == nullptr) {
1472 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is nullptr and has %zu parts still pending.",
1473 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1474 continue;
1475 }
1476 // Only poll DPL channels for now.
1478 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is not a DPL channel and has %zu parts still pending.",
1479 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1480 continue;
1481 }
1482 auto& socket = info.channel->GetSocket();
1484    // If we have pending events from a previous iteration,
1485    // we receive in any case.
1486    // Otherwise we check whether there is any pending event and skip
1487    // this channel if there is none.
1487 if (info.hasPendingEvents == 0) {
1488 socket.Events(&info.hasPendingEvents);
1489 // If we do not read, we can continue.
1490 if ((info.hasPendingEvents & 1) == 0 && (info.parts.Size() == 0)) {
1491 O2_SIGNPOST_END(device, cid, "channels", "No pending events and no remaining parts to process for channel %{public}s", channelSpec.name.c_str());
1492 continue;
1493 }
1494 }
1495 // We can reset this, because it means we have seen at least 1
1496 // message after the UV_READABLE was raised.
1497 info.readPolled = false;
1498    // Notice that there seems to be a difference between the zeromq
1499    // documentation and the observed behavior. The fact that ZMQ_POLLIN
1500    // is raised does not mean that a message is immediately available to
1501    // read, just that it will be available soon, so the receive can
1502    // still return -2. To avoid this we keep receiving on the socket until
1503    // we get a message. In order not to overflow the DPL queue we process
1504    // one message at a time and we keep track of whether there were more
1505    // to process.
1506 bool newMessages = false;
1507 while (true) {
1508 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Receiving loop called for channel %{public}s (%d) with oldest possible timeslice %zu",
1509 channelSpec.name.c_str(), info.id.value, info.oldestForChannel.value);
1510 if (info.parts.Size() < 64) {
1511 fair::mq::Parts parts;
1512 info.channel->Receive(parts, 0);
1513 if (parts.Size()) {
1514 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Received %zu parts from channel %{public}s (%d).", parts.Size(), channelSpec.name.c_str(), info.id.value);
1515 }
1516 for (auto&& part : parts) {
1517 info.parts.fParts.emplace_back(std::move(part));
1518 }
1519 newMessages |= true;
1520 }
1521
1522 if (info.parts.Size() >= 0) {
1524        // Receiving data counts as activity now, so that
1525        // we can make sure we process all the pending
1526        // messages without hanging on uv_run.
1527 break;
1528 }
1529 }
1530    // We check once again for pending events, keeping track of whether this
1531    // was the case so that we can immediately repeat this loop and avoid
1532    // remaining stuck in uv_run. This is because we will not get notified on
1533    // the socket if more events are pending, due to zeromq's level-triggered approach.
1534 socket.Events(&info.hasPendingEvents);
1535 if (info.hasPendingEvents) {
1536 info.readPolled = false;
1537 // In case there were messages, we consider it as activity
1538 if (newMessages) {
1539 state.lastActiveDataProcessor.store(&context);
1540 }
1541 }
1542 O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).",
1543 channelSpec.name.c_str(), info.id.value);
1544 }
1545}
1546
1548{
1549 auto& context = ref.get<DataProcessorContext>();
1550 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1551 auto& state = ref.get<DeviceState>();
1552 auto& spec = ref.get<DeviceSpec const>();
1553
1554 if (state.streaming == StreamingState::Idle) {
1555 return;
1556 }
1557
1558 context.completed.clear();
1559 context.completed.reserve(16);
1560 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1561 state.lastActiveDataProcessor.store(&context);
1562 }
1563 DanglingContext danglingContext{*context.registry};
1564
1565 context.preDanglingCallbacks(danglingContext);
1566 if (state.lastActiveDataProcessor.load() == nullptr) {
1567 ref.get<CallbackService>().call<CallbackService::Id::Idle>();
1568 }
1569 auto activity = ref.get<DataRelayer>().processDanglingInputs(context.expirationHandlers, *context.registry, true);
1570 if (activity.expiredSlots > 0) {
1571 state.lastActiveDataProcessor = &context;
1572 }
1573
1574 context.completed.clear();
1575 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1576 state.lastActiveDataProcessor = &context;
1577 }
1578
1579 context.postDanglingCallbacks(danglingContext);
1580
1581 // If we got notified that all the sources are done, we call the EndOfStream
1582 // callback and return false. Notice that what happens next is actually
1583 // dependent on the callback, not something which is controlled by the
1584 // framework itself.
1585 if (context.allDone == true && state.streaming == StreamingState::Streaming) {
1587 state.lastActiveDataProcessor = &context;
1588 }
1589
1590 if (state.streaming == StreamingState::EndOfStreaming) {
1591 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "We are in EndOfStreaming. Flushing queues.");
1592 // We keep processing data until we are Idle.
1593 // FIXME: not sure this is the correct way to drain the queues, but
1594 // I guess we will see.
1597 auto& relayer = ref.get<DataRelayer>();
1598
1599 bool shouldProcess = DataProcessingHelpers::hasOnlyGenerated(spec) == false;
1600
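    // A device whose inputs are all generated internally (e.g. timers or
    // enumerations) has nothing queued to drain, so it skips the dispatch loop
    // below and does not keep the data produced at end of stream.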
1601 while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) {
1602 relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false);
1603 }
1604
1605 auto& timingInfo = ref.get<TimingInfo>();
1606 // We should keep the data generated at end of stream only for those
1607 // which are not sources.
1608 timingInfo.keepAtEndOfStream = shouldProcess;
1609    // Fill timingInfo with some reasonable values for data sent with endOfStream
1610 timingInfo.timeslice = relayer.getOldestPossibleOutput().timeslice.value;
1611 timingInfo.tfCounter = -1;
1612 timingInfo.firstTForbit = -1;
1613 // timingInfo.runNumber = ; // Not sure where to get this if not already set
1614 timingInfo.creation = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
1615 O2_SIGNPOST_EVENT_EMIT(calibration, dpid, "calibration", "TimingInfo.keepAtEndOfStream %d", timingInfo.keepAtEndOfStream);
1616
1617 EndOfStreamContext eosContext{*context.registry, ref.get<DataAllocator>()};
1618
1619 context.preEOSCallbacks(eosContext);
1620 auto& streamContext = ref.get<StreamContext>();
1621 streamContext.preEOSCallbacks(eosContext);
1622 ref.get<CallbackService>().call<CallbackService::Id::EndOfStream>(eosContext);
1623 streamContext.postEOSCallbacks(eosContext);
1624 context.postEOSCallbacks(eosContext);
1625
1626 for (auto& channel : spec.outputChannels) {
1627 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Sending end of stream to %{public}s.", channel.name.c_str());
1629 }
1630 // This is needed because the transport is deleted before the device.
1631 relayer.clear();
1633 // In case we should process, note the data processor responsible for it
1634 if (shouldProcess) {
1635 state.lastActiveDataProcessor = &context;
1636 }
1637 // On end of stream we shut down all output pollers.
1638 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1639 for (auto& poller : state.activeOutputPollers) {
1640 uv_poll_stop(poller);
1641 }
1642 return;
1643 }
1644
1645 if (state.streaming == StreamingState::Idle) {
1646 // On end of stream we shut down all output pollers.
1647 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1648 for (auto& poller : state.activeOutputPollers) {
1649 uv_poll_stop(poller);
1650 }
1651 }
1652
1653 return;
1654}
1655
1657{
1658 ServiceRegistryRef ref{mServiceRegistry};
1659 ref.get<DataRelayer>().clear();
1660 auto& deviceContext = ref.get<DeviceContext>();
1661 // If the signal handler is there, we should
1662 // hide the registry from it, so that we do not
1663 // end up calling the signal handler on something
1664 // which might not be there anymore.
1665 if (deviceContext.sigusr1Handle) {
1666 deviceContext.sigusr1Handle->data = nullptr;
1667 }
1668 // Makes sure we do not have a working context on
1669 // shutdown.
1670 for (auto& handle : ref.get<DeviceState>().activeSignals) {
1671 handle->data = nullptr;
1672 }
1673}
1674
1677 {
1678 }
1679};
1680
1686{
1687 using InputInfo = DataRelayer::InputInfo;
1689
1690 auto& context = ref.get<DataProcessorContext>();
1691 // This is the same id as the upper level function, so we get the events
1692 // associated with the same interval. We will simply use "handle_data" as
1693 // the category.
1694 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1695
1696 // This is how we validate inputs. I.e. we try to enforce the O2 Data model
1697 // and we do a few stats. We bind parts as a lambda captured variable, rather
1698  // than an input, because we do not want the outer loop to be exposed
1699 // to the implementation details of the messaging layer.
1700 auto getInputTypes = [&info, &context]() -> std::optional<std::vector<InputInfo>> {
1701 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1702 auto ref = ServiceRegistryRef{*context.registry};
1703 auto& stats = ref.get<DataProcessingStats>();
1704 auto& state = ref.get<DeviceState>();
1705 auto& parts = info.parts;
1706 stats.updateStats({(int)ProcessingStatsId::TOTAL_INPUTS, DataProcessingStats::Op::Set, (int64_t)parts.Size()});
1707
1708 std::vector<InputInfo> results;
1709 // we can reserve the upper limit
1710 results.reserve(parts.Size() / 2);
1711 size_t nTotalPayloads = 0;
1712
1713 auto insertInputInfo = [&results, &nTotalPayloads](size_t position, size_t length, InputType type, ChannelIndex index) {
1714 results.emplace_back(position, length, type, index);
1715 if (type != InputType::Invalid && length > 1) {
1716 nTotalPayloads += length - 1;
1717 }
1718 };
1719
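    // The multipart message is expected to alternate header and payload messages,
    //   [header][payload][header][payload]...
    // except for split payloads, where a single header is followed by
    // dh->splitPayloadParts consecutive payload messages. The loop below therefore
    // advances two parts at a time and classifies each header it finds.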
1720 for (size_t pi = 0; pi < parts.Size(); pi += 2) {
1721 auto* headerData = parts.At(pi)->GetData();
1722 auto sih = o2::header::get<SourceInfoHeader*>(headerData);
1723 auto dh = o2::header::get<DataHeader*>(headerData);
1724 if (sih) {
1725 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got SourceInfoHeader with state %d", (int)sih->state);
1726 info.state = sih->state;
1727 insertInputInfo(pi, 2, InputType::SourceInfo, info.id);
1728 state.lastActiveDataProcessor = &context;
1729 if (dh) {
1730 LOGP(error, "Found data attached to a SourceInfoHeader");
1731 }
1732 continue;
1733 }
1734 auto dih = o2::header::get<DomainInfoHeader*>(headerData);
1735 if (dih) {
1736 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got DomainInfoHeader with oldestPossibleTimeslice %d", (int)dih->oldestPossibleTimeslice);
1737 insertInputInfo(pi, 2, InputType::DomainInfo, info.id);
1738 state.lastActiveDataProcessor = &context;
1739 if (dh) {
1740 LOGP(error, "Found data attached to a DomainInfoHeader");
1741 }
1742 continue;
1743 }
1744 if (!dh) {
1745 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1746 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header is not a DataHeader?");
1747 continue;
1748 }
1749 if (dh->payloadSize > parts.At(pi + 1)->GetSize()) {
1750 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1751 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader payloadSize mismatch");
1752 continue;
1753 }
1754 auto dph = o2::header::get<DataProcessingHeader*>(headerData);
1755 // We only deal with the tracking of parts if the log is enabled.
1756 // This is because in principle we should track the size of each of
1757 // the parts and sum it up. Not for now.
1758 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, headerData);
1759 O2_SIGNPOST_START(parts, pid, "parts", "Processing DataHeader %{public}-4s/%{public}-16s/%d with splitPayloadParts %d and splitPayloadIndex %d",
1760 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->splitPayloadParts, dh->splitPayloadIndex);
1761 if (!dph) {
1762 insertInputInfo(pi, 2, InputType::Invalid, info.id);
1763 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header stack does not contain DataProcessingHeader");
1764 continue;
1765 }
1766 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
1767        // this indicates a sequence of payloads following the header
1768 // FIXME: we will probably also set the DataHeader version
1769 insertInputInfo(pi, dh->splitPayloadParts + 1, InputType::Data, info.id);
1770 pi += dh->splitPayloadParts - 1;
1771 } else {
1772 // We can set the type for the next splitPayloadParts
1773 // because we are guaranteed they are all the same.
1774 // If splitPayloadParts = 0, we assume that means there is only one (header, payload)
1775 // pair.
1776 size_t finalSplitPayloadIndex = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
1777 if (finalSplitPayloadIndex > parts.Size()) {
1778 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader::splitPayloadParts invalid");
1779 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1780 continue;
1781 }
1782 insertInputInfo(pi, 2, InputType::Data, info.id);
1783 for (; pi + 2 < finalSplitPayloadIndex; pi += 2) {
1784 insertInputInfo(pi + 2, 2, InputType::Data, info.id);
1785 }
1786 }
1787 }
1788 if (results.size() + nTotalPayloads != parts.Size()) {
1789 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "inconsistent number of inputs extracted. %zu vs parts (%zu)", results.size() + nTotalPayloads, parts.Size());
1790 return std::nullopt;
1791 }
1792 return results;
1793 };
1794
1795 auto reportError = [ref](const char* message) {
1796 auto& stats = ref.get<DataProcessingStats>();
1798 };
1799
1800 auto handleValidMessages = [&info, ref, &reportError](std::vector<InputInfo> const& inputInfos) {
1801 auto& relayer = ref.get<DataRelayer>();
1802 auto& state = ref.get<DeviceState>();
1803 static WaitBackpressurePolicy policy;
1804 auto& parts = info.parts;
1805 // We relay execution to make sure we have a complete set of parts
1806 // available.
1807 bool hasBackpressure = false;
1808 size_t minBackpressureTimeslice = -1;
1809 bool hasData = false;
1810 size_t oldestPossibleTimeslice = -1;
1811 static std::vector<int> ordering;
1812    // Indices into inputInfos, filled with iota and sorted below by type and position.
1813 ordering.resize(inputInfos.size());
1814 std::iota(ordering.begin(), ordering.end(), 0);
1815 // stable sort orderings by type and position
1816 std::stable_sort(ordering.begin(), ordering.end(), [&inputInfos](int const& a, int const& b) {
1817 auto const& ai = inputInfos[a];
1818 auto const& bi = inputInfos[b];
1819 if (ai.type != bi.type) {
1820 return ai.type < bi.type;
1821 }
1822 return ai.position < bi.position;
1823 });
1824 for (size_t ii = 0; ii < inputInfos.size(); ++ii) {
1825 auto const& input = inputInfos[ordering[ii]];
1826 switch (input.type) {
1827 case InputType::Data: {
1828 hasData = true;
1829 auto headerIndex = input.position;
1830 auto nMessages = 0;
1831 auto nPayloadsPerHeader = 0;
1832 if (input.size > 2) {
1833 // header and multiple payload sequence
1834 nMessages = input.size;
1835 nPayloadsPerHeader = nMessages - 1;
1836 } else {
1837 // multiple header-payload pairs
1838 auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
1839 nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2;
1840 nPayloadsPerHeader = 1;
1841 ii += (nMessages / 2) - 1;
1842 }
1843 auto onDrop = [ref](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1844 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
1845 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "onDrop", "Dropping message from slot %zu. Forwarding as needed. Timeslice %zu",
1846 slot.index, oldestOutputInfo.timeslice.value);
1847 ref.get<AsyncQueue>();
1848 ref.get<DecongestionService>();
1849 ref.get<DataRelayer>();
1850 // Get the current timeslice for the slot.
1851 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
1853 forwardInputs(ref, slot, dropped, oldestOutputInfo, false, true);
1854 };
1855 auto relayed = relayer.relay(parts.At(headerIndex)->GetData(),
1856 &parts.At(headerIndex),
1857 input,
1858 nMessages,
1859 nPayloadsPerHeader,
1860 onDrop);
1861 switch (relayed.type) {
1863 if (info.normalOpsNotified == true && info.backpressureNotified == false) {
1864 LOGP(alarm, "Backpressure on channel {}. Waiting.", info.channel->GetName());
1865 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
1866 monitoring.send(o2::monitoring::Metric{1, fmt::format("backpressure_{}", info.channel->GetName())});
1867 info.backpressureNotified = true;
1868 info.normalOpsNotified = false;
1869 }
1870 policy.backpressure(info);
1871 hasBackpressure = true;
1872 minBackpressureTimeslice = std::min<size_t>(minBackpressureTimeslice, relayed.timeslice.value);
1873 break;
1877 if (info.normalOpsNotified == false && info.backpressureNotified == true) {
1878 LOGP(info, "Back to normal on channel {}.", info.channel->GetName());
1879 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
1880 monitoring.send(o2::monitoring::Metric{0, fmt::format("backpressure_{}", info.channel->GetName())});
1881 info.normalOpsNotified = true;
1882 info.backpressureNotified = false;
1883 }
1884 break;
1885 }
1886 } break;
1887 case InputType::SourceInfo: {
1888 LOGP(detail, "Received SourceInfo");
1889 auto& context = ref.get<DataProcessorContext>();
1890 state.lastActiveDataProcessor = &context;
1891 auto headerIndex = input.position;
1892 auto payloadIndex = input.position + 1;
1893 assert(payloadIndex < parts.Size());
1894 // FIXME: the message with the end of stream cannot contain
1895 // split parts.
1896 parts.At(headerIndex).reset(nullptr);
1897 parts.At(payloadIndex).reset(nullptr);
1898 // for (size_t i = 0; i < dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 - 1 : 1; ++i) {
1899 // parts.At(headerIndex + 1 + i).reset(nullptr);
1900 // }
1901 // pi += dh->splitPayloadParts > 0 ? dh->splitPayloadParts - 1 : 0;
1902
1903 } break;
1904 case InputType::DomainInfo: {
1907 auto& context = ref.get<DataProcessorContext>();
1908 state.lastActiveDataProcessor = &context;
1909 auto headerIndex = input.position;
1910 auto payloadIndex = input.position + 1;
1911 assert(payloadIndex < parts.Size());
1912 // FIXME: the message with the end of stream cannot contain
1913 // split parts.
1914
1915 auto dih = o2::header::get<DomainInfoHeader*>(parts.At(headerIndex)->GetData());
1916 if (hasBackpressure && dih->oldestPossibleTimeslice >= minBackpressureTimeslice) {
1917 break;
1918 }
1919 oldestPossibleTimeslice = std::min(oldestPossibleTimeslice, dih->oldestPossibleTimeslice);
1920 LOGP(debug, "Got DomainInfoHeader, new oldestPossibleTimeslice {} on channel {}", oldestPossibleTimeslice, info.id.value);
1921 parts.At(headerIndex).reset(nullptr);
1922 parts.At(payloadIndex).reset(nullptr);
1923 } break;
1924 case InputType::Invalid: {
1925 reportError("Invalid part found.");
1926 } break;
1927 }
1928 }
1931 if (oldestPossibleTimeslice != (size_t)-1) {
1932 info.oldestForChannel = {oldestPossibleTimeslice};
1933 auto& context = ref.get<DataProcessorContext>();
1934 context.domainInfoUpdatedCallback(*context.registry, oldestPossibleTimeslice, info.id);
1935 ref.get<CallbackService>().call<CallbackService::Id::DomainInfoUpdated>((ServiceRegistryRef)*context.registry, (size_t)oldestPossibleTimeslice, (ChannelIndex)info.id);
1936 state.lastActiveDataProcessor = &context;
1937 }
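    // Slots consumed above have been reset to nullptr; the erase-remove below
    // compacts the container, and whatever survives was backpressured and will be
    // retried on a later iteration (hence the log message just below).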
1938 auto it = std::remove_if(parts.fParts.begin(), parts.fParts.end(), [](auto& msg) -> bool { return msg.get() == nullptr; });
1939 parts.fParts.erase(it, parts.end());
1940 if (parts.fParts.size()) {
1941 LOG(debug) << parts.fParts.size() << " messages backpressured";
1942 }
1943 };
1944
1945  // Second part. This is the actual outer loop we want to obtain, with
1946  // implementation details which can be read. Notice how most of the state
1947  // is actually hidden. For example we do not expose what "input" is. This
1948  // will allow us to keep the same top-level logic even if the actual meaning
1949  // of input is changed (for example we might move away from multipart
1950  // messages). Notice also that we need to act differently depending on the
1951  // actual CompletionOp we want to perform. In particular forwarding inputs
1952  // also removes them from the cache.
1953 auto inputTypes = getInputTypes();
1954 if (bool(inputTypes) == false) {
1955    reportError("Parts should come in pairs. Dropping them.");
1956 return;
1957 }
1958 handleValidMessages(*inputTypes);
1959 return;
1960}
1961
1962namespace
1963{
1964struct InputLatency {
1965 uint64_t minLatency = std::numeric_limits<uint64_t>::max();
1966 uint64_t maxLatency = std::numeric_limits<uint64_t>::min();
1967};
1968
1969auto calculateInputRecordLatency(InputRecord const& record, uint64_t currentTime) -> InputLatency
1970{
1971 InputLatency result;
1972
1973 for (auto& item : record) {
1974 auto* header = o2::header::get<DataProcessingHeader*>(item.header);
1975 if (header == nullptr) {
1976 continue;
1977 }
1978 int64_t partLatency = (0x7fffffffffffffff & currentTime) - (0x7fffffffffffffff & header->creation);
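    // Masking off the top bit keeps both timestamps in the non-negative int64
    // range so the subtraction cannot overflow; a negative result (e.g. due to
    // clock skew between producer and consumer) is clamped to zero just below.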
1979 if (partLatency < 0) {
1980 partLatency = 0;
1981 }
1982 result.minLatency = std::min(result.minLatency, (uint64_t)partLatency);
1983 result.maxLatency = std::max(result.maxLatency, (uint64_t)partLatency);
1984 }
1985 return result;
1986};
1987
1988auto calculateTotalInputRecordSize(InputRecord const& record) -> int
1989{
1990 size_t totalInputSize = 0;
1991 for (auto& item : record) {
1992 auto* header = o2::header::get<DataHeader*>(item.header);
1993 if (header == nullptr) {
1994 continue;
1995 }
1996 totalInputSize += header->payloadSize;
1997 }
1998 return totalInputSize;
1999};
2000
2001template <typename T>
2002void update_maximum(std::atomic<T>& maximum_value, T const& value) noexcept
2003{
2004 T prev_value = maximum_value;
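  // compare_exchange_weak reloads prev_value with the current maximum on failure,
  // so the loop retries until either the stored value is already >= value or the
  // exchange succeeds; spurious failures just cost one extra iteration.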
2005 while (prev_value < value &&
2006 !maximum_value.compare_exchange_weak(prev_value, value)) {
2007 }
2008}
2009} // namespace
2010
2011bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::vector<DataRelayer::RecordAction>& completed)
2012{
2013 auto& context = ref.get<DataProcessorContext>();
2014 LOGP(debug, "DataProcessingDevice::tryDispatchComputation");
2015 // This is the actual hidden state for the outer loop. In case we decide we
2016 // want to support multithreaded dispatching of operations, I can simply
2017 // move these to some thread local store and the rest of the lambdas
2018 // should work just fine.
2019 std::vector<MessageSet> currentSetOfInputs;
2020
2021 //
2022 auto getInputSpan = [ref, &currentSetOfInputs](TimesliceSlot slot, bool consume = true) {
2023 auto& relayer = ref.get<DataRelayer>();
2024 if (consume) {
2025 currentSetOfInputs = relayer.consumeAllInputsForTimeslice(slot);
2026 } else {
2027 currentSetOfInputs = relayer.consumeExistingInputsForTimeslice(slot);
2028 }
2029 auto getter = [&currentSetOfInputs](size_t i, size_t partindex) -> DataRef {
2030 if (currentSetOfInputs[i].getNumberOfPairs() > partindex) {
2031 const char* headerptr = nullptr;
2032 const char* payloadptr = nullptr;
2033 size_t payloadSize = 0;
2034 // - each input can have multiple parts
2035 // - "part" denotes a sequence of messages belonging together, the first message of the
2036 // sequence is the header message
2037 // - each part has one or more payload messages
2038 // - InputRecord provides all payloads as header-payload pair
2039 auto const& headerMsg = currentSetOfInputs[i].associatedHeader(partindex);
2040 auto const& payloadMsg = currentSetOfInputs[i].associatedPayload(partindex);
2041 headerptr = static_cast<char const*>(headerMsg->GetData());
2042 payloadptr = payloadMsg ? static_cast<char const*>(payloadMsg->GetData()) : nullptr;
2043 payloadSize = payloadMsg ? payloadMsg->GetSize() : 0;
2044 return DataRef{nullptr, headerptr, payloadptr, payloadSize};
2045 }
2046 return DataRef{};
2047 };
2048 auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
2049 return currentSetOfInputs[i].getNumberOfPairs();
2050 };
2051 auto refCountGetter = [&currentSetOfInputs](size_t idx) -> int {
2052 auto& header = static_cast<const fair::mq::shmem::Message&>(*currentSetOfInputs[idx].header(0));
2053 return header.GetRefCount();
2054 };
2055 return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.size()};
2056 };
2057
2058 auto markInputsAsDone = [ref](TimesliceSlot slot) -> void {
2059 auto& relayer = ref.get<DataRelayer>();
2061 };
2062
2063  // I need a preparation step which gets the current timeslice id and
2064  // propagates it to the various contexts (i.e. the actual entities which
2065  // create messages) because the messages need to carry the timeslice id
2066  // with them.
2067 auto prepareAllocatorForCurrentTimeSlice = [ref](TimesliceSlot i) -> void {
2068 auto& relayer = ref.get<DataRelayer>();
2069 auto& timingInfo = ref.get<TimingInfo>();
2070 auto timeslice = relayer.getTimesliceForSlot(i);
2071
2072 timingInfo.timeslice = timeslice.value;
2073 timingInfo.tfCounter = relayer.getFirstTFCounterForSlot(i);
2074 timingInfo.firstTForbit = relayer.getFirstTFOrbitForSlot(i);
2075 timingInfo.runNumber = relayer.getRunNumberForSlot(i);
2076 timingInfo.creation = relayer.getCreationTimeForSlot(i);
2077 };
2078 auto updateRunInformation = [ref](TimesliceSlot i) -> void {
2079 auto& dataProcessorContext = ref.get<DataProcessorContext>();
2080 auto& relayer = ref.get<DataRelayer>();
2081 auto& timingInfo = ref.get<TimingInfo>();
2082 auto timeslice = relayer.getTimesliceForSlot(i);
2083    // We report whether or not this timing info refers to a new Run.
2084 timingInfo.globalRunNumberChanged = !TimingInfo::timesliceIsTimer(timeslice.value) && dataProcessorContext.lastRunNumberProcessed != timingInfo.runNumber;
2085 // A switch to runNumber=0 should not appear and thus does not set globalRunNumberChanged, unless it is seen in the first processed timeslice
2086 timingInfo.globalRunNumberChanged &= (dataProcessorContext.lastRunNumberProcessed == -1 || timingInfo.runNumber != 0);
2087 // FIXME: for now there is only one stream, however we
2088 // should calculate this correctly once we finally get the
2089 // the StreamContext in.
2090 timingInfo.streamRunNumberChanged = timingInfo.globalRunNumberChanged;
2091 };
2092
2093 // When processing them, timers will have to be cleaned up
2094 // to avoid double counting them.
2095 // This was actually the easiest solution we could find for
2096 // O2-646.
2097 auto cleanTimers = [&currentSetOfInputs](TimesliceSlot slot, InputRecord& record) {
2098 assert(record.size() == currentSetOfInputs.size());
2099 for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) {
2100      // assuming that for timer inputs we do have exactly one PartRef object
2101      // in the MessageSet; multiple PartRef objects are only possible for either
2102      // split payload messages or wildcard matchers, both of which are data inputs
2103 DataRef input = record.getByPos(ii);
2104 if (input.spec->lifetime != Lifetime::Timer) {
2105 continue;
2106 }
2107 if (input.header == nullptr) {
2108 continue;
2109 }
2110 // This will hopefully delete the message.
2111 currentSetOfInputs[ii].clear();
2112 }
2113 };
2114
2115  // Function to clean up the record. For the moment we
2116 // simply use it to keep track of input messages
2117 // which are not needed, to display them in the GUI.
2118 auto cleanupRecord = [](InputRecord& record) {
2119 if (O2_LOG_ENABLED(parts) == false) {
2120 return;
2121 }
2122 for (size_t pi = 0, pe = record.size(); pi < pe; ++pi) {
2123 DataRef input = record.getByPos(pi);
2124 if (input.header == nullptr) {
2125 continue;
2126 }
2127 auto sih = o2::header::get<SourceInfoHeader*>(input.header);
2128 if (sih) {
2129 continue;
2130 }
2131
2132 auto dh = o2::header::get<DataHeader*>(input.header);
2133 if (!dh) {
2134 continue;
2135 }
2136 // We use the address of the first header of a split payload
2137 // to identify the interval.
2138 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, dh);
2139 O2_SIGNPOST_END(parts, pid, "parts", "Cleaning up parts associated to %p", dh);
2140
2141 // No split parts, we simply skip the payload
2142 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
2143        // this indicates a sequence of payloads following the header
2144 // FIXME: we will probably also set the DataHeader version
2145 pi += dh->splitPayloadParts - 1;
2146 } else {
2147 size_t pi = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
2148 }
2149 }
2150 };
2151
2152 ref.get<DataRelayer>().getReadyToProcess(completed);
2153 if (completed.empty() == true) {
2154 LOGP(debug, "No computations available for dispatching.");
2155 return false;
2156 }
2157
2158 auto postUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t tStart, uint64_t tStartMilli) {
2159 auto& stats = ref.get<DataProcessingStats>();
2160 auto& states = ref.get<DataProcessingStates>();
2161 std::atomic_thread_fence(std::memory_order_release);
2162 char relayerSlotState[1024];
2163 int written = snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2164 char* buffer = relayerSlotState + written;
2165 for (size_t ai = 0; ai != record.size(); ai++) {
2166 buffer[ai] = record.isValid(ai) ? '3' : '0';
2167 }
2168 buffer[record.size()] = 0;
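    // The resulting state string has the form "<pipelineLength> <one digit per input>",
    // with '3' marking an input that was valid for this computation and '0' an empty
    // slot; preUpdateStats below writes the same layout using '2' for inputs that are
    // about to be processed.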
2169 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index),
2170 .size = (int)(record.size() + buffer - relayerSlotState),
2171 .data = relayerSlotState});
2172 uint64_t tEnd = uv_hrtime();
2173 // tEnd and tStart are in nanoseconds according to https://docs.libuv.org/en/v1.x/misc.html#c.uv_hrtime
2174 int64_t wallTimeMs = (tEnd - tStart) / 1000000;
2176 // Sum up the total wall time, in milliseconds.
2178 // The time interval is in seconds while tEnd - tStart is in nanoseconds, so we divide by 1000000 to get the fraction in ms/s.
2180 stats.updateStats({(int)ProcessingStatsId::LAST_PROCESSED_SIZE, DataProcessingStats::Op::Set, calculateTotalInputRecordSize(record)});
2181 stats.updateStats({(int)ProcessingStatsId::TOTAL_PROCESSED_SIZE, DataProcessingStats::Op::Add, calculateTotalInputRecordSize(record)});
2182 auto latency = calculateInputRecordLatency(record, tStartMilli);
2183 stats.updateStats({(int)ProcessingStatsId::LAST_MIN_LATENCY, DataProcessingStats::Op::Set, (int)latency.minLatency});
2184 stats.updateStats({(int)ProcessingStatsId::LAST_MAX_LATENCY, DataProcessingStats::Op::Set, (int)latency.maxLatency});
2185 static int count = 0;
2187 count++;
2188 };
2189
2190 auto preUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t) {
2191 auto& states = ref.get<DataProcessingStates>();
2192 std::atomic_thread_fence(std::memory_order_release);
2193 char relayerSlotState[1024];
2194 snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2195 char* buffer = strchr(relayerSlotState, ' ') + 1;
2196 for (size_t ai = 0; ai != record.size(); ai++) {
2197 buffer[ai] = record.isValid(ai) ? '2' : '0';
2198 }
2199 buffer[record.size()] = 0;
2200 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index), .size = (int)(record.size() + buffer - relayerSlotState), .data = relayerSlotState});
2201 };
2202
2203 // This is the main dispatching loop
2204 auto& state = ref.get<DeviceState>();
2205 auto& spec = ref.get<DeviceSpec const>();
2206
2207 auto& dpContext = ref.get<DataProcessorContext>();
2208 auto& streamContext = ref.get<StreamContext>();
2209 O2_SIGNPOST_ID_GENERATE(sid, device);
2210 O2_SIGNPOST_START(device, sid, "device", "Start processing ready actions");
2211
2212 auto& stats = ref.get<DataProcessingStats>();
2213 auto& relayer = ref.get<DataRelayer>();
2214 using namespace o2::framework;
2215 stats.updateStats({(int)ProcessingStatsId::PENDING_INPUTS, DataProcessingStats::Op::Set, static_cast<int64_t>(relayer.getParallelTimeslices() - completed.size())});
2216 stats.updateStats({(int)ProcessingStatsId::INCOMPLETE_INPUTS, DataProcessingStats::Op::Set, completed.empty() ? 1 : 0});
2217 switch (spec.completionPolicy.order) {
2219 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.timeslice.value < b.timeslice.value; });
2220 break;
2222 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.slot.index < b.slot.index; });
2223 break;
2225 default:
2226 break;
2227 }
2228
2229 for (auto action : completed) {
2230 O2_SIGNPOST_ID_GENERATE(aid, device);
2231 O2_SIGNPOST_START(device, aid, "device", "Processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2233 O2_SIGNPOST_END(device, aid, "device", "Waiting for more data.");
2234 continue;
2235 }
2236
2237 bool shouldConsume = action.op == CompletionPolicy::CompletionOp::Consume ||
2239 prepareAllocatorForCurrentTimeSlice(TimesliceSlot{action.slot});
2243 updateRunInformation(TimesliceSlot{action.slot});
2244 }
2245 InputSpan span = getInputSpan(action.slot, shouldConsume);
2246 auto& spec = ref.get<DeviceSpec const>();
2247 InputRecord record{spec.inputs,
2248 span,
2249 *context.registry};
2250 ProcessingContext processContext{record, ref, ref.get<DataAllocator>()};
2251 {
2252 // Notice this should be thread safe and reentrant
2253 // as it is called from many threads.
2254 streamContext.preProcessingCallbacks(processContext);
2255 dpContext.preProcessingCallbacks(processContext);
2256 }
2258 context.postDispatchingCallbacks(processContext);
2259 if (spec.forwards.empty() == false) {
2260 auto& timesliceIndex = ref.get<TimesliceIndex>();
2261 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false);
2262 O2_SIGNPOST_END(device, aid, "device", "Forwarding inputs consume: %d.", false);
2263 continue;
2264 }
2265 }
2266    // If there are no optional inputs we can forward the messages early
2267    // (canForwardEarly) so that parallel processing can happen.
2268    // In this case we pass true to indicate that we want to
2269    // copy the messages to the subsequent data processor.
2270 bool hasForwards = spec.forwards.empty() == false;
2272
2273 if (context.canForwardEarly && hasForwards && consumeSomething) {
2274      O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwarding: %{public}s.", fmt::format("{}", action.op).c_str());
2275 auto& timesliceIndex = ref.get<TimesliceIndex>();
2276 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
2277 }
2278 markInputsAsDone(action.slot);
2279
2280 uint64_t tStart = uv_hrtime();
2281 uint64_t tStartMilli = TimingHelpers::getRealtimeSinceEpochStandalone();
2282 preUpdateStats(action, record, tStart);
2283
2284 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
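    // When O2_NO_CATCHALL_EXCEPTIONS is set to anything other than "0", only the
    // framework's RuntimeErrorRef is caught below and a generic std::exception is
    // allowed to propagate, presumably so that debugging tools see the original
    // throw site.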
2285
2286 auto runNoCatch = [&context, ref, &processContext](DataRelayer::RecordAction& action) mutable {
2287 auto& state = ref.get<DeviceState>();
2288 auto& spec = ref.get<DeviceSpec const>();
2289 auto& streamContext = ref.get<StreamContext>();
2290 auto& dpContext = ref.get<DataProcessorContext>();
2291 auto shouldProcess = [](DataRelayer::RecordAction& action) -> bool {
2292 switch (action.op) {
2297 return true;
2298 break;
2299 default:
2300 return false;
2301 }
2302 };
2303 if (state.quitRequested == false) {
2304 {
2305 // Callbacks from services
2306 dpContext.preProcessingCallbacks(processContext);
2307 streamContext.preProcessingCallbacks(processContext);
2308 dpContext.preProcessingCallbacks(processContext);
2309 // Callbacks from users
2310 ref.get<CallbackService>().call<CallbackService::Id::PreProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2311 }
2312 O2_SIGNPOST_ID_FROM_POINTER(pcid, device, &processContext);
2313 if (context.statefulProcess && shouldProcess(action)) {
2314        // This way, user code can use the same processing context to identify
2315 // its signposts and we can map user code to device iterations.
2316 O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
2317 (context.statefulProcess)(processContext);
2318 O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
2319 } else if (context.statelessProcess && shouldProcess(action)) {
2320        O2_SIGNPOST_START(device, pcid, "device", "Stateless process");
2321        (context.statelessProcess)(processContext);
2322        O2_SIGNPOST_END(device, pcid, "device", "Stateless process");
2323 } else if (context.statelessProcess || context.statefulProcess) {
2324 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding.");
2325 } else {
2326 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle");
2328 }
2329 if (shouldProcess(action)) {
2330 auto& timingInfo = ref.get<TimingInfo>();
2331 if (timingInfo.globalRunNumberChanged) {
2332 context.lastRunNumberProcessed = timingInfo.runNumber;
2333 }
2334 }
2335
2336 // Notify the sink we just consumed some timeframe data
2337 if (context.isSink && action.op == CompletionPolicy::CompletionOp::Consume) {
2338 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Sending dpl-summary");
2339 auto& allocator = ref.get<DataAllocator>();
2340 allocator.make<int>(OutputRef{"dpl-summary", runtime_hash(spec.name.c_str())}, 1);
2341 }
2342
2343 // Extra callback which allows a service to add extra outputs.
2344 // This is needed e.g. to ensure that injected CCDB outputs are added
2345 // before an end of stream.
2346 {
2347 ref.get<CallbackService>().call<CallbackService::Id::FinaliseOutputs>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2348 dpContext.finaliseOutputsCallbacks(processContext);
2349 streamContext.finaliseOutputsCallbacks(processContext);
2350 }
2351
2352 {
2353 ref.get<CallbackService>().call<CallbackService::Id::PostProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2354 dpContext.postProcessingCallbacks(processContext);
2355 streamContext.postProcessingCallbacks(processContext);
2356 }
2357 }
2358 };
2359
2360 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_USERCODE) != 0) {
2361 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
2362 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
2363 }
2364 if (noCatch) {
2365 try {
2366 runNoCatch(action);
2367 } catch (o2::framework::RuntimeErrorRef e) {
2368 (context.errorHandling)(e, record);
2369 }
2370 } else {
2371 try {
2372 runNoCatch(action);
2373 } catch (std::exception& ex) {
2377 auto e = runtime_error(ex.what());
2378 (context.errorHandling)(e, record);
2379 } catch (o2::framework::RuntimeErrorRef e) {
2380 (context.errorHandling)(e, record);
2381 }
2382 }
2383 if (state.severityStack.empty() == false) {
2384 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
2385 state.severityStack.pop_back();
2386 }
2387
2388 postUpdateStats(action, record, tStart, tStartMilli);
2389 // We forward inputs only when we consume them. If we simply Process them,
2390    // we keep them for the next message arriving.
2392 cleanupRecord(record);
2393 context.postDispatchingCallbacks(processContext);
2394 ref.get<CallbackService>().call<CallbackService::Id::DataConsumed>(o2::framework::ServiceRegistryRef{ref});
2395 }
2396 if ((context.canForwardEarly == false) && hasForwards && consumeSomething) {
2397 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Late forwarding");
2398 auto& timesliceIndex = ref.get<TimesliceIndex>();
2399 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false, action.op == CompletionPolicy::CompletionOp::Consume);
2400 }
2401 context.postForwardingCallbacks(processContext);
2403 cleanTimers(action.slot, record);
2404 }
2405 O2_SIGNPOST_END(device, aid, "device", "Done processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2406 }
2407  O2_SIGNPOST_END(device, sid, "device", "Done processing ready actions");
2408
2409 // We now broadcast the end of stream if it was requested
2410 if (state.streaming == StreamingState::EndOfStreaming) {
2411 LOGP(detail, "Broadcasting end of stream");
2412 for (auto& channel : spec.outputChannels) {
2414 }
2416 }
2417
2418 return true;
2419}
2420
2422{
2423 LOG(error) << msg;
2424 ServiceRegistryRef ref{mServiceRegistry};
2425 auto& stats = ref.get<DataProcessingStats>();
2427}
2428
2429std::unique_ptr<ConfigParamStore> DeviceConfigurationHelpers::getConfiguration(ServiceRegistryRef registry, const char* name, std::vector<ConfigParamSpec> const& options)
2430{
2431
2432 if (registry.active<ConfigurationInterface>()) {
2433 auto& cfg = registry.get<ConfigurationInterface>();
2434 try {
2435 cfg.getRecursive(name);
2436 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
2437 retrievers.emplace_back(std::make_unique<ConfigurationOptionsRetriever>(&cfg, name));
2438 auto configStore = std::make_unique<ConfigParamStore>(options, std::move(retrievers));
2439 configStore->preload();
2440 configStore->activate();
2441 return configStore;
2442 } catch (...) {
2443 // No overrides...
2444 }
2445 }
2446 return {nullptr};
2447}
2448
2449} // namespace o2::framework