DataProcessingDevice.cxx
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
13#include <atomic>
33#include "Framework/InputSpan.h"
34#if defined(__APPLE__) || defined(NDEBUG)
35#define O2_SIGNPOST_IMPLEMENTATION
36#endif
37#include "Framework/Signpost.h"
50
51#include "DecongestionService.h"
53#include "DataRelayerHelpers.h"
54#include "Headers/DataHeader.h"
56
57#include <Framework/Tracing.h>
58
59#include <fairmq/Parts.h>
60#include <fairmq/Socket.h>
61#include <fairmq/ProgOptions.h>
62#include <fairmq/shmem/Message.h>
63#include <Configuration/ConfigurationInterface.h>
64#include <Configuration/ConfigurationFactory.h>
65#include <Monitoring/Monitoring.h>
66#include <TMessage.h>
67#include <TClonesArray.h>
68
69#include <fmt/ostream.h>
70#include <algorithm>
71#include <vector>
72#include <numeric>
73#include <memory>
74#include <uv.h>
75#include <execinfo.h>
76#include <sstream>
77#include <boost/property_tree/json_parser.hpp>
78
79// Formatter to avoid having to rewrite the ostream operator for the enum
80namespace fmt
81{
82template <>
85} // namespace fmt
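// The elided specialization above typically follows this pattern (a sketch,
// assuming fmt's ostream_formatter from <fmt/ostream.h>; "SomeEnum" is a
// placeholder, not the actual type being formatted here):
//
//   template <>
//   struct fmt::formatter<SomeEnum> : fmt::ostream_formatter {};
//
// so that fmt::format("{}", value) reuses the enum's existing operator<<.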
86
87// A log to use for general device logging
89// A log to use for general device logging
91// Special log to keep track of the lifetime of the parts
93// Stream which keeps track of the calibration lifetime logic
95// Special log to track the async queue behavior
97// Special log to track the forwarding requests
99// Special log to track CCDB related requests
101// Special log to track task scheduling
103
104using namespace o2::framework;
105using ConfigurationInterface = o2::configuration::ConfigurationInterface;
107
108constexpr int DEFAULT_MAX_CHANNEL_AHEAD = 128;
109
110namespace o2::framework
111{
112
113template <>
117
120void on_idle_timer(uv_timer_t* handle)
121{
122 auto* state = (DeviceState*)handle->data;
123 state->loopReason |= DeviceState::TIMER_EXPIRED;
124}
125
126void on_communication_requested(uv_async_t* s)
127{
128 auto* state = (DeviceState*)s->data;
130}
131
132DeviceSpec const& getRunningDevice(RunningDeviceRef const& running, ServiceRegistryRef const& services)
133{
134 auto& devices = services.get<o2::framework::RunningWorkflowInfo const>().devices;
135 return devices[running.index];
136}
137
143
145 : mRunningDevice{running},
146 mConfigRegistry{nullptr},
147 mServiceRegistry{registry}
148{
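// A note on the subscription below: the "cleanup" property acts as a
// monotonically increasing counter. Whenever a value larger than the last one
// seen is published, every input channel is drained of any pending
// fair::mq::Parts, discarding stale in-flight messages.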
149 GetConfig()->Subscribe<std::string>("dpl", [&registry = mServiceRegistry](const std::string& key, std::string value) {
150 if (key == "cleanup") {
152 auto& deviceState = ref.get<DeviceState>();
153 int64_t cleanupCount = deviceState.cleanupCount.load();
154 int64_t newCleanupCount = std::stoll(value);
155 if (newCleanupCount <= cleanupCount) {
156 return;
157 }
158 deviceState.cleanupCount.store(newCleanupCount);
159 for (auto& info : deviceState.inputChannelInfos) {
160 fair::mq::Parts parts;
161 while (info.channel->Receive(parts, 0)) {
162 LOGP(debug, "Dropping {} parts", parts.Size());
163 if (parts.Size() == 0) {
164 break;
165 }
166 }
167 }
168 }
169 });
170
171 std::function<void(const fair::mq::State)> stateWatcher = [this, &registry = mServiceRegistry](const fair::mq::State state) -> void {
173 auto& deviceState = ref.get<DeviceState>();
174 auto& control = ref.get<ControlService>();
175 auto& callbacks = ref.get<CallbackService>();
176 control.notifyDeviceState(fair::mq::GetStateName(state));
178
179 if (deviceState.nextFairMQState.empty() == false) {
180 auto state = deviceState.nextFairMQState.back();
181 (void)this->ChangeState(state);
182 deviceState.nextFairMQState.pop_back();
183 }
184 };
185
186 // 99 is to execute DPL callbacks last
187 this->SubscribeToStateChange("99-dpl", stateWatcher);
188
189 // One task for now.
190 mStreams.resize(1);
191 mHandles.resize(1);
192
193 ServiceRegistryRef ref{mServiceRegistry};
194
195 mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t));
196 auto& state = ref.get<DeviceState>();
197 assert(state.loop);
198 int res = uv_async_init(state.loop, mAwakeHandle, on_communication_requested);
199 mAwakeHandle->data = &state;
200 if (res < 0) {
201 LOG(error) << "Unable to initialise subscription";
202 }
203
205 SubscribeToNewTransition("dpl", [wakeHandle = mAwakeHandle](fair::mq::Transition t) {
206 int res = uv_async_send(wakeHandle);
207 if (res < 0) {
208 LOG(error) << "Unable to notify subscription";
209 }
210 LOG(debug) << "State transition requested";
211 });
212}
213
214// Callback to execute the processing. Notice how the data is
215// a vector of DataProcessorContext so that we can index the correct
216// one with the thread id. For the moment we simply use the first one.
217void run_callback(uv_work_t* handle)
218{
219 auto* task = (TaskStreamInfo*)handle->data;
220 auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
221 // We create a new signpost interval for this specific data processor. Same id, same data processor.
222 auto& dataProcessorContext = ref.get<DataProcessorContext>();
223 O2_SIGNPOST_ID_FROM_POINTER(sid, device, &dataProcessorContext);
224 O2_SIGNPOST_START(device, sid, "run_callback", "Starting run callback on stream %d", task->id.index);
227 O2_SIGNPOST_END(device, sid, "run_callback", "Done processing data for stream %d", task->id.index);
228}
229
230// Once the processing in a thread is done, this is executed on the main thread.
231void run_completion(uv_work_t* handle, int status)
232{
233 auto* task = (TaskStreamInfo*)handle->data;
234 // Notice that the completion, while running on the main thread, still
235 // has a salt which is associated to the actual stream which was doing the computation
236 auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
237 auto& state = ref.get<DeviceState>();
238 auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
239
240 using o2::monitoring::Metric;
241 using o2::monitoring::Monitoring;
242 using o2::monitoring::tags::Key;
243 using o2::monitoring::tags::Value;
244
245 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats&)> reportConsumedOffer = [ref](ComputingQuotaOffer const& accumulatedConsumed, ComputingQuotaStats& stats) {
246 auto& dpStats = ref.get<DataProcessingStats>();
247 stats.totalConsumedBytes += accumulatedConsumed.sharedMemory;
248 // For now we give back the offer if we did not use it completely.
249 // In principle we should try to run until the offer is fully consumed.
250 stats.totalConsumedTimeslices += std::min<int64_t>(accumulatedConsumed.timeslices, 1);
251
252 dpStats.updateStats({static_cast<short>(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedBytes});
253 dpStats.updateStats({static_cast<short>(ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedTimeslices});
254 dpStats.processCommandQueue();
255 assert(stats.totalConsumedBytes == dpStats.metrics[(short)ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED]);
256 assert(stats.totalConsumedTimeslices == dpStats.metrics[(short)ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED]);
257 };
258
259 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const&)> reportExpiredOffer = [ref](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
260 auto& dpStats = ref.get<DataProcessingStats>();
261 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
262 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
263 dpStats.updateStats({static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredTimeslices});
264 dpStats.processCommandQueue();
265 };
266
267 for (auto& consumer : state.offerConsumers) {
268 quotaEvaluator.consume(task->id.index, consumer, reportConsumedOffer);
269 }
270 state.offerConsumers.clear();
271 quotaEvaluator.handleExpired(reportExpiredOffer);
272 quotaEvaluator.dispose(task->id.index);
273 task->running = false;
274}
275
276// Context for polling
277struct PollerContext {
278 enum struct PollerState : char { Stopped,
279 Disconnected,
280 Connected,
281 Suspended };
282 char const* name = nullptr;
283 uv_loop_t* loop = nullptr;
284 DataProcessingDevice* device = nullptr;
285 DeviceState* state = nullptr;
286 fair::mq::Socket* socket = nullptr;
287 InputChannelInfo* channelInfo = nullptr;
288 int fd = -1;
289 bool read = true;
290 PollerState pollerState = PollerState::Stopped;
291};
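// Lifecycle of pollerState, as driven by the code below: pollers start out
// Disconnected (startPollers watches for UV_WRITABLE), become Connected once
// the socket is writable (on_socket_polled then switches read sockets to
// UV_READABLE), may be toggled between Connected and Suspended by the channel
// balancing logic in doPrepare, and are finally marked Stopped in stopPollers.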
292
293void on_socket_polled(uv_poll_t* poller, int status, int events)
294{
295 auto* context = (PollerContext*)poller->data;
296 assert(context);
297 O2_SIGNPOST_ID_FROM_POINTER(sid, sockets, poller);
298 context->state->loopReason |= DeviceState::DATA_SOCKET_POLLED;
299 switch (events) {
300 case UV_READABLE: {
301 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
302 context->state->loopReason |= DeviceState::DATA_INCOMING;
303 } break;
304 case UV_WRITABLE: {
305 O2_SIGNPOST_END(sockets, sid, "socket_state", "Socket connected for channel %{public}s", context->name);
306 if (context->read) {
307 O2_SIGNPOST_START(sockets, sid, "socket_state", "Socket connected for read in context %{public}s", context->name);
308 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
309 context->state->loopReason |= DeviceState::DATA_CONNECTED;
310 } else {
311 O2_SIGNPOST_START(sockets, sid, "socket_state", "Socket connected for write for channel %{public}s", context->name);
312 context->state->loopReason |= DeviceState::DATA_OUTGOING;
313 // If the socket is writable, fairmq will handle the rest, so we can stop polling and
314 // just wait for the disconnect.
315 uv_poll_start(poller, UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
316 }
317 context->pollerState = PollerContext::PollerState::Connected;
318 } break;
319 case UV_DISCONNECT: {
320 O2_SIGNPOST_END(sockets, sid, "socket_state", "Socket disconnected in context %{public}s", context->name);
321 } break;
322 case UV_PRIORITIZED: {
323 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Socket prioritized for context %{public}s", context->name);
324 } break;
325 }
326 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
327}
328
329void on_out_of_band_polled(uv_poll_t* poller, int status, int events)
330{
331 O2_SIGNPOST_ID_FROM_POINTER(sid, sockets, poller);
332 auto* context = (PollerContext*)poller->data;
333 context->state->loopReason |= DeviceState::OOB_ACTIVITY;
334 if (status < 0) {
335 LOGP(fatal, "Error while polling {}: {}", context->name, status);
336 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
337 }
338 switch (events) {
339 case UV_READABLE: {
340 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
341 context->state->loopReason |= DeviceState::DATA_INCOMING;
342 assert(context->channelInfo);
343 context->channelInfo->readPolled = true;
344 } break;
345 case UV_WRITABLE: {
346 O2_SIGNPOST_END(sockets, sid, "socket_state", "OOB socket connected for channel %{public}s", context->name);
347 if (context->read) {
348 O2_SIGNPOST_START(sockets, sid, "socket_state", "OOB socket connected for read in context %{public}s", context->name);
349 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_out_of_band_polled);
350 } else {
351 O2_SIGNPOST_START(sockets, sid, "socket_state", "OOB socket connected for write for channel %{public}s", context->name);
352 context->state->loopReason |= DeviceState::DATA_OUTGOING;
353 }
354 } break;
355 case UV_DISCONNECT: {
356 O2_SIGNPOST_END(sockets, sid, "socket_state", "OOB socket disconnected in context %{public}s", context->name);
357 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
358 } break;
359 case UV_PRIORITIZED: {
360 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "OOB socket prioritized for context %{public}s", context->name);
361 } break;
362 }
363 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
364}
365
373void DataProcessingDevice::Init()
374{
375 auto ref = ServiceRegistryRef{mServiceRegistry};
376 auto& context = ref.get<DataProcessorContext>();
377 auto& spec = getRunningDevice(mRunningDevice, ref);
378
379 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
380 O2_SIGNPOST_START(device, cid, "Init", "Entering Init callback.");
381 context.statelessProcess = spec.algorithm.onProcess;
382 context.statefulProcess = nullptr;
383 context.error = spec.algorithm.onError;
384 context.initError = spec.algorithm.onInitError;
385
386 auto configStore = DeviceConfigurationHelpers::getConfiguration(mServiceRegistry, spec.name.c_str(), spec.options);
387 if (configStore == nullptr) {
388 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
389 retrievers.emplace_back(std::make_unique<FairOptionsRetriever>(GetConfig()));
390 configStore = std::make_unique<ConfigParamStore>(spec.options, std::move(retrievers));
391 configStore->preload();
392 configStore->activate();
393 }
394
395 using boost::property_tree::ptree;
396
398 for (auto& entry : configStore->store()) {
399 std::stringstream ss;
400 std::string str;
401 if (entry.second.empty() == false) {
402 boost::property_tree::json_parser::write_json(ss, entry.second, false);
403 str = ss.str();
404 str.pop_back(); // remove EoL
405 } else {
406 str = entry.second.get_value<std::string>();
407 }
408 std::string configString = fmt::format("[CONFIG] {}={} 1 {}", entry.first, str, configStore->provenance(entry.first.c_str())).c_str();
409 mServiceRegistry.get<DriverClient>(ServiceRegistry::globalDeviceSalt()).tell(configString.c_str());
410 }
411
412 mConfigRegistry = std::make_unique<ConfigParamRegistry>(std::move(configStore));
413
414 // Setup the error handlers for init
415 if (context.initError) {
416 context.initErrorHandling = [&errorCallback = context.initError,
417 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
421 auto& context = ref.get<DataProcessorContext>();
422 auto& err = error_from_ref(e);
423 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
424 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Invoking errorCallback.", err.what);
425 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
426 auto& stats = ref.get<DataProcessingStats>();
428 InitErrorContext errorContext{ref, e};
429 errorCallback(errorContext);
430 };
431 } else {
432 context.initErrorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
433 auto& err = error_from_ref(e);
437 auto& context = ref.get<DataProcessorContext>();
438 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
439 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Exiting with 1.", err.what);
440 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
441 auto& stats = ref.get<DataProcessingStats>();
443 exit(1);
444 };
445 }
446
447 context.expirationHandlers.clear();
448 context.init = spec.algorithm.onInit;
449 if (context.init) {
450 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
451 InitContext initContext{*mConfigRegistry, mServiceRegistry};
452
453 if (noCatch) {
454 try {
455 context.statefulProcess = context.init(initContext);
457 if (context.initErrorHandling) {
458 (context.initErrorHandling)(e);
459 }
460 }
461 } else {
462 try {
463 context.statefulProcess = context.init(initContext);
464 } catch (std::exception& ex) {
468 auto e = runtime_error(ex.what());
469 (context.initErrorHandling)(e);
471 (context.initErrorHandling)(e);
472 }
473 }
474 }
475 auto& state = ref.get<DeviceState>();
476 state.inputChannelInfos.resize(spec.inputChannels.size());
480 int validChannelId = 0;
481 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
482 auto& name = spec.inputChannels[ci].name;
483 if (name.find(spec.channelPrefix + "from_internal-dpl-clock") == 0) {
484 state.inputChannelInfos[ci].state = InputChannelState::Pull;
485 state.inputChannelInfos[ci].id = {ChannelIndex::INVALID};
486 validChannelId++;
487 } else {
488 state.inputChannelInfos[ci].id = {validChannelId++};
489 }
490 }
491
492 // Invoke the callback policy for this device.
493 if (spec.callbacksPolicy.policy != nullptr) {
494 InitContext initContext{*mConfigRegistry, mServiceRegistry};
495 spec.callbacksPolicy.policy(mServiceRegistry.get<CallbackService>(ServiceRegistry::globalDeviceSalt()), initContext);
496 }
497
498 // Stream-level services should be initialised now
499 auto* options = GetConfig();
500 for (size_t si = 0; si < mStreams.size(); ++si) {
502 mServiceRegistry.lateBindStreamServices(state, *options, streamSalt);
503 }
504 O2_SIGNPOST_END(device, cid, "Init", "Exiting Init callback.");
505}
506
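// SIGUSR1 handler: unless a valid shared memory offer is already pending, the
// first free slot in the ComputingQuotaEvaluator is filled with a 1GB shared
// memory offer, giving an otherwise starved device a chance to run again.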
507void on_signal_callback(uv_signal_t* handle, int signum)
508{
509 O2_SIGNPOST_ID_FROM_POINTER(sid, device, handle);
510 O2_SIGNPOST_START(device, sid, "signal_state", "Signal %d received.", signum);
511
512 auto* registry = (ServiceRegistry*)handle->data;
513 if (!registry) {
514 O2_SIGNPOST_END(device, sid, "signal_state", "No registry active. Ignoring signal.");
515 return;
516 }
517 ServiceRegistryRef ref{*registry};
518 auto& state = ref.get<DeviceState>();
519 auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
520 auto& stats = ref.get<DataProcessingStats>();
522 size_t ri = 0;
523 while (ri != quotaEvaluator.mOffers.size()) {
524 auto& offer = quotaEvaluator.mOffers[ri];
525 // We were already offered some sharedMemory, so we
526 // do not consider the offer.
527 // FIXME: in principle this should account for memory
528 // available and being offered, however we
529 // want to get out of the woods for now.
530 if (offer.valid && offer.sharedMemory != 0) {
531 O2_SIGNPOST_END(device, sid, "signal_state", "Memory already offered.");
532 return;
533 }
534 ri++;
535 }
536 // Find the first empty offer and have 1GB of shared memory there
537 for (auto& offer : quotaEvaluator.mOffers) {
538 if (offer.valid == false) {
539 offer.cpu = 0;
540 offer.memory = 0;
541 offer.sharedMemory = 1000000000;
542 offer.valid = true;
543 offer.user = -1;
544 break;
545 }
546 }
548 O2_SIGNPOST_END(device, sid, "signal_state", "Done processing signals.");
549}
550
551struct DecongestionContext {
554};
555
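// Executed from the async queue (posted by forwardInputs below): once it is
// safe to do so, propagate the oldest possible timeslice to all DPL forward
// channels, skipping non-DPL channels and bailing out entirely if that
// timeslice was already sent.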
556auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
557 auto& oldestTimeslice = task.user<DecongestionContext>().oldestTimeslice;
558 auto& ref = task.user<DecongestionContext>().ref;
559
560 auto& decongestion = ref.get<DecongestionService>();
561 auto& proxy = ref.get<FairMQDeviceProxy>();
562 if (oldestTimeslice.timeslice.value <= decongestion.lastTimeslice) {
563 LOG(debug) << "Not sending already sent oldest possible timeslice " << oldestTimeslice.timeslice.value;
564 return;
565 }
566 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
567 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
568 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
569 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
570 // TODO: this we could cache in the proxy at the bind moment.
571 if (info.channelType != ChannelAccountingType::DPL) {
572 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Skipping channel %{public}s because it's not a DPL channel",
573 info.name.c_str());
574
575 continue;
576 }
577 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestTimeslice.timeslice.value)) {
578 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Forwarding to channel %{public}s oldest possible timeslice %zu, prio 20",
579 info.name.c_str(), oldestTimeslice.timeslice.value);
580 }
581 }
582};
583
584// This is how we do the forwarding, i.e. we push
585// the inputs which are shared between this device and others
586// to the next one in the daisy chain.
587// FIXME: do it in a smarter way than O(N^2)
588static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
589 TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
590 auto& proxy = registry.get<FairMQDeviceProxy>();
591
592 O2_SIGNPOST_ID_GENERATE(sid, forwarding);
593 O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
594 slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
595 auto forwardedParts = DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copy, consume);
596
597 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
598 if (forwardedParts[fi].Size() == 0) {
599 continue;
600 }
601 ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi});
602 auto& parts = forwardedParts[fi];
603 if (info.policy == nullptr) {
604 O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi);
605 continue;
606 }
607 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
608 info.policy->forward(parts, ChannelIndex{fi}, registry);
609 }
610
611 auto& asyncQueue = registry.get<AsyncQueue>();
612 auto& decongestion = registry.get<DecongestionService>();
613 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
614 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
615 AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate}
616 .user<DecongestionContext>({.ref = registry, .oldestTimeslice = oldestTimeslice}));
617 O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
618};
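// Note that forwardInputs is also used as the onDrop handler passed to
// DataRelayer::prunePending in the run loop below, so that messages dropped
// from a pruned slot are still forwarded downstream rather than silently lost.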
619
620extern volatile int region_read_global_dummy_variable;
622
624void handleRegionCallbacks(ServiceRegistryRef registry, std::vector<fair::mq::RegionInfo>& infos)
625{
626 if (infos.empty() == false) {
627 std::vector<fair::mq::RegionInfo> toBeNotified;
628 toBeNotified.swap(infos); // avoid any MT issue.
629 static bool dummyRead = getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS") && atoi(getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS"));
630 for (auto const& info : toBeNotified) {
631 if (dummyRead) {
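// Touch one value per 4 kB page so the whole shared memory region is
// faulted into this process up front (debug aid, enabled via
// DPL_DEBUG_MAP_ALL_SHM_REGIONS).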
632 for (size_t i = 0; i < info.size / sizeof(region_read_global_dummy_variable); i += 4096 / sizeof(region_read_global_dummy_variable)) {
633 region_read_global_dummy_variable = ((int*)info.ptr)[i];
634 }
635 }
636 registry.get<CallbackService>().call<CallbackService::Id::RegionInfoCallback>(info);
637 }
638 }
639}
640
641namespace
642{
643void on_awake_main_thread(uv_async_t* handle)
644{
645 auto* state = (DeviceState*)handle->data;
647}
648} // namespace
649
650void DataProcessingDevice::initPollers()
651{
652 auto ref = ServiceRegistryRef{mServiceRegistry};
653 auto& deviceContext = ref.get<DeviceContext>();
654 auto& context = ref.get<DataProcessorContext>();
655 auto& spec = ref.get<DeviceSpec const>();
656 auto& state = ref.get<DeviceState>();
657 // We add a timer only in case a channel poller is not there.
658 if ((context.statefulProcess != nullptr) || (context.statelessProcess != nullptr)) {
659 for (auto& [channelName, channel] : GetChannels()) {
660 InputChannelInfo* channelInfo;
661 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
662 auto& channelSpec = spec.inputChannels[ci];
663 channelInfo = &state.inputChannelInfos[ci];
664 if (channelSpec.name != channelName) {
665 continue;
666 }
667 channelInfo->channel = &this->GetChannel(channelName, 0);
668 break;
669 }
670 if ((channelName.rfind("from_internal-dpl", 0) == 0) &&
671 (channelName.rfind("from_internal-dpl-aod", 0) != 0) &&
672 (channelName.rfind("from_internal-dpl-ccdb-backend", 0) != 0) &&
673 (channelName.rfind("from_internal-dpl-injected", 0)) != 0) {
674 LOGP(detail, "{} is an internal channel. Skipping as no input will come from there.", channelName);
675 continue;
676 }
677 // We only watch receiving sockets.
678 if (channelName.rfind("from_" + spec.name + "_", 0) == 0) {
679 LOGP(detail, "{} is to send data. Not polling.", channelName);
680 continue;
681 }
682
683 if (channelName.rfind("from_", 0) != 0) {
684 LOGP(detail, "{} is not a DPL socket. Not polling.", channelName);
685 continue;
686 }
687
688 // We assume there is always a ZeroMQ socket behind.
689 int zmq_fd = 0;
690 size_t zmq_fd_len = sizeof(zmq_fd);
691 // FIXME: I should probably save those somewhere... ;-)
692 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
693 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
694 if (zmq_fd == 0) {
695 LOG(error) << "Cannot get file descriptor for channel " << channelName;
696 continue;
697 }
698 LOGP(detail, "Polling socket for {}", channelName);
699 auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
700 pCtx->name = strdup(channelName.c_str());
701 pCtx->loop = state.loop;
702 pCtx->device = this;
703 pCtx->state = &state;
704 pCtx->fd = zmq_fd;
705 assert(channelInfo != nullptr);
706 pCtx->channelInfo = channelInfo;
707 pCtx->socket = &channel[0].GetSocket();
708 pCtx->read = true;
709 poller->data = pCtx;
710 uv_poll_init(state.loop, poller, zmq_fd);
711 if (channelName.rfind("from_", 0) != 0) {
712 LOGP(detail, "{} is an out of band channel.", channelName);
713 state.activeOutOfBandPollers.push_back(poller);
714 } else {
715 channelInfo->pollerIndex = state.activeInputPollers.size();
716 state.activeInputPollers.push_back(poller);
717 }
718 }
719 // In case we do not have any input channel and we do not have
720 // any timers or signal watchers we still wake up whenever we can send data to downstream
721 // devices to allow for enumerations.
722 if (state.activeInputPollers.empty() &&
723 state.activeOutOfBandPollers.empty() &&
724 state.activeTimers.empty() &&
725 state.activeSignals.empty()) {
726 // FIXME: this is to make sure we do not reset the output timer
727 // for readout proxies or similar. In principle this should go once
728 // we move to OutOfBand InputSpec.
729 if (state.inputChannelInfos.empty()) {
730 LOGP(detail, "No input channels. Setting exit transition timeout to 0.");
731 deviceContext.exitTransitionTimeout = 0;
732 }
733 for (auto& [channelName, channel] : GetChannels()) {
734 if (channelName.rfind(spec.channelPrefix + "from_internal-dpl", 0) == 0) {
735 LOGP(detail, "{} is an internal channel. Not polling.", channelName);
736 continue;
737 }
738 if (channelName.rfind(spec.channelPrefix + "from_" + spec.name + "_", 0) == 0) {
739 LOGP(detail, "{} is an out of band channel. Not polling for output.", channelName);
740 continue;
741 }
742 // We assume there is always a ZeroMQ socket behind.
743 int zmq_fd = 0;
744 size_t zmq_fd_len = sizeof(zmq_fd);
745 // FIXME: I should probably save those somewhere... ;-)
746 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
747 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
748 if (zmq_fd == 0) {
749 LOGP(error, "Cannot get file descriptor for channel {}", channelName);
750 continue;
751 }
752 LOG(detail) << "Polling socket for " << channel[0].GetName();
753 // FIXME: leak
754 auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
755 pCtx->name = strdup(channelName.c_str());
756 pCtx->loop = state.loop;
757 pCtx->device = this;
758 pCtx->state = &state;
759 pCtx->fd = zmq_fd;
760 pCtx->read = false;
761 poller->data = pCtx;
762 uv_poll_init(state.loop, poller, zmq_fd);
763 state.activeOutputPollers.push_back(poller);
764 }
765 }
766 } else {
767 LOGP(detail, "This is a fake device so we exit after the first iteration.");
768 deviceContext.exitTransitionTimeout = 0;
769 // This is a fake device, so we can request to exit immediately
770 ServiceRegistryRef ref{mServiceRegistry};
771 ref.get<ControlService>().readyToQuit(QuitRequest::Me);
772 // A two second timer to stop internal devices which do not want to
773 auto* timer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
774 uv_timer_init(state.loop, timer);
775 timer->data = &state;
776 uv_update_time(state.loop);
777 uv_timer_start(timer, on_idle_timer, 2000, 2000);
778 state.activeTimers.push_back(timer);
779 }
780}
781
782void DataProcessingDevice::startPollers()
783{
784 auto ref = ServiceRegistryRef{mServiceRegistry};
785 auto& deviceContext = ref.get<DeviceContext>();
786 auto& state = ref.get<DeviceState>();
787
788 for (auto* poller : state.activeInputPollers) {
789 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
790 O2_SIGNPOST_START(device, sid, "socket_state", "Input socket waiting for connection.");
791 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
792 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
793 }
794 for (auto& poller : state.activeOutOfBandPollers) {
795 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
796 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
797 }
798 for (auto* poller : state.activeOutputPollers) {
799 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
800 O2_SIGNPOST_START(device, sid, "socket_state", "Output socket waiting for connection.");
801 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
802 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
803 }
804
805 deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
806 deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
807 uv_timer_init(state.loop, deviceContext.gracePeriodTimer);
808
809 deviceContext.dataProcessingGracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
810 deviceContext.dataProcessingGracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
811 uv_timer_init(state.loop, deviceContext.dataProcessingGracePeriodTimer);
812}
813
814void DataProcessingDevice::stopPollers()
815{
816 auto ref = ServiceRegistryRef{mServiceRegistry};
817 auto& deviceContext = ref.get<DeviceContext>();
818 auto& state = ref.get<DeviceState>();
819 LOGP(detail, "Stopping {} input pollers", state.activeInputPollers.size());
820 for (auto* poller : state.activeInputPollers) {
821 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
822 O2_SIGNPOST_END(device, sid, "socket_state", "Input socket closed.");
823 uv_poll_stop(poller);
824 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
825 }
826 LOGP(detail, "Stopping {} out of band pollers", state.activeOutOfBandPollers.size());
827 for (auto* poller : state.activeOutOfBandPollers) {
828 uv_poll_stop(poller);
829 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
830 }
831 LOGP(detail, "Stopping {} output pollers", state.activeOutOfBandPollers.size());
832 for (auto* poller : state.activeOutputPollers) {
833 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
834 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
835 uv_poll_stop(poller);
836 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
837 }
838
839 uv_timer_stop(deviceContext.gracePeriodTimer);
840 delete (ServiceRegistryRef*)deviceContext.gracePeriodTimer->data;
841 free(deviceContext.gracePeriodTimer);
842 deviceContext.gracePeriodTimer = nullptr;
843
844 uv_timer_stop(deviceContext.dataProcessingGracePeriodTimer);
845 delete (ServiceRegistryRef*)deviceContext.dataProcessingGracePeriodTimer->data;
846 free(deviceContext.dataProcessingGracePeriodTimer);
847 deviceContext.dataProcessingGracePeriodTimer = nullptr;
848}
849
850void DataProcessingDevice::InitTask()
851{
852 auto ref = ServiceRegistryRef{mServiceRegistry};
853 auto& deviceContext = ref.get<DeviceContext>();
854 auto& context = ref.get<DataProcessorContext>();
855
856 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
857 O2_SIGNPOST_START(device, cid, "InitTask", "Entering InitTask callback.");
858 auto& spec = getRunningDevice(mRunningDevice, mServiceRegistry);
859 auto distinct = DataRelayerHelpers::createDistinctRouteIndex(spec.inputs);
860 auto& state = ref.get<DeviceState>();
861 int i = 0;
862 for (auto& di : distinct) {
863 auto& route = spec.inputs[di];
864 if (route.configurator.has_value() == false) {
865 i++;
866 continue;
867 }
868 ExpirationHandler handler{
869 .name = route.configurator->name,
870 .routeIndex = RouteIndex{i++},
871 .lifetime = route.matcher.lifetime,
872 .creator = route.configurator->creatorConfigurator(state, mServiceRegistry, *mConfigRegistry),
873 .checker = route.configurator->danglingConfigurator(state, *mConfigRegistry),
874 .handler = route.configurator->expirationConfigurator(state, *mConfigRegistry)};
875 context.expirationHandlers.emplace_back(std::move(handler));
876 }
877
878 if (state.awakeMainThread == nullptr) {
879 state.awakeMainThread = (uv_async_t*)malloc(sizeof(uv_async_t));
880 state.awakeMainThread->data = &state;
881 uv_async_init(state.loop, state.awakeMainThread, on_awake_main_thread);
882 }
883
884 deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue<std::string>("expected-region-callbacks"));
885 deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue<std::string>("exit-transition-timeout"));
886 deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue<std::string>("data-processing-timeout"));
887
888 for (auto& channel : GetChannels()) {
889 channel.second.at(0).Transport()->SubscribeToRegionEvents([&context = deviceContext,
890 &registry = mServiceRegistry,
891 &pendingRegionInfos = mPendingRegionInfos,
892 &regionInfoMutex = mRegionInfoMutex](fair::mq::RegionInfo info) {
893 std::lock_guard<std::mutex> lock(regionInfoMutex);
894 LOG(detail) << ">>> Region info event" << info.event;
895 LOG(detail) << "id: " << info.id;
896 LOG(detail) << "ptr: " << info.ptr;
897 LOG(detail) << "size: " << info.size;
898 LOG(detail) << "flags: " << info.flags;
899 // Now we check for pending events with the mutex,
900 // so the lines below are atomic.
901 pendingRegionInfos.push_back(info);
902 context.expectedRegionCallbacks -= 1;
903 // We always want to handle these on the main loop,
904 // so we awake it.
905 ServiceRegistryRef ref{registry};
906 uv_async_send(ref.get<DeviceState>().awakeMainThread);
907 });
908 }
909
910 // Add a signal manager for SIGUSR1 so that we can force
911 // an event from the outside, making sure that the event loop can
912 // be unblocked (e.g. by a quitting DPL driver) even when there
913 // is no data pending to be processed.
914 if (deviceContext.sigusr1Handle == nullptr) {
915 deviceContext.sigusr1Handle = (uv_signal_t*)malloc(sizeof(uv_signal_t));
916 deviceContext.sigusr1Handle->data = &mServiceRegistry;
917 uv_signal_init(state.loop, deviceContext.sigusr1Handle);
918 uv_signal_start(deviceContext.sigusr1Handle, on_signal_callback, SIGUSR1);
919 }
920 // If there is any signal, we want to make sure they are active
921 for (auto& handle : state.activeSignals) {
922 handle->data = &state;
923 }
924 // When we start, we must make sure that we do listen to the signal
925 deviceContext.sigusr1Handle->data = &mServiceRegistry;
926
928 DataProcessingDevice::initPollers();
929
930 // Whenever we InitTask, we behave as if the previous iteration
931 // was successful, so that even if there is no timer or receiving
932 // channel, we can still start an enumeration.
933 DataProcessorContext* initialContext = nullptr;
934 bool idle = state.lastActiveDataProcessor.compare_exchange_strong(initialContext, (DataProcessorContext*)-1);
935 if (!idle) {
936 LOG(error) << "DataProcessor " << state.lastActiveDataProcessor.load()->spec->name << " was unexpectedly active";
937 }
938
939 // We should be ready to run here. Therefore we copy all the
940 // required parts in the DataProcessorContext. Eventually we should
941 // do so on a per thread basis, with fine grained locks.
942 // FIXME: this should not use ServiceRegistry::threadSalt, but
943 // more a ServiceRegistry::globalDataProcessorSalt(N) where
944 // N is the number of the multiplexed data processor.
945 // We will get there.
946 this->fillContext(mServiceRegistry.get<DataProcessorContext>(ServiceRegistry::globalDeviceSalt()), deviceContext);
947
948 O2_SIGNPOST_END(device, cid, "InitTask", "Exiting InitTask callback waiting for the remaining region callbacks.");
949
950 auto hasPendingEvents = [&mutex = mRegionInfoMutex, &pendingRegionInfos = mPendingRegionInfos](DeviceContext& deviceContext) {
951 std::lock_guard<std::mutex> lock(mutex);
952 return (pendingRegionInfos.empty() == false) || deviceContext.expectedRegionCallbacks > 0;
953 };
954 O2_SIGNPOST_START(device, cid, "InitTask", "Waiting for registration events.");
959 while (hasPendingEvents(deviceContext)) {
960 // Wait for the callback to signal its done, so that we do not busy wait.
961 uv_run(state.loop, UV_RUN_ONCE);
962 // Handle callbacks if any
963 {
964 O2_SIGNPOST_EVENT_EMIT(device, cid, "InitTask", "Memory registration event received.");
965 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
966 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
967 }
968 }
969 O2_SIGNPOST_END(device, cid, "InitTask", "Done waiting for registration events.");
970}
971
972void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceContext& deviceContext)
973{
974 context.isSink = false;
975 // If nothing is a sink, the rate limiting simply does not trigger.
976 bool enableRateLimiting = std::stoi(fConfig->GetValue<std::string>("timeframes-rate-limit"));
977
978 auto ref = ServiceRegistryRef{mServiceRegistry};
979 auto& spec = ref.get<DeviceSpec const>();
980
981 // The policy is now allowed to state the default.
982 context.balancingInputs = spec.completionPolicy.balanceChannels;
983 // This is needed because the internal injected dummy sink should not
984 // try to balance inputs unless the rate limiting is requested.
985 if (enableRateLimiting == false && spec.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) {
986 context.balancingInputs = false;
987 }
988 if (enableRateLimiting) {
989 for (auto& spec : spec.outputs) {
990 if (spec.matcher.binding.value == "dpl-summary") {
991 context.isSink = true;
992 break;
993 }
994 }
995 }
996
997 context.registry = &mServiceRegistry;
1000 if (context.error != nullptr) {
1001 context.errorHandling = [&errorCallback = context.error,
1002 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1006 auto& err = error_from_ref(e);
1007 auto& context = ref.get<DataProcessorContext>();
1008 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1009 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Invoking callback.", err.what);
1010 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1011 auto& stats = ref.get<DataProcessingStats>();
1013 ErrorContext errorContext{record, ref, e};
1014 errorCallback(errorContext);
1015 };
1016 } else {
1017 context.errorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1018 auto& err = error_from_ref(e);
1022 auto& context = ref.get<DataProcessorContext>();
1023 auto& deviceContext = ref.get<DeviceContext>();
1024 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1025 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1026 auto& stats = ref.get<DataProcessingStats>();
1028 switch (deviceContext.processingPolicies.error) {
1030 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Rethrowing.", err.what);
1031 throw e;
1032 default:
1033 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Skipping to next timeframe.", err.what);
1034 break;
1035 }
1036 };
1037 }
1038
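// Summary of decideEarlyForward below: early forwarding is considered only
// when this device has forward routes at all and the early forward policy is
// not NEVER; it is then vetoed if any forwarded route is RAWDATA (see the log
// message below) or has Lifetime::Optional, and, unless vetoed, it is enabled
// when only Lifetime::Condition objects are forwarded.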
1039 auto decideEarlyForward = [&context, &deviceContext, &spec, this]() -> bool {
1042 bool canForwardEarly = (spec.forwards.empty() == false) && deviceContext.processingPolicies.earlyForward != EarlyForwardPolicy::NEVER;
1043 bool onlyConditions = true;
1044 bool overriddenEarlyForward = false;
1045 for (auto& forwarded : spec.forwards) {
1046 if (forwarded.matcher.lifetime != Lifetime::Condition) {
1047 onlyConditions = false;
1048 }
1050 context.canForwardEarly = false;
1051 overriddenEarlyForward = true;
1052 LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher);
1053 break;
1054 }
1055 if (forwarded.matcher.lifetime == Lifetime::Optional) {
1056 context.canForwardEarly = false;
1057 overriddenEarlyForward = true;
1058 LOG(detail) << "Cannot forward early because of Optional input: " << DataSpecUtils::describe(forwarded.matcher);
1059 break;
1060 }
1061 }
1062 if (!overriddenEarlyForward && onlyConditions) {
1063 context.canForwardEarly = true;
1064 LOG(detail) << "Enabling early forwarding because only conditions to be forwarded";
1065 }
1066 return canForwardEarly;
1067 };
1068 context.canForwardEarly = decideEarlyForward();
1069}
1070
1071void DataProcessingDevice::PreRun()
1072{
1073 auto ref = ServiceRegistryRef{mServiceRegistry};
1074 auto& state = ref.get<DeviceState>();
1075
1076 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.loop);
1077 O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
1078 state.quitRequested = false;
1080 state.allowedProcessing = DeviceState::Any;
1081 for (auto& info : state.inputChannelInfos) {
1082 if (info.state != InputChannelState::Pull) {
1083 info.state = InputChannelState::Running;
1084 }
1085 }
1086
1087 // Catch callbacks which fail before we start.
1088 // Notice that when running multiple dataprocessors
1089 // we should probably allow expendable ones to fail.
1090 try {
1091 auto& dpContext = ref.get<DataProcessorContext>();
1092 dpContext.preStartCallbacks(ref);
1093 for (size_t i = 0; i < mStreams.size(); ++i) {
1094 auto streamRef = ServiceRegistryRef{mServiceRegistry, ServiceRegistry::globalStreamSalt(i + 1)};
1095 auto& context = streamRef.get<StreamContext>();
1096 context.preStartStreamCallbacks(streamRef);
1097 }
1098 } catch (std::exception& e) {
1099 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type std::exception caught in PreRun: %{public}s. Rethrowing.", e.what());
1100 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1101 throw;
1102 } catch (o2::framework::RuntimeErrorRef& e) {
1103 auto& err = error_from_ref(e);
1104 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type o2::framework::RuntimeErrorRef caught in PreRun: %{public}s. Rethrowing.", err.what);
1105 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1106 throw;
1107 } catch (...) {
1108 O2_SIGNPOST_END(device, cid, "PreRun", "Unknown exception being thrown. Rethrowing.");
1109 throw;
1110 }
1111
1112 ref.get<CallbackService>().call<CallbackService::Id::Start>();
1113 startPollers();
1114
1115 // Raise to 1 when we are ready to start processing
1116 using o2::monitoring::Metric;
1117 using o2::monitoring::Monitoring;
1118 using o2::monitoring::tags::Key;
1119 using o2::monitoring::tags::Value;
1120
1121 auto& monitoring = ref.get<Monitoring>();
1122 monitoring.send(Metric{(uint64_t)1, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1123 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun callback.");
1124}
1125
1126void DataProcessingDevice::PostRun()
1127{
1128 ServiceRegistryRef ref{mServiceRegistry};
1129 // Drop to 0 as we are no longer processing
1130 using o2::monitoring::Metric;
1131 using o2::monitoring::Monitoring;
1132 using o2::monitoring::tags::Key;
1133 using o2::monitoring::tags::Value;
1134
1135 auto& monitoring = ref.get<Monitoring>();
1136 monitoring.send(Metric{(uint64_t)0, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1137
1138 stopPollers();
1139 ref.get<CallbackService>().call<CallbackService::Id::Stop>();
1140 auto& dpContext = ref.get<DataProcessorContext>();
1141 dpContext.postStopCallbacks(ref);
1142}
1143
1145{
1146 ServiceRegistryRef ref{mServiceRegistry};
1147 ref.get<CallbackService>().call<CallbackService::Id::Reset>();
1148}
1149
1150void DataProcessingDevice::Run()
1151{
1152 ServiceRegistryRef ref{mServiceRegistry};
1153 auto& state = ref.get<DeviceState>();
1155 bool firstLoop = true;
1156 O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
1157 O2_SIGNPOST_START(device, lid, "device_state", "First iteration of the device loop");
1158
1159 bool dplEnableMultithreding = getenv("DPL_THREADPOOL_SIZE") != nullptr;
1160 if (dplEnableMultithreding) {
1161 setenv("UV_THREADPOOL_SIZE", "1", 1);
1162 }
1163
1164 while (state.transitionHandling != TransitionHandlingState::Expired) {
1165 if (state.nextFairMQState.empty() == false) {
1166 (void)this->ChangeState(state.nextFairMQState.back());
1167 state.nextFairMQState.pop_back();
1168 }
1169 // Notify on the main thread the new region callbacks, making sure
1170 // no callback is issued if there is something still processing.
1171 {
1172 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1173 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1174 }
1175 // This will block for the correct delay (or until we get data
1176 // on a socket). We also do not block on the first iteration
1177 // so that devices which do not have a timer can still start an
1178 // enumeration.
1179 {
1180 ServiceRegistryRef ref{mServiceRegistry};
1181 ref.get<DriverClient>().flushPending(mServiceRegistry);
1182 DataProcessorContext* lastActive = state.lastActiveDataProcessor.load();
1184 // Reset to zero unless some other DataProcessorContext completed in the meanwhile.
1184 // In such case we will take care of it at next iteration.
1185 state.lastActiveDataProcessor.compare_exchange_strong(lastActive, nullptr);
1186
1187 auto shouldNotWait = (lastActive != nullptr &&
1188 (state.streaming != StreamingState::Idle) && (state.activeSignals.empty())) ||
1190 if (firstLoop) {
1191 shouldNotWait = true;
1192 firstLoop = false;
1193 }
1194 if (lastActive != nullptr) {
1196 }
1197 if (NewStatePending()) {
1198 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "New state pending. Waiting for it to be handled.");
1199 shouldNotWait = true;
1201 }
1203 // If we are Idle, we can then consider the transition to be expired.
1204 if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
1205 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
1206 state.transitionHandling = TransitionHandlingState::Expired;
1207 }
1208 if (state.severityStack.empty() == false) {
1209 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1210 state.severityStack.pop_back();
1211 }
1212 // for (auto &info : mDeviceContext.state->inputChannelInfos) {
1213 // shouldNotWait |= info.readPolled;
1214 // }
1215 state.loopReason = DeviceState::NO_REASON;
1216 state.firedTimers.clear();
1217 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_CALLBACKS) != 0) {
1218 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1219 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1220 }
1221 // Run the asynchronous queue just before sleeping again, so that:
1222 // - we can trigger further events from the queue
1223 // - we can guarantee this is the last thing we do in the loop (
1224 // assuming no one else is adding to the queue before this point).
1225 auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1226 O2_SIGNPOST_START(device, lid, "run_loop", "Dropping message from slot %" PRIu64 ". Forwarding as needed.", (uint64_t)slot.index);
1227 ServiceRegistryRef ref{registry};
1228 ref.get<AsyncQueue>();
1229 ref.get<DecongestionService>();
1230 ref.get<DataRelayer>();
1231 // Get the current timeslice for the slot.
1232 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
1234 forwardInputs(registry, slot, dropped, oldestOutputInfo, false, true);
1235 };
1236 auto& relayer = ref.get<DataRelayer>();
1237 relayer.prunePending(onDrop);
1238 auto& queue = ref.get<AsyncQueue>();
1239 auto oldestPossibleTimeslice = relayer.getOldestPossibleOutput();
1240 AsyncQueueHelpers::run(queue, {oldestPossibleTimeslice.timeslice.value});
1241 if (shouldNotWait == false) {
1242 auto& dpContext = ref.get<DataProcessorContext>();
1243 dpContext.preLoopCallbacks(ref);
1244 }
1245 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. %{public}s", shouldNotWait ? "Will immediately schedule a new one" : "Waiting for next event.");
1246 uv_run(state.loop, shouldNotWait ? UV_RUN_NOWAIT : UV_RUN_ONCE);
1247 O2_SIGNPOST_START(device, lid, "run_loop", "Run loop started. Loop reason %d.", state.loopReason);
1248 if ((state.loopReason & state.tracingFlags) != 0) {
1249 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1250 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1251 } else if (state.severityStack.empty() == false) {
1252 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1253 state.severityStack.pop_back();
1254 }
1255 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Loop reason mask %x & %x = %x", state.loopReason, state.tracingFlags, state.loopReason & state.tracingFlags);
1256
1257 if ((state.loopReason & DeviceState::LoopReason::OOB_ACTIVITY) != 0) {
1258 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Out of band activity detected. Rescanning everything.");
1259 relayer.rescan();
1260 }
1261
1262 if (!state.pendingOffers.empty()) {
1263 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Pending %" PRIu64 " offers. updating the ComputingQuotaEvaluator.", (uint64_t)state.pendingOffers.size());
1264 ref.get<ComputingQuotaEvaluator>().updateOffers(state.pendingOffers, uv_now(state.loop));
1265 }
1266 }
1267
1268 // Notify on the main thread the new region callbacks, making sure
1269 // no callback is issued if there is something still processing.
1270 // Notice that we still need to perform callbacks also after
1271 // the socket is polled, because otherwise we would end up serving
1272 // the callback after the first data arrives if the system is too
1273 // fast to transition from Init to Run.
1274 {
1275 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1276 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1277 }
1278
1279 assert(mStreams.size() == mHandles.size());
1281 TaskStreamRef streamRef{-1};
1282 for (size_t ti = 0; ti < mStreams.size(); ti++) {
1283 auto& taskInfo = mStreams[ti];
1284 if (taskInfo.running) {
1285 continue;
1286 }
1287 // Stream 0 is for when we run in
1288 streamRef.index = ti;
1289 }
1290 using o2::monitoring::Metric;
1291 using o2::monitoring::Monitoring;
1292 using o2::monitoring::tags::Key;
1293 using o2::monitoring::tags::Value;
1294 // We have an empty stream, let's check if we have enough
1295 // resources for it to run something
1296 if (streamRef.index != -1) {
1297 // Synchronous execution of the callbacks. This will be moved into
1298 // on_socket_polled once we have threading in place.
1299 uv_work_t& handle = mHandles[streamRef.index];
1300 TaskStreamInfo& stream = mStreams[streamRef.index];
1301 handle.data = &mStreams[streamRef.index];
1302
1303 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const& stats)> reportExpiredOffer = [&registry = mServiceRegistry](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
1304 ServiceRegistryRef ref{registry};
1305 auto& dpStats = ref.get<DataProcessingStats>();
1306 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
1307 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
1308 dpStats.updateStats({static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredTimeslices});
1309 dpStats.processCommandQueue();
1310 };
1311 auto ref = ServiceRegistryRef{mServiceRegistry};
1312
1313 // Deciding whether to run or not can be done by passing a request to
1314 // the evaluator. In this case, the request is always satisfied and
1315 // we run on whatever resource is available.
1316 auto& spec = ref.get<DeviceSpec const>();
1317 bool enough = ref.get<ComputingQuotaEvaluator>().selectOffer(streamRef.index, spec.resourcePolicy.request, uv_now(state.loop));
1318
1319 struct SchedulingStats {
1320 std::atomic<size_t> lastScheduled = 0;
1321 std::atomic<size_t> numberOfUnscheduledSinceLastScheduled = 0;
1322 std::atomic<size_t> numberOfUnscheduled = 0;
1323 std::atomic<size_t> numberOfScheduled = 0;
1324 };
1325 static SchedulingStats schedulingStats;
1326 O2_SIGNPOST_ID_GENERATE(sid, scheduling);
1327 if (enough) {
1328 stream.id = streamRef;
1329 stream.running = true;
1330 stream.registry = &mServiceRegistry;
1331 schedulingStats.lastScheduled = uv_now(state.loop);
1332 schedulingStats.numberOfScheduled++;
1333 schedulingStats.numberOfUnscheduledSinceLastScheduled = 0;
1334 O2_SIGNPOST_EVENT_EMIT(scheduling, sid, "Run", "Enough resources to schedule computation on stream %d", streamRef.index);
1335 if (dplEnableMultithreding) [[unlikely]] {
1336 stream.task = &handle;
1337 uv_queue_work(state.loop, stream.task, run_callback, run_completion);
1338 } else {
1339 run_callback(&handle);
1340 run_completion(&handle, 0);
1341 }
1342 } else {
1343 if (schedulingStats.numberOfUnscheduledSinceLastScheduled > 100 ||
1344 (uv_now(state.loop) - schedulingStats.lastScheduled) > 30000) {
1345 O2_SIGNPOST_EVENT_EMIT_WARN(scheduling, sid, "Run",
1346 "Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu.",
1347 schedulingStats.numberOfUnscheduledSinceLastScheduled.load(),
1348 schedulingStats.lastScheduled.load());
1349 } else {
1350 O2_SIGNPOST_EVENT_EMIT(scheduling, sid, "Run",
1351 "Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu.",
1352 schedulingStats.numberOfUnscheduledSinceLastScheduled.load(),
1353 schedulingStats.lastScheduled.load());
1354 }
1355 schedulingStats.numberOfUnscheduled++;
1356 schedulingStats.numberOfUnscheduledSinceLastScheduled++;
1357 auto ref = ServiceRegistryRef{mServiceRegistry};
1358 ref.get<ComputingQuotaEvaluator>().handleExpired(reportExpiredOffer);
1359 }
1360 }
1361 }
1362
1363 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", (int)state.transitionHandling);
1364 auto& spec = ref.get<DeviceSpec const>();
1366 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
1367 auto& info = state.inputChannelInfos[ci];
1368 info.parts.fParts.clear();
1369 }
1370 state.transitionHandling = TransitionHandlingState::NoTransition;
1371}
1372
1376{
1377 auto& context = ref.get<DataProcessorContext>();
1378 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1379 O2_SIGNPOST_START(device, dpid, "do_prepare", "Starting DataProcessorContext::doPrepare.");
1380
1381 {
1382 ref.get<CallbackService>().call<CallbackService::Id::ClockTick>();
1383 }
1384 // Whether or not we had something to do.
1385
1386 // Initialise the value for context.allDone. It will possibly be updated
1387 // below if any of the channels is not done.
1388 //
1389 // Notice that fake input channels (InputChannelState::Pull) cannot possibly
1390 // expect to receive an EndOfStream signal. Thus we do not wait for these
1391 // to be completed. In the case of data source devices, as they do not have
1392 // real data input channels, they have to signal EndOfStream themselves.
1393 auto& state = ref.get<DeviceState>();
1394 auto& spec = ref.get<DeviceSpec const>();
1395 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.inputChannelInfos.data());
1396 O2_SIGNPOST_START(device, cid, "do_prepare", "Reported channel states.");
1397 context.allDone = std::any_of(state.inputChannelInfos.begin(), state.inputChannelInfos.end(), [cid](const auto& info) {
1398 if (info.channel) {
1399 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "Input channel %{public}s%{public}s has %zu parts left and is in state %d.",
1400 info.channel->GetName().c_str(), (info.id.value == ChannelIndex::INVALID ? " (non DPL)" : ""), info.parts.fParts.size(), (int)info.state);
1401 } else {
1402 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "External channel %d is in state %d.", info.id.value, (int)info.state);
1403 }
1404 return (info.parts.fParts.empty() == true && info.state != InputChannelState::Pull);
1405 });
1406 O2_SIGNPOST_END(device, cid, "do_prepare", "End report.");
1407 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Processing %zu input channels.", spec.inputChannels.size());
1410 static std::vector<int> pollOrder;
1411 pollOrder.resize(state.inputChannelInfos.size());
1412 std::iota(pollOrder.begin(), pollOrder.end(), 0);
1413 std::sort(pollOrder.begin(), pollOrder.end(), [&infos = state.inputChannelInfos](int a, int b) {
1414 return infos[a].oldestForChannel.value < infos[b].oldestForChannel.value;
1415 });
1416
1417 // Nothing to poll...
1418 if (pollOrder.empty()) {
1419 O2_SIGNPOST_END(device, dpid, "do_prepare", "Nothing to poll. Waiting for next iteration.");
1420 return;
1421 }
1422 auto currentOldest = state.inputChannelInfos[pollOrder.front()].oldestForChannel;
1423 auto currentNewest = state.inputChannelInfos[pollOrder.back()].oldestForChannel;
1424 auto delta = currentNewest.value - currentOldest.value;
1425 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Oldest possible timeframe range %" PRIu64 " => %" PRIu64 " delta %" PRIu64,
1426 (int64_t)currentOldest.value, (int64_t)currentNewest.value, (int64_t)delta);
1427 auto& infos = state.inputChannelInfos;
1428
1429 if (context.balancingInputs) {
1430 static int pipelineLength = DefaultsHelpers::pipelineLength();
1431 static uint64_t ahead = getenv("DPL_MAX_CHANNEL_AHEAD") ? std::atoll(getenv("DPL_MAX_CHANNEL_AHEAD")) : std::max(8, std::min(pipelineLength - 48, pipelineLength / 2));
1432 auto newEnd = std::remove_if(pollOrder.begin(), pollOrder.end(), [&infos, limitNew = currentOldest.value + ahead](int a) -> bool {
1433 return infos[a].oldestForChannel.value > limitNew;
1434 });
1435 for (auto it = pollOrder.begin(); it < pollOrder.end(); it++) {
1436 const auto& channelInfo = state.inputChannelInfos[*it];
1437 if (channelInfo.pollerIndex != -1) {
1438 auto& poller = state.activeInputPollers[channelInfo.pollerIndex];
1439 auto& pollerContext = *(PollerContext*)(poller->data);
1440 if (pollerContext.pollerState == PollerContext::PollerState::Connected || pollerContext.pollerState == PollerContext::PollerState::Suspended) {
1441 bool running = pollerContext.pollerState == PollerContext::PollerState::Connected;
1442 bool shouldBeRunning = it < newEnd;
1443 if (running != shouldBeRunning) {
1444 uv_poll_start(poller, shouldBeRunning ? UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED : 0, &on_socket_polled);
1445 pollerContext.pollerState = shouldBeRunning ? PollerContext::PollerState::Connected : PollerContext::PollerState::Suspended;
1446 }
1447 }
1448 }
1449 }
1450 pollOrder.erase(newEnd, pollOrder.end());
1451 }
1452 O2_SIGNPOST_END(device, dpid, "do_prepare", "%zu channels pass the channel imbalance check.", pollOrder.size());
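// A minimal worked example of the look-ahead window computed above (illustration only, not
// additional framework code), assuming DPL_MAX_CHANNEL_AHEAD is not set in the environment:
//
//   ahead = std::max(8, std::min(pipelineLength - 48, pipelineLength / 2))
//
//   pipelineLength =  16 -> std::min(-32,   8) = -32 -> ahead = 8
//   pipelineLength =  64 -> std::min( 16,  32) =  16 -> ahead = 16
//   pipelineLength = 256 -> std::min(208, 128) = 128 -> ahead = 128
//
// Channels whose oldest possible timeslice is more than `ahead` timeslices newer than the
// slowest channel are dropped from pollOrder and their pollers suspended until it catches up.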
1453
1454 for (auto sci : pollOrder) {
1455 auto& info = state.inputChannelInfos[sci];
1456 auto& channelSpec = spec.inputChannels[sci];
1457 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1458 O2_SIGNPOST_START(device, cid, "channels", "Processing channel %s", channelSpec.name.c_str());
1459
1460 if (info.state != InputChannelState::Completed && info.state != InputChannelState::Pull) {
1461 context.allDone = false;
1462 }
1463 if (info.state != InputChannelState::Running) {
1464 // Remember to flush data if we are not running
1465 // and there is some message pending.
1466 if (info.parts.Size()) {
1467 DataProcessingDevice::handleData(ref, info);
1468 }
1469 O2_SIGNPOST_END(device, cid, "channels", "Flushing channel %s which is in state %d and has %zu parts still pending.",
1470 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1471 continue;
1472 }
1473 if (info.channel == nullptr) {
1474 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is nullptr and has %zu parts still pending.",
1475 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1476 continue;
1477 }
1478 // Only poll DPL channels for now.
1479 if (info.channelType != ChannelAccountingType::DPL) {
1480 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is not a DPL channel and has %zu parts still pending.",
1481 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1482 continue;
1483 }
1484 auto& socket = info.channel->GetSocket();
1485 // If we have pending events from a previous iteration,
1486 // we do receive in any case.
1487 // Otherwise we check if there is any pending event and skip
1488 // this channel in case there is none.
1489 if (info.hasPendingEvents == 0) {
1490 socket.Events(&info.hasPendingEvents);
1491 // If we do not read, we can continue.
1492 if ((info.hasPendingEvents & 1) == 0 && (info.parts.Size() == 0)) {
1493 O2_SIGNPOST_END(device, cid, "channels", "No pending events and no remaining parts to process for channel %{public}s", channelSpec.name.c_str());
1494 continue;
1495 }
1496 }
1497 // We can reset this, because it means we have seen at least 1
1498 // message after the UV_READABLE was raised.
1499 info.readPolled = false;
1500 // Notice that there seems to be a difference between the documentation
1501 // of zeromq and the observed behavior. The fact that ZMQ_POLLIN
1502 // is raised does not mean that a message is immediately available to
1503 // read, just that it will be available soon, so the receive can
1504 // still return -2. To avoid this we keep receiving on the socket until
1505 // we get a message. In order not to overflow the DPL queue we process
1506 // one message at a time and we keep track of whether there were more
1507 // to process.
1508 bool newMessages = false;
1509 while (true) {
1510 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Receiving loop called for channel %{public}s (%d) with oldest possible timeslice %zu",
1511 channelSpec.name.c_str(), info.id.value, info.oldestForChannel.value);
1512 if (info.parts.Size() < 64) {
1513 fair::mq::Parts parts;
1514 info.channel->Receive(parts, 0);
1515 if (parts.Size()) {
1516 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Received %zu parts from channel %{public}s (%d).", parts.Size(), channelSpec.name.c_str(), info.id.value);
1517 }
1518 for (auto&& part : parts) {
1519 info.parts.fParts.emplace_back(std::move(part));
1520 }
1521 newMessages |= true;
1522 }
1523
1524 if (info.parts.Size() >= 0) {
1526 // Receiving data counts as activity now, so that
1527 // we can make sure we process all the pending
1528 // messages without hanging on the uv_run.
1529 break;
1530 }
1531 }
1532 // We check once again for pending events, keeping track if this was the
1533 // case so that we can immediately repeat this loop and avoid remaining
1534 // stuck in uv_run. This is because we will not get notified on the socket
1535 // if more events are pending due to zeromq level triggered approach.
1536 socket.Events(&info.hasPendingEvents);
1537 if (info.hasPendingEvents) {
1538 info.readPolled = false;
1539 // In case there were messages, we consider it as activity
1540 if (newMessages) {
1541 state.lastActiveDataProcessor.store(&context);
1542 }
1543 }
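// An illustrative sketch of the receive pattern above, using hypothetical names (batch,
// enqueue, queueFull are placeholders, not framework API), to make the level-triggered
// behaviour explicit: ZMQ_POLLIN may be reported before a message can actually be read,
// and no further edge will arrive for data that is already queued.
//
//   int64_t pending = 0;
//   do {
//     fair::mq::Parts batch;
//     if (channel.Receive(batch, 0) > 0) {   // non-blocking receive; may legitimately fail
//       enqueue(std::move(batch));           // hand one multipart message to the relayer
//     }
//     socket.Events(&pending);               // re-check: polling alone is not enough
//   } while ((pending & 1 /* readable */) && !queueFull());
//
// The real loop above additionally bounds the number of queued parts (info.parts.Size() < 64)
// and records activity so that the uv_run loop is woken up again for any leftovers.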
1544 O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).",
1545 channelSpec.name.c_str(), info.id.value);
1546 }
1547}
1548
1550{
1551 auto& context = ref.get<DataProcessorContext>();
1552 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1553 auto& state = ref.get<DeviceState>();
1554 auto& spec = ref.get<DeviceSpec const>();
1555
1556 if (state.streaming == StreamingState::Idle) {
1557 return;
1558 }
1559
1560 context.completed.clear();
1561 context.completed.reserve(16);
1562 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1563 state.lastActiveDataProcessor.store(&context);
1564 }
1565 DanglingContext danglingContext{*context.registry};
1566
1567 context.preDanglingCallbacks(danglingContext);
1568 if (state.lastActiveDataProcessor.load() == nullptr) {
1569 ref.get<CallbackService>().call<CallbackService::Id::Idle>();
1570 }
1571 auto activity = ref.get<DataRelayer>().processDanglingInputs(context.expirationHandlers, *context.registry, true);
1572 if (activity.expiredSlots > 0) {
1573 state.lastActiveDataProcessor = &context;
1574 }
1575
1576 context.completed.clear();
1577 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1578 state.lastActiveDataProcessor = &context;
1579 }
1580
1581 context.postDanglingCallbacks(danglingContext);
1582
1583 // If we got notified that all the sources are done, we call the EndOfStream
1584 // callback and return false. Notice that what happens next is actually
1585 // dependent on the callback, not something which is controlled by the
1586 // framework itself.
1587 if (context.allDone == true && state.streaming == StreamingState::Streaming) {
1589 state.lastActiveDataProcessor = &context;
1590 }
1591
1592 if (state.streaming == StreamingState::EndOfStreaming) {
1593 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "We are in EndOfStreaming. Flushing queues.");
1594 // We keep processing data until we are Idle.
1595 // FIXME: not sure this is the correct way to drain the queues, but
1596 // I guess we will see.
1599 auto& relayer = ref.get<DataRelayer>();
1600
1601 bool shouldProcess = DataProcessingHelpers::hasOnlyGenerated(spec) == false;
1602
1603 while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) {
1604 relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false);
1605 }
1606
1607 auto& timingInfo = ref.get<TimingInfo>();
1608 // We should keep the data generated at end of stream only for those
1609 // which are not sources.
1610 timingInfo.keepAtEndOfStream = shouldProcess;
1611 // Fill timinginfo with some reasonable values for data sent with endOfStream
1612 timingInfo.timeslice = relayer.getOldestPossibleOutput().timeslice.value;
1613 timingInfo.tfCounter = -1;
1614 timingInfo.firstTForbit = -1;
1615 // timingInfo.runNumber = ; // Not sure where to get this if not already set
1616 timingInfo.creation = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
1617 O2_SIGNPOST_EVENT_EMIT(calibration, dpid, "calibration", "TimingInfo.keepAtEndOfStream %d", timingInfo.keepAtEndOfStream);
1618
1619 EndOfStreamContext eosContext{*context.registry, ref.get<DataAllocator>()};
1620
1621 context.preEOSCallbacks(eosContext);
1622 auto& streamContext = ref.get<StreamContext>();
1623 streamContext.preEOSCallbacks(eosContext);
1624 ref.get<CallbackService>().call<CallbackService::Id::EndOfStream>(eosContext);
1625 streamContext.postEOSCallbacks(eosContext);
1626 context.postEOSCallbacks(eosContext);
1627
1628 for (auto& channel : spec.outputChannels) {
1629 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Sending end of stream to %{public}s.", channel.name.c_str());
1631 }
1632 // This is needed because the transport is deleted before the device.
1633 relayer.clear();
1635 // In case we should process, note the data processor responsible for it
1636 if (shouldProcess) {
1637 state.lastActiveDataProcessor = &context;
1638 }
1639 // On end of stream we shut down all output pollers.
1640 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1641 for (auto& poller : state.activeOutputPollers) {
1642 uv_poll_stop(poller);
1643 }
1644 return;
1645 }
1646
1647 if (state.streaming == StreamingState::Idle) {
1648 // On end of stream we shut down all output pollers.
1649 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1650 for (auto& poller : state.activeOutputPollers) {
1651 uv_poll_stop(poller);
1652 }
1653 }
1654
1655 return;
1656}
1657
1659{
1660 ServiceRegistryRef ref{mServiceRegistry};
1661 ref.get<DataRelayer>().clear();
1662 auto& deviceContext = ref.get<DeviceContext>();
1663 // If the signal handler is there, we should
1664 // hide the registry from it, so that we do not
1665 // end up calling the signal handler on something
1666 // which might not be there anymore.
1667 if (deviceContext.sigusr1Handle) {
1668 deviceContext.sigusr1Handle->data = nullptr;
1669 }
1670 // Makes sure we do not have a working context on
1671 // shutdown.
1672 for (auto& handle : ref.get<DeviceState>().activeSignals) {
1673 handle->data = nullptr;
1674 }
1675}
1676
1677struct WaitBackpressurePolicy {
1678 void backpressure(InputChannelInfo const&)
1679 {
1680 }
1681};
1682
1688{
1689 using InputInfo = DataRelayer::InputInfo;
1691
1692 auto& context = ref.get<DataProcessorContext>();
1693 // This is the same id as the upper level function, so we get the events
1694 // associated with the same interval. We will simply use "handle_data" as
1695 // the category.
1696 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1697
1698 // This is how we validate inputs. I.e. we try to enforce the O2 Data model
1699 // and we do a few stats. We bind parts as a lambda captured variable, rather
1700 // than an input, because we do not want the outer loop to actually be exposed
1701 // to the implementation details of the messaging layer.
1702 auto getInputTypes = [&info, &context]() -> std::optional<std::vector<InputInfo>> {
1703 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1704 auto ref = ServiceRegistryRef{*context.registry};
1705 auto& stats = ref.get<DataProcessingStats>();
1706 auto& state = ref.get<DeviceState>();
1707 auto& parts = info.parts;
1708 stats.updateStats({(int)ProcessingStatsId::TOTAL_INPUTS, DataProcessingStats::Op::Set, (int64_t)parts.Size()});
1709
1710 std::vector<InputInfo> results;
1711 // we can reserve the upper limit
1712 results.reserve(parts.Size() / 2);
1713 size_t nTotalPayloads = 0;
1714
1715 auto insertInputInfo = [&results, &nTotalPayloads](size_t position, size_t length, InputType type, ChannelIndex index) {
1716 results.emplace_back(position, length, type, index);
1717 if (type != InputType::Invalid && length > 1) {
1718 nTotalPayloads += length - 1;
1719 }
1720 };
1721
1722 for (size_t pi = 0; pi < parts.Size(); pi += 2) {
1723 auto* headerData = parts.At(pi)->GetData();
1724 auto sih = o2::header::get<SourceInfoHeader*>(headerData);
1725 auto dh = o2::header::get<DataHeader*>(headerData);
1726 if (sih) {
1727 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got SourceInfoHeader with state %d", (int)sih->state);
1728 info.state = sih->state;
1729 insertInputInfo(pi, 2, InputType::SourceInfo, info.id);
1730 state.lastActiveDataProcessor = &context;
1731 if (dh) {
1732 LOGP(error, "Found data attached to a SourceInfoHeader");
1733 }
1734 continue;
1735 }
1736 auto dih = o2::header::get<DomainInfoHeader*>(headerData);
1737 if (dih) {
1738 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got DomainInfoHeader with oldestPossibleTimeslice %d", (int)dih->oldestPossibleTimeslice);
1739 insertInputInfo(pi, 2, InputType::DomainInfo, info.id);
1740 state.lastActiveDataProcessor = &context;
1741 if (dh) {
1742 LOGP(error, "Found data attached to a DomainInfoHeader");
1743 }
1744 continue;
1745 }
1746 if (!dh) {
1747 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1748 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header is not a DataHeader?");
1749 continue;
1750 }
1751 if (dh->payloadSize > parts.At(pi + 1)->GetSize()) {
1752 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1753 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader payloadSize mismatch");
1754 continue;
1755 }
1756 auto dph = o2::header::get<DataProcessingHeader*>(headerData);
1757 // We only deal with the tracking of parts if the log is enabled.
1758 // This is because in principle we should track the size of each of
1759 // the parts and sum it up. Not for now.
1760 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, headerData);
1761 O2_SIGNPOST_START(parts, pid, "parts", "Processing DataHeader %{public}-4s/%{public}-16s/%d with splitPayloadParts %d and splitPayloadIndex %d",
1762 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->splitPayloadParts, dh->splitPayloadIndex);
1763 if (!dph) {
1764 insertInputInfo(pi, 2, InputType::Invalid, info.id);
1765 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header stack does not contain DataProcessingHeader");
1766 continue;
1767 }
1768 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
1769 // this is indicating a sequence of payloads following the header
1770 // FIXME: we will probably also set the DataHeader version
1771 insertInputInfo(pi, dh->splitPayloadParts + 1, InputType::Data, info.id);
1772 pi += dh->splitPayloadParts - 1;
1773 } else {
1774 // We can set the type for the next splitPayloadParts
1775 // because we are guaranteed they are all the same.
1776 // If splitPayloadParts = 0, we assume that means there is only one (header, payload)
1777 // pair.
1778 size_t finalSplitPayloadIndex = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
1779 if (finalSplitPayloadIndex > parts.Size()) {
1780 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader::splitPayloadParts invalid");
1781 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1782 continue;
1783 }
1784 insertInputInfo(pi, 2, InputType::Data, info.id);
1785 for (; pi + 2 < finalSplitPayloadIndex; pi += 2) {
1786 insertInputInfo(pi + 2, 2, InputType::Data, info.id);
1787 }
1788 }
1789 }
1790 if (results.size() + nTotalPayloads != parts.Size()) {
1791 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "inconsistent number of inputs extracted. %zu vs parts (%zu)", results.size() + nTotalPayloads, parts.Size());
1792 return std::nullopt;
1793 }
1794 return results;
1795 };
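// A worked illustration of the accounting performed by the lambda above on a hypothetical
// 6-part multipart message (derived from the code above, not additional framework logic):
//
//   parts: [ DH(splitPayloadParts=3, splitPayloadIndex=3), P, P, P, DH'(splitPayloadParts=0), P' ]
//
//   pi=0: header announces a header + 3 payload sequence
//         -> insertInputInfo(0, 4, Data), nTotalPayloads += 3, pi jumps to 4
//   pi=4: plain (header, payload) pair
//         -> insertInputInfo(4, 2, Data), nTotalPayloads += 1
//
//   results.size() (2) + nTotalPayloads (4) == parts.Size() (6), so the consistency check at
//   the end of the lambda passes and the two InputInfo entries are returned.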
1796
1797 auto reportError = [ref](const char* message) {
1798 auto& stats = ref.get<DataProcessingStats>();
1800 };
1801
1802 auto handleValidMessages = [&info, ref, &reportError](std::vector<InputInfo> const& inputInfos) {
1803 auto& relayer = ref.get<DataRelayer>();
1804 auto& state = ref.get<DeviceState>();
1805 static WaitBackpressurePolicy policy;
1806 auto& parts = info.parts;
1807 // We relay execution to make sure we have a complete set of parts
1808 // available.
1809 bool hasBackpressure = false;
1810 size_t minBackpressureTimeslice = -1;
1811 bool hasData = false;
1812 size_t oldestPossibleTimeslice = -1;
1813 static std::vector<int> ordering;
1814 // Same as inputInfos but with iota.
1815 ordering.resize(inputInfos.size());
1816 std::iota(ordering.begin(), ordering.end(), 0);
1817 // stable sort orderings by type and position
1818 std::stable_sort(ordering.begin(), ordering.end(), [&inputInfos](int const& a, int const& b) {
1819 auto const& ai = inputInfos[a];
1820 auto const& bi = inputInfos[b];
1821 if (ai.type != bi.type) {
1822 return ai.type < bi.type;
1823 }
1824 return ai.position < bi.position;
1825 });
1826 for (size_t ii = 0; ii < inputInfos.size(); ++ii) {
1827 auto const& input = inputInfos[ordering[ii]];
1828 switch (input.type) {
1829 case InputType::Data: {
1830 hasData = true;
1831 auto headerIndex = input.position;
1832 auto nMessages = 0;
1833 auto nPayloadsPerHeader = 0;
1834 if (input.size > 2) {
1835 // header and multiple payload sequence
1836 nMessages = input.size;
1837 nPayloadsPerHeader = nMessages - 1;
1838 } else {
1839 // multiple header-payload pairs
1840 auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
1841 nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2;
1842 nPayloadsPerHeader = 1;
1843 ii += (nMessages / 2) - 1;
1844 }
1845 auto onDrop = [ref](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1846 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
1847 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "onDrop", "Dropping message from slot %zu. Forwarding as needed. Timeslice %zu",
1848 slot.index, oldestOutputInfo.timeslice.value);
1849 ref.get<AsyncQueue>();
1850 ref.get<DecongestionService>();
1851 ref.get<DataRelayer>();
1852 // Get the current timeslice for the slot.
1853 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
1855 forwardInputs(ref, slot, dropped, oldestOutputInfo, false, true);
1856 };
1857 auto relayed = relayer.relay(parts.At(headerIndex)->GetData(),
1858 &parts.At(headerIndex),
1859 input,
1860 nMessages,
1861 nPayloadsPerHeader,
1862 onDrop);
1863 switch (relayed.type) {
1865 if (info.normalOpsNotified == true && info.backpressureNotified == false) {
1866 LOGP(alarm, "Backpressure on channel {}. Waiting.", info.channel->GetName());
1867 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
1868 monitoring.send(o2::monitoring::Metric{1, fmt::format("backpressure_{}", info.channel->GetName())});
1869 info.backpressureNotified = true;
1870 info.normalOpsNotified = false;
1871 }
1872 policy.backpressure(info);
1873 hasBackpressure = true;
1874 minBackpressureTimeslice = std::min<size_t>(minBackpressureTimeslice, relayed.timeslice.value);
1875 break;
1879 if (info.normalOpsNotified == false && info.backpressureNotified == true) {
1880 LOGP(info, "Back to normal on channel {}.", info.channel->GetName());
1881 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
1882 monitoring.send(o2::monitoring::Metric{0, fmt::format("backpressure_{}", info.channel->GetName())});
1883 info.normalOpsNotified = true;
1884 info.backpressureNotified = false;
1885 }
1886 break;
1887 }
1888 } break;
1889 case InputType::SourceInfo: {
1890 LOGP(detail, "Received SourceInfo");
1891 auto& context = ref.get<DataProcessorContext>();
1892 state.lastActiveDataProcessor = &context;
1893 auto headerIndex = input.position;
1894 auto payloadIndex = input.position + 1;
1895 assert(payloadIndex < parts.Size());
1896 // FIXME: the message with the end of stream cannot contain
1897 // split parts.
1898 parts.At(headerIndex).reset(nullptr);
1899 parts.At(payloadIndex).reset(nullptr);
1900 // for (size_t i = 0; i < dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 - 1 : 1; ++i) {
1901 // parts.At(headerIndex + 1 + i).reset(nullptr);
1902 // }
1903 // pi += dh->splitPayloadParts > 0 ? dh->splitPayloadParts - 1 : 0;
1904
1905 } break;
1906 case InputType::DomainInfo: {
1909 auto& context = ref.get<DataProcessorContext>();
1910 state.lastActiveDataProcessor = &context;
1911 auto headerIndex = input.position;
1912 auto payloadIndex = input.position + 1;
1913 assert(payloadIndex < parts.Size());
1914 // FIXME: the message with the end of stream cannot contain
1915 // split parts.
1916
1917 auto dih = o2::header::get<DomainInfoHeader*>(parts.At(headerIndex)->GetData());
1918 if (hasBackpressure && dih->oldestPossibleTimeslice >= minBackpressureTimeslice) {
1919 break;
1920 }
1921 oldestPossibleTimeslice = std::min(oldestPossibleTimeslice, dih->oldestPossibleTimeslice);
1922 LOGP(debug, "Got DomainInfoHeader, new oldestPossibleTimeslice {} on channel {}", oldestPossibleTimeslice, info.id.value);
1923 parts.At(headerIndex).reset(nullptr);
1924 parts.At(payloadIndex).reset(nullptr);
1925 } break;
1926 case InputType::Invalid: {
1927 reportError("Invalid part found.");
1928 } break;
1929 }
1930 }
1933 if (oldestPossibleTimeslice != (size_t)-1) {
1934 info.oldestForChannel = {oldestPossibleTimeslice};
1935 auto& context = ref.get<DataProcessorContext>();
1936 context.domainInfoUpdatedCallback(*context.registry, oldestPossibleTimeslice, info.id);
1937 ref.get<CallbackService>().call<CallbackService::Id::DomainInfoUpdated>((ServiceRegistryRef)*context.registry, (size_t)oldestPossibleTimeslice, (ChannelIndex)info.id);
1938 state.lastActiveDataProcessor = &context;
1939 }
1940 auto it = std::remove_if(parts.fParts.begin(), parts.fParts.end(), [](auto& msg) -> bool { return msg.get() == nullptr; });
1941 parts.fParts.erase(it, parts.end());
1942 if (parts.fParts.size()) {
1943 LOG(debug) << parts.fParts.size() << " messages backpressured";
1944 }
1945 };
1946
1947 // Second part. This is the actual outer loop we want to obtain, with
1948 // implementation details which can be read. Notice how most of the state
1949 // is actually hidden. For example we do not expose what "input" is. This
1950 // will allow us to keep the same toplevel logic even if the actual meaning
1951 // of input is changed (for example we might move away from multipart
1952 // messages). Notice also that we need to act differently depending on the
1953 // actual CompletionOp we want to perform. In particular forwarding inputs
1954 // also gets rid of them from the cache.
1955 auto inputTypes = getInputTypes();
1956 if (bool(inputTypes) == false) {
1957 reportError("Parts should come in pairs. Dropping them.");
1958 return;
1959 }
1960 handleValidMessages(*inputTypes);
1961 return;
1962}
1963
1964namespace
1965{
1966struct InputLatency {
1967 uint64_t minLatency = std::numeric_limits<uint64_t>::max();
1968 uint64_t maxLatency = std::numeric_limits<uint64_t>::min();
1969};
1970
1971auto calculateInputRecordLatency(InputRecord const& record, uint64_t currentTime) -> InputLatency
1972{
1973 InputLatency result;
1974
1975 for (auto& item : record) {
1976 auto* header = o2::header::get<DataProcessingHeader*>(item.header);
1977 if (header == nullptr) {
1978 continue;
1979 }
1980 int64_t partLatency = (0x7fffffffffffffff & currentTime) - (0x7fffffffffffffff & header->creation);
1981 if (partLatency < 0) {
1982 partLatency = 0;
1983 }
1984 result.minLatency = std::min(result.minLatency, (uint64_t)partLatency);
1985 result.maxLatency = std::max(result.maxLatency, (uint64_t)partLatency);
1986 }
1987 return result;
1988};
1989
1990auto calculateTotalInputRecordSize(InputRecord const& record) -> int
1991{
1992 size_t totalInputSize = 0;
1993 for (auto& item : record) {
1994 auto* header = o2::header::get<DataHeader*>(item.header);
1995 if (header == nullptr) {
1996 continue;
1997 }
1998 totalInputSize += header->payloadSize;
1999 }
2000 return totalInputSize;
2001};
2002
2003template <typename T>
2004void update_maximum(std::atomic<T>& maximum_value, T const& value) noexcept
2005{
2006 T prev_value = maximum_value;
2007 while (prev_value < value &&
2008 !maximum_value.compare_exchange_weak(prev_value, value)) {
2009 }
2010}
2011} // namespace
2012
2013bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::vector<DataRelayer::RecordAction>& completed)
2014{
2015 auto& context = ref.get<DataProcessorContext>();
2016 LOGP(debug, "DataProcessingDevice::tryDispatchComputation");
2017 // This is the actual hidden state for the outer loop. In case we decide we
2018 // want to support multithreaded dispatching of operations, I can simply
2019 // move these to some thread local store and the rest of the lambdas
2020 // should work just fine.
2021 std::vector<MessageSet> currentSetOfInputs;
2022
2023 //
2024 auto getInputSpan = [ref, &currentSetOfInputs](TimesliceSlot slot, bool consume = true) {
2025 auto& relayer = ref.get<DataRelayer>();
2026 if (consume) {
2027 currentSetOfInputs = relayer.consumeAllInputsForTimeslice(slot);
2028 } else {
2029 currentSetOfInputs = relayer.consumeExistingInputsForTimeslice(slot);
2030 }
2031 auto getter = [&currentSetOfInputs](size_t i, size_t partindex) -> DataRef {
2032 if (currentSetOfInputs[i].getNumberOfPairs() > partindex) {
2033 const char* headerptr = nullptr;
2034 const char* payloadptr = nullptr;
2035 size_t payloadSize = 0;
2036 // - each input can have multiple parts
2037 // - "part" denotes a sequence of messages belonging together, the first message of the
2038 // sequence is the header message
2039 // - each part has one or more payload messages
2040 // - InputRecord provides all payloads as header-payload pair
2041 auto const& headerMsg = currentSetOfInputs[i].associatedHeader(partindex);
2042 auto const& payloadMsg = currentSetOfInputs[i].associatedPayload(partindex);
2043 headerptr = static_cast<char const*>(headerMsg->GetData());
2044 payloadptr = payloadMsg ? static_cast<char const*>(payloadMsg->GetData()) : nullptr;
2045 payloadSize = payloadMsg ? payloadMsg->GetSize() : 0;
2046 return DataRef{nullptr, headerptr, payloadptr, payloadSize};
2047 }
2048 return DataRef{};
2049 };
2050 auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
2051 return currentSetOfInputs[i].getNumberOfPairs();
2052 };
2053 auto refCountGetter = [&currentSetOfInputs](size_t idx) -> int {
2054 auto& header = static_cast<const fair::mq::shmem::Message&>(*currentSetOfInputs[idx].header(0));
2055 return header.GetRefCount();
2056 };
2057 return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.size()};
2058 };
2059
2060 auto markInputsAsDone = [ref](TimesliceSlot slot) -> void {
2061 auto& relayer = ref.get<DataRelayer>();
2063 };
2064
2065 // I need a preparation step which gets the current timeslice id and
2066 // propagates it to the various contextes (i.e. the actual entities which
2067 // create messages) because the messages need to have the timeslice id into
2068 // it.
2069 auto prepareAllocatorForCurrentTimeSlice = [ref](TimesliceSlot i) -> void {
2070 auto& relayer = ref.get<DataRelayer>();
2071 auto& timingInfo = ref.get<TimingInfo>();
2072 auto timeslice = relayer.getTimesliceForSlot(i);
2073
2074 timingInfo.timeslice = timeslice.value;
2075 timingInfo.tfCounter = relayer.getFirstTFCounterForSlot(i);
2076 timingInfo.firstTForbit = relayer.getFirstTFOrbitForSlot(i);
2077 timingInfo.runNumber = relayer.getRunNumberForSlot(i);
2078 timingInfo.creation = relayer.getCreationTimeForSlot(i);
2079 };
2080 auto updateRunInformation = [ref](TimesliceSlot i) -> void {
2081 auto& dataProcessorContext = ref.get<DataProcessorContext>();
2082 auto& relayer = ref.get<DataRelayer>();
2083 auto& timingInfo = ref.get<TimingInfo>();
2084 auto timeslice = relayer.getTimesliceForSlot(i);
2085 // We report whether or not this timing info refers to a new Run.
2086 timingInfo.globalRunNumberChanged = !TimingInfo::timesliceIsTimer(timeslice.value) && dataProcessorContext.lastRunNumberProcessed != timingInfo.runNumber;
2087 // A switch to runNumber=0 should not appear and thus does not set globalRunNumberChanged, unless it is seen in the first processed timeslice
2088 timingInfo.globalRunNumberChanged &= (dataProcessorContext.lastRunNumberProcessed == -1 || timingInfo.runNumber != 0);
2089 // FIXME: for now there is only one stream, however we
2090 // should calculate this correctly once we finally get the
2091 // StreamContext in.
2092 timingInfo.streamRunNumberChanged = timingInfo.globalRunNumberChanged;
2093 };
2094
2095 // When processing them, timers will have to be cleaned up
2096 // to avoid double counting them.
2097 // This was actually the easiest solution we could find for
2098 // O2-646.
2099 auto cleanTimers = [&currentSetOfInputs](TimesliceSlot slot, InputRecord& record) {
2100 assert(record.size() == currentSetOfInputs.size());
2101 for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) {
2102 // assuming that for timer inputs we do have exactly one PartRef object
2103 // in the MessageSet, multiple PartRef Objects are only possible for either
2104 // split payload messages or wildcard matchers, both for data inputs
2105 DataRef input = record.getByPos(ii);
2106 if (input.spec->lifetime != Lifetime::Timer) {
2107 continue;
2108 }
2109 if (input.header == nullptr) {
2110 continue;
2111 }
2112 // This will hopefully delete the message.
2113 currentSetOfInputs[ii].clear();
2114 }
2115 };
2116
2117 // Function to cleanup record. For the moment we
2118 // simply use it to keep track of input messages
2119 // which are not needed, to display them in the GUI.
2120 auto cleanupRecord = [](InputRecord& record) {
2121 if (O2_LOG_ENABLED(parts) == false) {
2122 return;
2123 }
2124 for (size_t pi = 0, pe = record.size(); pi < pe; ++pi) {
2125 DataRef input = record.getByPos(pi);
2126 if (input.header == nullptr) {
2127 continue;
2128 }
2129 auto sih = o2::header::get<SourceInfoHeader*>(input.header);
2130 if (sih) {
2131 continue;
2132 }
2133
2134 auto dh = o2::header::get<DataHeader*>(input.header);
2135 if (!dh) {
2136 continue;
2137 }
2138 // We use the address of the first header of a split payload
2139 // to identify the interval.
2140 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, dh);
2141 O2_SIGNPOST_END(parts, pid, "parts", "Cleaning up parts associated to %p", dh);
2142
2143 // No split parts, we simply skip the payload
2144 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
2145 // this is indicating a sequence of payloads following the header
2146 // FIXME: we will probably also set the DataHeader version
2147 pi += dh->splitPayloadParts - 1;
2148 } else {
2149 size_t pi = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
2150 }
2151 }
2152 };
2153
2154 ref.get<DataRelayer>().getReadyToProcess(completed);
2155 if (completed.empty() == true) {
2156 LOGP(debug, "No computations available for dispatching.");
2157 return false;
2158 }
2159
2160 auto postUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t tStart, uint64_t tStartMilli) {
2161 auto& stats = ref.get<DataProcessingStats>();
2162 auto& states = ref.get<DataProcessingStates>();
2163 std::atomic_thread_fence(std::memory_order_release);
2164 char relayerSlotState[1024];
2165 int written = snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2166 char* buffer = relayerSlotState + written;
2167 for (size_t ai = 0; ai != record.size(); ai++) {
2168 buffer[ai] = record.isValid(ai) ? '3' : '0';
2169 }
2170 buffer[record.size()] = 0;
2171 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index),
2172 .size = (int)(record.size() + buffer - relayerSlotState),
2173 .data = relayerSlotState});
2174 uint64_t tEnd = uv_hrtime();
2175 // tEnd and tStart are in nanoseconds according to https://docs.libuv.org/en/v1.x/misc.html#c.uv_hrtime
2176 int64_t wallTimeMs = (tEnd - tStart) / 1000000;
2178 // Sum up the total wall time, in milliseconds.
2180 // The time interval is in seconds while tEnd - tStart is in nanoseconds, so we divide by 1000000 to get the fraction in ms/s.
2182 stats.updateStats({(int)ProcessingStatsId::LAST_PROCESSED_SIZE, DataProcessingStats::Op::Set, calculateTotalInputRecordSize(record)});
2183 stats.updateStats({(int)ProcessingStatsId::TOTAL_PROCESSED_SIZE, DataProcessingStats::Op::Add, calculateTotalInputRecordSize(record)});
2184 auto latency = calculateInputRecordLatency(record, tStartMilli);
2185 stats.updateStats({(int)ProcessingStatsId::LAST_MIN_LATENCY, DataProcessingStats::Op::Set, (int)latency.minLatency});
2186 stats.updateStats({(int)ProcessingStatsId::LAST_MAX_LATENCY, DataProcessingStats::Op::Set, (int)latency.maxLatency});
2187 static int count = 0;
2189 count++;
2190 };
2191
2192 auto preUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t) {
2193 auto& states = ref.get<DataProcessingStates>();
2194 std::atomic_thread_fence(std::memory_order_release);
2195 char relayerSlotState[1024];
2196 snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2197 char* buffer = strchr(relayerSlotState, ' ') + 1;
2198 for (size_t ai = 0; ai != record.size(); ai++) {
2199 buffer[ai] = record.isValid(ai) ? '2' : '0';
2200 }
2201 buffer[record.size()] = 0;
2202 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index), .size = (int)(record.size() + buffer - relayerSlotState), .data = relayerSlotState});
2203 };
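// Summary of the encoding shared by postUpdateStats and preUpdateStats above (derived from
// this file; the concrete values below are just an example): the published DATA_RELAYER_BASE
// state is "<pipelineLength> " followed by one digit per input, where '0' marks a missing
// input, '2' a valid input about to be processed (preUpdateStats) and '3' a valid input after
// processing (postUpdateStats). E.g. with a pipeline length of 8 and three inputs of which the
// second is missing:
//
//   preUpdateStats  publishes "8 202"
//   postUpdateStats publishes "8 303"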
2204
2205 // This is the main dispatching loop
2206 auto& state = ref.get<DeviceState>();
2207 auto& spec = ref.get<DeviceSpec const>();
2208
2209 auto& dpContext = ref.get<DataProcessorContext>();
2210 auto& streamContext = ref.get<StreamContext>();
2211 O2_SIGNPOST_ID_GENERATE(sid, device);
2212 O2_SIGNPOST_START(device, sid, "device", "Start processing ready actions");
2213
2214 auto& stats = ref.get<DataProcessingStats>();
2215 auto& relayer = ref.get<DataRelayer>();
2216 using namespace o2::framework;
2217 stats.updateStats({(int)ProcessingStatsId::PENDING_INPUTS, DataProcessingStats::Op::Set, static_cast<int64_t>(relayer.getParallelTimeslices() - completed.size())});
2218 stats.updateStats({(int)ProcessingStatsId::INCOMPLETE_INPUTS, DataProcessingStats::Op::Set, completed.empty() ? 1 : 0});
2219 switch (spec.completionPolicy.order) {
2221 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.timeslice.value < b.timeslice.value; });
2222 break;
2224 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.slot.index < b.slot.index; });
2225 break;
2227 default:
2228 break;
2229 }
2230
2231 for (auto action : completed) {
2232 O2_SIGNPOST_ID_GENERATE(aid, device);
2233 O2_SIGNPOST_START(device, aid, "device", "Processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2235 O2_SIGNPOST_END(device, aid, "device", "Waiting for more data.");
2236 continue;
2237 }
2238
2239 bool shouldConsume = action.op == CompletionPolicy::CompletionOp::Consume ||
2241 prepareAllocatorForCurrentTimeSlice(TimesliceSlot{action.slot});
2245 updateRunInformation(TimesliceSlot{action.slot});
2246 }
2247 InputSpan span = getInputSpan(action.slot, shouldConsume);
2248 auto& spec = ref.get<DeviceSpec const>();
2249 InputRecord record{spec.inputs,
2250 span,
2251 *context.registry};
2252 ProcessingContext processContext{record, ref, ref.get<DataAllocator>()};
2253 {
2254 // Notice this should be thread safe and reentrant
2255 // as it is called from many threads.
2256 streamContext.preProcessingCallbacks(processContext);
2257 dpContext.preProcessingCallbacks(processContext);
2258 }
2260 context.postDispatchingCallbacks(processContext);
2261 if (spec.forwards.empty() == false) {
2262 auto& timesliceIndex = ref.get<TimesliceIndex>();
2263 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false);
2264 O2_SIGNPOST_END(device, aid, "device", "Forwarding inputs consume: %d.", false);
2265 continue;
2266 }
2267 }
2268 // If there are no optional inputs we canForwardEarly
2269 // the messages so that parallel processing can happen.
2270 // In this case we pass true to indicate that we want to
2271 // copy the messages to the subsequent data processor.
2272 bool hasForwards = spec.forwards.empty() == false;
2274
2275 if (context.canForwardEarly && hasForwards && consumeSomething) {
2276 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwarding: %{public}s.", fmt::format("{}", action.op).c_str());
2277 auto& timesliceIndex = ref.get<TimesliceIndex>();
2278 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
2279 }
2280 markInputsAsDone(action.slot);
2281
2282 uint64_t tStart = uv_hrtime();
2283 uint64_t tStartMilli = TimingHelpers::getRealtimeSinceEpochStandalone();
2284 preUpdateStats(action, record, tStart);
2285
2286 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
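// Note on this flag (summarizing the branch below): O2_NO_CATCHALL_EXCEPTIONS is read once
// per process; when it is set to anything other than "0", only o2::framework::RuntimeErrorRef
// is intercepted and handed to errorHandling, while any other exception propagates out of the
// processing loop instead of being converted into a framework error.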
2287
2288 auto runNoCatch = [&context, ref, &processContext](DataRelayer::RecordAction& action) mutable {
2289 auto& state = ref.get<DeviceState>();
2290 auto& spec = ref.get<DeviceSpec const>();
2291 auto& streamContext = ref.get<StreamContext>();
2292 auto& dpContext = ref.get<DataProcessorContext>();
2293 auto shouldProcess = [](DataRelayer::RecordAction& action) -> bool {
2294 switch (action.op) {
2299 return true;
2300 break;
2301 default:
2302 return false;
2303 }
2304 };
2305 if (state.quitRequested == false) {
2306 {
2307 // Callbacks from services
2308 dpContext.preProcessingCallbacks(processContext);
2309 streamContext.preProcessingCallbacks(processContext);
2310 dpContext.preProcessingCallbacks(processContext);
2311 // Callbacks from users
2312 ref.get<CallbackService>().call<CallbackService::Id::PreProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2313 }
2314 O2_SIGNPOST_ID_FROM_POINTER(pcid, device, &processContext);
2315 if (context.statefulProcess && shouldProcess(action)) {
2316 // This way, usercode can use the same processing context to identify
2317 // its signposts and we can map user code to device iterations.
2318 O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
2319 (context.statefulProcess)(processContext);
2320 O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
2321 } else if (context.statelessProcess && shouldProcess(action)) {
2322 O2_SIGNPOST_START(device, pcid, "device", "Stateless process");
2323 (context.statelessProcess)(processContext);
2324 O2_SIGNPOST_END(device, pcid, "device", "Stateless process");
2325 } else if (context.statelessProcess || context.statefulProcess) {
2326 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding.");
2327 } else {
2328 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle");
2330 }
2331 if (shouldProcess(action)) {
2332 auto& timingInfo = ref.get<TimingInfo>();
2333 if (timingInfo.globalRunNumberChanged) {
2334 context.lastRunNumberProcessed = timingInfo.runNumber;
2335 }
2336 }
2337
2338 // Notify the sink we just consumed some timeframe data
2339 if (context.isSink && action.op == CompletionPolicy::CompletionOp::Consume) {
2340 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Sending dpl-summary");
2341 auto& allocator = ref.get<DataAllocator>();
2342 allocator.make<int>(OutputRef{"dpl-summary", runtime_hash(spec.name.c_str())}, 1);
2343 }
2344
2345 // Extra callback which allows a service to add extra outputs.
2346 // This is needed e.g. to ensure that injected CCDB outputs are added
2347 // before an end of stream.
2348 {
2349 ref.get<CallbackService>().call<CallbackService::Id::FinaliseOutputs>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2350 dpContext.finaliseOutputsCallbacks(processContext);
2351 streamContext.finaliseOutputsCallbacks(processContext);
2352 }
2353
2354 {
2355 ref.get<CallbackService>().call<CallbackService::Id::PostProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2356 dpContext.postProcessingCallbacks(processContext);
2357 streamContext.postProcessingCallbacks(processContext);
2358 }
2359 }
2360 };
2361
2362 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_USERCODE) != 0) {
2363 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
2364 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
2365 }
2366 if (noCatch) {
2367 try {
2368 runNoCatch(action);
2369 } catch (o2::framework::RuntimeErrorRef e) {
2370 (context.errorHandling)(e, record);
2371 }
2372 } else {
2373 try {
2374 runNoCatch(action);
2375 } catch (std::exception& ex) {
2379 auto e = runtime_error(ex.what());
2380 (context.errorHandling)(e, record);
2381 } catch (o2::framework::RuntimeErrorRef e) {
2382 (context.errorHandling)(e, record);
2383 }
2384 }
2385 if (state.severityStack.empty() == false) {
2386 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
2387 state.severityStack.pop_back();
2388 }
2389
2390 postUpdateStats(action, record, tStart, tStartMilli);
2391 // We forward inputs only when we consume them. If we simply Process them,
2392 // we keep them for the next message arriving.
2394 cleanupRecord(record);
2395 context.postDispatchingCallbacks(processContext);
2396 ref.get<CallbackService>().call<CallbackService::Id::DataConsumed>(o2::framework::ServiceRegistryRef{ref});
2397 }
2398 if ((context.canForwardEarly == false) && hasForwards && consumeSomething) {
2399 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Late forwarding");
2400 auto& timesliceIndex = ref.get<TimesliceIndex>();
2401 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false, action.op == CompletionPolicy::CompletionOp::Consume);
2402 }
2403 context.postForwardingCallbacks(processContext);
2405 cleanTimers(action.slot, record);
2406 }
2407 O2_SIGNPOST_END(device, aid, "device", "Done processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2408 }
2409 O2_SIGNPOST_END(device, sid, "device", "Start processing ready actions");
2410
2411 // We now broadcast the end of stream if it was requested
2412 if (state.streaming == StreamingState::EndOfStreaming) {
2413 LOGP(detail, "Broadcasting end of stream");
2414 for (auto& channel : spec.outputChannels) {
2416 }
2418 }
2419
2420 return true;
2421}
2422
2424{
2425 LOG(error) << msg;
2426 ServiceRegistryRef ref{mServiceRegistry};
2427 auto& stats = ref.get<DataProcessingStats>();
2429}
2430
2431std::unique_ptr<ConfigParamStore> DeviceConfigurationHelpers::getConfiguration(ServiceRegistryRef registry, const char* name, std::vector<ConfigParamSpec> const& options)
2432{
2433
2434 if (registry.active<ConfigurationInterface>()) {
2435 auto& cfg = registry.get<ConfigurationInterface>();
2436 try {
2437 cfg.getRecursive(name);
2438 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
2439 retrievers.emplace_back(std::make_unique<ConfigurationOptionsRetriever>(&cfg, name));
2440 auto configStore = std::make_unique<ConfigParamStore>(options, std::move(retrievers));
2441 configStore->preload();
2442 configStore->activate();
2443 return configStore;
2444 } catch (...) {
2445 // No overrides...
2446 }
2447 }
2448 return {nullptr};
2449}
2450
2451} // namespace o2::framework