Project
Loading...
Searching...
No Matches
DataProcessingDevice.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
13#include <atomic>
33#include "Framework/InputSpan.h"
34#if defined(__APPLE__) || defined(NDEBUG)
35#define O2_SIGNPOST_IMPLEMENTATION
36#endif
37#include "Framework/Signpost.h"
50
51#include "DecongestionService.h"
53#include "DataRelayerHelpers.h"
54#include "Headers/DataHeader.h"
56
57#include <Framework/Tracing.h>
58
59#include <fairmq/Parts.h>
60#include <fairmq/Socket.h>
61#include <fairmq/ProgOptions.h>
62#if __has_include(<fairmq/shmem/Message.h>)
63#include <fairmq/shmem/Message.h>
64#endif
65#include <Configuration/ConfigurationInterface.h>
66#include <Configuration/ConfigurationFactory.h>
67#include <Monitoring/Monitoring.h>
68#include <TMessage.h>
69#include <TClonesArray.h>
70
71#include <fmt/ostream.h>
72#include <algorithm>
73#include <vector>
74#include <numeric>
75#include <memory>
76#include <uv.h>
77#include <execinfo.h>
78#include <sstream>
79#include <boost/property_tree/json_parser.hpp>
80
81// Formatter to avoid having to rewrite the ostream operator for the enum
82namespace fmt
83{
84template <>
87} // namespace fmt
88
89// A log to use for general device logging
91// A log to use for general device logging
93// Special log to keep track of the lifetime of the parts
95// Stream which keeps track of the calibration lifetime logic
97// Special log to track the async queue behavior
99// Special log to track the forwarding requests
101// Special log to track CCDB related requests
103// Special log to track task scheduling
105
106using namespace o2::framework;
107using ConfigurationInterface = o2::configuration::ConfigurationInterface;
109
110constexpr int DEFAULT_MAX_CHANNEL_AHEAD = 128;
111
112namespace o2::framework
113{
114
115template <>
119
123{
124 auto* state = (DeviceState*)handle->data;
125 state->loopReason |= DeviceState::TIMER_EXPIRED;
126}
127
129{
130 auto* state = (DeviceState*)s->data;
132}
133
134DeviceSpec const& getRunningDevice(RunningDeviceRef const& running, ServiceRegistryRef const& services)
135{
136 auto& devices = services.get<o2::framework::RunningWorkflowInfo const>().devices;
137 return devices[running.index];
138}
139
145
147 : mRunningDevice{running},
148 mConfigRegistry{nullptr},
149 mServiceRegistry{registry}
150{
151 GetConfig()->Subscribe<std::string>("dpl", [&registry = mServiceRegistry](const std::string& key, std::string value) {
152 if (key == "cleanup") {
154 auto& deviceState = ref.get<DeviceState>();
155 int64_t cleanupCount = deviceState.cleanupCount.load();
156 int64_t newCleanupCount = std::stoll(value);
157 if (newCleanupCount <= cleanupCount) {
158 return;
159 }
160 deviceState.cleanupCount.store(newCleanupCount);
161 for (auto& info : deviceState.inputChannelInfos) {
162 fair::mq::Parts parts;
163 while (info.channel->Receive(parts, 0)) {
164 LOGP(debug, "Dropping {} parts", parts.Size());
165 if (parts.Size() == 0) {
166 break;
167 }
168 }
169 }
170 }
171 });
172
173 std::function<void(const fair::mq::State)> stateWatcher = [this, &registry = mServiceRegistry](const fair::mq::State state) -> void {
175 auto& deviceState = ref.get<DeviceState>();
176 auto& control = ref.get<ControlService>();
177 auto& callbacks = ref.get<CallbackService>();
178 control.notifyDeviceState(fair::mq::GetStateName(state));
180
181 if (deviceState.nextFairMQState.empty() == false) {
182 auto state = deviceState.nextFairMQState.back();
183 (void)this->ChangeState(state);
184 deviceState.nextFairMQState.pop_back();
185 }
186 };
187
188 // 99 is to execute DPL callbacks last
189 this->SubscribeToStateChange("99-dpl", stateWatcher);
190
191 // One task for now.
192 mStreams.resize(1);
193 mHandles.resize(1);
194
195 ServiceRegistryRef ref{mServiceRegistry};
196
197 mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t));
198 auto& state = ref.get<DeviceState>();
199 assert(state.loop);
200 int res = uv_async_init(state.loop, mAwakeHandle, on_communication_requested);
201 mAwakeHandle->data = &state;
202 if (res < 0) {
203 LOG(error) << "Unable to initialise subscription";
204 }
205
207 SubscribeToNewTransition("dpl", [wakeHandle = mAwakeHandle](fair::mq::Transition t) {
208 int res = uv_async_send(wakeHandle);
209 if (res < 0) {
210 LOG(error) << "Unable to notify subscription";
211 }
212 LOG(debug) << "State transition requested";
213 });
214}
215
216// Callback to execute the processing. Notice how the data is
217// is a vector of DataProcessorContext so that we can index the correct
218// one with the thread id. For the moment we simply use the first one.
219void run_callback(uv_work_t* handle)
220{
221 auto* task = (TaskStreamInfo*)handle->data;
222 auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
223 // We create a new signpost interval for this specific data processor. Same id, same data processor.
224 auto& dataProcessorContext = ref.get<DataProcessorContext>();
225 O2_SIGNPOST_ID_FROM_POINTER(sid, device, &dataProcessorContext);
226 O2_SIGNPOST_START(device, sid, "run_callback", "Starting run callback on stream %d", task->id.index);
229 O2_SIGNPOST_END(device, sid, "run_callback", "Done processing data for stream %d", task->id.index);
230}
231
232// Once the processing in a thread is done, this is executed on the main thread.
233void run_completion(uv_work_t* handle, int status)
234{
235 auto* task = (TaskStreamInfo*)handle->data;
236 // Notice that the completion, while running on the main thread, still
237 // has a salt which is associated to the actual stream which was doing the computation
238 auto ref = ServiceRegistryRef{*task->registry, ServiceRegistry::globalStreamSalt(task->id.index + 1)};
239 auto& state = ref.get<DeviceState>();
240 auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
241
242 using o2::monitoring::Metric;
243 using o2::monitoring::Monitoring;
244 using o2::monitoring::tags::Key;
245 using o2::monitoring::tags::Value;
246
247 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats&)> reportConsumedOffer = [ref](ComputingQuotaOffer const& accumulatedConsumed, ComputingQuotaStats& stats) {
248 auto& dpStats = ref.get<DataProcessingStats>();
249 stats.totalConsumedBytes += accumulatedConsumed.sharedMemory;
250 // For now we give back the offer if we did not use it completely.
251 // In principle we should try to run until the offer is fully consumed.
252 stats.totalConsumedTimeslices += std::min<int64_t>(accumulatedConsumed.timeslices, 1);
253
254 dpStats.updateStats({static_cast<short>(ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedBytes});
255 dpStats.updateStats({static_cast<short>(ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED), DataProcessingStats::Op::Set, stats.totalConsumedTimeslices});
256 dpStats.processCommandQueue();
257 assert(stats.totalConsumedBytes == dpStats.metrics[(short)ProcessingStatsId::SHM_OFFER_BYTES_CONSUMED]);
258 assert(stats.totalConsumedTimeslices == dpStats.metrics[(short)ProcessingStatsId::TIMESLICE_OFFER_NUMBER_CONSUMED]);
259 };
260
261 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const&)> reportExpiredOffer = [ref](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
262 auto& dpStats = ref.get<DataProcessingStats>();
263 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
264 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
265 dpStats.updateStats({static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredTimeslices});
266 dpStats.processCommandQueue();
267 };
268
269 for (auto& consumer : state.offerConsumers) {
270 quotaEvaluator.consume(task->id.index, consumer, reportConsumedOffer);
271 }
272 state.offerConsumers.clear();
273 quotaEvaluator.handleExpired(reportExpiredOffer);
274 quotaEvaluator.dispose(task->id.index);
275 task->running = false;
276}
277
278// Context for polling
280 enum struct PollerState : char { Stopped,
282 Connected,
283 Suspended };
284 char const* name = nullptr;
285 uv_loop_t* loop = nullptr;
287 DeviceState* state = nullptr;
288 fair::mq::Socket* socket = nullptr;
290 int fd = -1;
291 bool read = true;
293};
294
295void on_socket_polled(uv_poll_t* poller, int status, int events)
296{
297 auto* context = (PollerContext*)poller->data;
298 assert(context);
299 O2_SIGNPOST_ID_FROM_POINTER(sid, sockets, poller);
300 context->state->loopReason |= DeviceState::DATA_SOCKET_POLLED;
301 switch (events) {
302 case UV_READABLE: {
303 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
304 context->state->loopReason |= DeviceState::DATA_INCOMING;
305 } break;
306 case UV_WRITABLE: {
307 O2_SIGNPOST_END(sockets, sid, "socket_state", "Socket connected for channel %{public}s", context->name);
308 if (context->read) {
309 O2_SIGNPOST_START(sockets, sid, "socket_state", "Socket connected for read in context %{public}s", context->name);
310 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
311 context->state->loopReason |= DeviceState::DATA_CONNECTED;
312 } else {
313 O2_SIGNPOST_START(sockets, sid, "socket_state", "Socket connected for write for channel %{public}s", context->name);
314 context->state->loopReason |= DeviceState::DATA_OUTGOING;
315 // If the socket is writable, fairmq will handle the rest, so we can stop polling and
316 // just wait for the disconnect.
317 uv_poll_start(poller, UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
318 }
319 context->pollerState = PollerContext::PollerState::Connected;
320 } break;
321 case UV_DISCONNECT: {
322 O2_SIGNPOST_END(sockets, sid, "socket_state", "Socket disconnected in context %{public}s", context->name);
323 } break;
324 case UV_PRIORITIZED: {
325 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Socket prioritized for context %{public}s", context->name);
326 } break;
327 }
328 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
329}
330
331void on_out_of_band_polled(uv_poll_t* poller, int status, int events)
332{
333 O2_SIGNPOST_ID_FROM_POINTER(sid, sockets, poller);
334 auto* context = (PollerContext*)poller->data;
335 context->state->loopReason |= DeviceState::OOB_ACTIVITY;
336 if (status < 0) {
337 LOGP(fatal, "Error while polling {}: {}", context->name, status);
338 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
339 }
340 switch (events) {
341 case UV_READABLE: {
342 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
343 context->state->loopReason |= DeviceState::DATA_INCOMING;
344 assert(context->channelInfo);
345 context->channelInfo->readPolled = true;
346 } break;
347 case UV_WRITABLE: {
348 O2_SIGNPOST_END(sockets, sid, "socket_state", "OOB socket connected for channel %{public}s", context->name);
349 if (context->read) {
350 O2_SIGNPOST_START(sockets, sid, "socket_state", "OOB socket connected for read in context %{public}s", context->name);
351 uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_out_of_band_polled);
352 } else {
353 O2_SIGNPOST_START(sockets, sid, "socket_state", "OOB socket connected for write for channel %{public}s", context->name);
354 context->state->loopReason |= DeviceState::DATA_OUTGOING;
355 }
356 } break;
357 case UV_DISCONNECT: {
358 O2_SIGNPOST_END(sockets, sid, "socket_state", "OOB socket disconnected in context %{public}s", context->name);
359 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
360 } break;
361 case UV_PRIORITIZED: {
362 O2_SIGNPOST_EVENT_EMIT(sockets, sid, "socket_state", "OOB socket prioritized for context %{public}s", context->name);
363 } break;
364 }
365 // We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
366}
367
376{
377 auto ref = ServiceRegistryRef{mServiceRegistry};
378 auto& context = ref.get<DataProcessorContext>();
379 auto& spec = getRunningDevice(mRunningDevice, ref);
380
381 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
382 O2_SIGNPOST_START(device, cid, "Init", "Entering Init callback.");
383 context.statelessProcess = spec.algorithm.onProcess;
384 context.statefulProcess = nullptr;
385 context.error = spec.algorithm.onError;
386 context.initError = spec.algorithm.onInitError;
387
388 auto configStore = DeviceConfigurationHelpers::getConfiguration(mServiceRegistry, spec.name.c_str(), spec.options);
389 if (configStore == nullptr) {
390 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
391 retrievers.emplace_back(std::make_unique<FairOptionsRetriever>(GetConfig()));
392 configStore = std::make_unique<ConfigParamStore>(spec.options, std::move(retrievers));
393 configStore->preload();
394 configStore->activate();
395 }
396
397 using boost::property_tree::ptree;
398
400 for (auto& entry : configStore->store()) {
401 std::stringstream ss;
402 std::string str;
403 if (entry.second.empty() == false) {
404 boost::property_tree::json_parser::write_json(ss, entry.second, false);
405 str = ss.str();
406 str.pop_back(); // remove EoL
407 } else {
408 str = entry.second.get_value<std::string>();
409 }
410 std::string configString = fmt::format("[CONFIG] {}={} 1 {}", entry.first, str, configStore->provenance(entry.first.c_str())).c_str();
411 mServiceRegistry.get<DriverClient>(ServiceRegistry::globalDeviceSalt()).tell(configString.c_str());
412 }
413
414 mConfigRegistry = std::make_unique<ConfigParamRegistry>(std::move(configStore));
415
416 // Setup the error handlers for init
417 if (context.initError) {
418 context.initErrorHandling = [&errorCallback = context.initError,
419 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
423 auto& context = ref.get<DataProcessorContext>();
424 auto& err = error_from_ref(e);
425 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
426 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Invoking errorCallback.", err.what);
427 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
428 auto& stats = ref.get<DataProcessingStats>();
430 InitErrorContext errorContext{ref, e};
431 errorCallback(errorContext);
432 };
433 } else {
434 context.initErrorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e) {
435 auto& err = error_from_ref(e);
439 auto& context = ref.get<DataProcessorContext>();
440 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
441 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Init", "Exception caught while in Init: %{public}s. Exiting with 1.", err.what);
442 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
443 auto& stats = ref.get<DataProcessingStats>();
445 exit(1);
446 };
447 }
448
449 context.expirationHandlers.clear();
450 context.init = spec.algorithm.onInit;
451 if (context.init) {
452 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
453 InitContext initContext{*mConfigRegistry, mServiceRegistry};
454
455 if (noCatch) {
456 try {
457 context.statefulProcess = context.init(initContext);
459 if (context.initErrorHandling) {
460 (context.initErrorHandling)(e);
461 }
462 }
463 } else {
464 try {
465 context.statefulProcess = context.init(initContext);
466 } catch (std::exception& ex) {
470 auto e = runtime_error(ex.what());
471 (context.initErrorHandling)(e);
473 (context.initErrorHandling)(e);
474 }
475 }
476 }
477 auto& state = ref.get<DeviceState>();
478 state.inputChannelInfos.resize(spec.inputChannels.size());
482 int validChannelId = 0;
483 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
484 auto& name = spec.inputChannels[ci].name;
485 if (name.find(spec.channelPrefix + "from_internal-dpl-clock") == 0) {
486 state.inputChannelInfos[ci].state = InputChannelState::Pull;
487 state.inputChannelInfos[ci].id = {ChannelIndex::INVALID};
488 validChannelId++;
489 } else {
490 state.inputChannelInfos[ci].id = {validChannelId++};
491 }
492 }
493
494 // Invoke the callback policy for this device.
495 if (spec.callbacksPolicy.policy != nullptr) {
496 InitContext initContext{*mConfigRegistry, mServiceRegistry};
497 spec.callbacksPolicy.policy(mServiceRegistry.get<CallbackService>(ServiceRegistry::globalDeviceSalt()), initContext);
498 }
499
500 // Services which are stream should be initialised now
501 auto* options = GetConfig();
502 for (size_t si = 0; si < mStreams.size(); ++si) {
504 mServiceRegistry.lateBindStreamServices(state, *options, streamSalt);
505 }
506 O2_SIGNPOST_END(device, cid, "Init", "Exiting Init callback.");
507}
508
509void on_signal_callback(uv_signal_t* handle, int signum)
510{
511 O2_SIGNPOST_ID_FROM_POINTER(sid, device, handle);
512 O2_SIGNPOST_START(device, sid, "signal_state", "Signal %d received.", signum);
513
514 auto* registry = (ServiceRegistry*)handle->data;
515 if (!registry) {
516 O2_SIGNPOST_END(device, sid, "signal_state", "No registry active. Ignoring signal.");
517 return;
518 }
519 ServiceRegistryRef ref{*registry};
520 auto& state = ref.get<DeviceState>();
521 auto& quotaEvaluator = ref.get<ComputingQuotaEvaluator>();
522 auto& stats = ref.get<DataProcessingStats>();
524 size_t ri = 0;
525 while (ri != quotaEvaluator.mOffers.size()) {
526 auto& offer = quotaEvaluator.mOffers[ri];
527 // We were already offered some sharedMemory, so we
528 // do not consider the offer.
529 // FIXME: in principle this should account for memory
530 // available and being offered, however we
531 // want to get out of the woods for now.
532 if (offer.valid && offer.sharedMemory != 0) {
533 O2_SIGNPOST_END(device, sid, "signal_state", "Memory already offered.");
534 return;
535 }
536 ri++;
537 }
538 // Find the first empty offer and have 1GB of shared memory there
539 for (auto& offer : quotaEvaluator.mOffers) {
540 if (offer.valid == false) {
541 offer.cpu = 0;
542 offer.memory = 0;
543 offer.sharedMemory = 1000000000;
544 offer.valid = true;
545 offer.user = -1;
546 break;
547 }
548 }
550 O2_SIGNPOST_END(device, sid, "signal_state", "Done processing signals.");
551}
552
553static auto toBeForwardedHeader = [](void* header) -> bool {
554 // If is now possible that the record is not complete when
555 // we forward it, because of a custom completion policy.
556 // this means that we need to skip the empty entries in the
557 // record for being forwarded.
558 if (header == nullptr) {
559 return false;
560 }
561 auto sih = o2::header::get<SourceInfoHeader*>(header);
562 if (sih) {
563 return false;
564 }
565
566 auto dih = o2::header::get<DomainInfoHeader*>(header);
567 if (dih) {
568 return false;
569 }
570
571 auto dh = o2::header::get<DataHeader*>(header);
572 if (!dh) {
573 return false;
574 }
575 auto dph = o2::header::get<DataProcessingHeader*>(header);
576 if (!dph) {
577 return false;
578 }
579 return true;
580};
581
582static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
583 FairMQDeviceProxy& proxy,
584 std::unique_ptr<fair::mq::Message>& header,
585 std::unique_ptr<fair::mq::Message>& payload,
586 size_t total,
587 bool consume) {
588 if (header.get() == nullptr) {
589 // Missing an header is not an error anymore.
590 // it simply means that we did not receive the
591 // given input, but we were asked to
592 // consume existing, so we skip it.
593 return false;
594 }
595 if (payload.get() == nullptr && consume == true) {
596 // If the payload is not there, it means we already
597 // processed it with ConsumeExisiting. Therefore we
598 // need to do something only if this is the last consume.
599 header.reset(nullptr);
600 return false;
601 }
602
603 auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
604 if (fdph == nullptr) {
605 LOG(error) << "Data is missing DataProcessingHeader";
606 return false;
607 }
608 auto fdh = o2::header::get<DataHeader*>(header->GetData());
609 if (fdh == nullptr) {
610 LOG(error) << "Data is missing DataHeader";
611 return false;
612 }
613
614 // We need to find the forward route only for the first
615 // part of a split payload. All the others will use the same.
616 // but always check if we have a sequence of multiple payloads
617 if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
618 proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
619 }
620 return cachedForwardingChoices.empty() == false;
621};
622
623struct DecongestionContext {
626};
627
628auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
629 auto& oldestTimeslice = task.user<DecongestionContext>().oldestTimeslice;
630 auto& ref = task.user<DecongestionContext>().ref;
631
632 auto& decongestion = ref.get<DecongestionService>();
633 auto& proxy = ref.get<FairMQDeviceProxy>();
634 if (oldestTimeslice.timeslice.value <= decongestion.lastTimeslice) {
635 LOG(debug) << "Not sending already sent oldest possible timeslice " << oldestTimeslice.timeslice.value;
636 return;
637 }
638 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
639 auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
640 auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
641 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
642 // TODO: this we could cache in the proxy at the bind moment.
643 if (info.channelType != ChannelAccountingType::DPL) {
644 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Skipping channel %{public}s because it's not a DPL channel",
645 info.name.c_str());
646
647 continue;
648 }
649 if (DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestTimeslice.timeslice.value)) {
650 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputsCallback", "Forwarding to channel %{public}s oldest possible timeslice %zu, prio 20",
651 info.name.c_str(), oldestTimeslice.timeslice.value);
652 }
653 }
654};
655
656// This is how we do the forwarding, i.e. we push
657// the inputs which are shared between this device and others
658// to the next one in the daisy chain.
659// FIXME: do it in a smarter way than O(N^2)
660static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
661 TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
662 auto& proxy = registry.get<FairMQDeviceProxy>();
663 // we collect all messages per forward in a map and send them together
664 std::vector<fair::mq::Parts> forwardedParts;
665 forwardedParts.resize(proxy.getNumForwards());
666 std::vector<ChannelIndex> cachedForwardingChoices{};
667 O2_SIGNPOST_ID_GENERATE(sid, forwarding);
668 O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
669 slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
670
671 for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
672 auto& messageSet = currentSetOfInputs[ii];
673 // In case the messageSet is empty, there is nothing to be done.
674 if (messageSet.size() == 0) {
675 continue;
676 }
677 if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
678 continue;
679 }
680 cachedForwardingChoices.clear();
681
682 for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
683 auto& messageSet = currentSetOfInputs[ii];
684 auto& header = messageSet.header(pi);
685 auto& payload = messageSet.payload(pi);
686 auto total = messageSet.getNumberOfPayloads(pi);
687
688 if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
689 continue;
690 }
691
692 // In case of more than one forward route, we need to copy the message.
693 // This will eventually use the same mamory if running with the same backend.
694 if (cachedForwardingChoices.size() > 1) {
695 copy = true;
696 }
697 auto* dh = o2::header::get<DataHeader*>(header->GetData());
698 auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
699
700 if (copy) {
701 for (auto& cachedForwardingChoice : cachedForwardingChoices) {
702 auto&& newHeader = header->GetTransport()->CreateMessage();
703 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
704 fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
705 newHeader->Copy(*header);
706 forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
707
708 for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
709 auto&& newPayload = header->GetTransport()->CreateMessage();
710 newPayload->Copy(*messageSet.payload(pi, payloadIndex));
711 forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
712 }
713 }
714 } else {
715 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
716 fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
717 forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
718 for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
719 forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
720 }
721 }
722 }
723 }
724 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %zu messages", forwardedParts.size());
725 for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
726 if (forwardedParts[fi].Size() == 0) {
727 continue;
728 }
729 ForwardChannelInfo info = proxy.getForwardChannelInfo(ChannelIndex{fi});
730 auto& parts = forwardedParts[fi];
731 if (info.policy == nullptr) {
732 O2_SIGNPOST_EVENT_EMIT_ERROR(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d has no policy.", info.name.c_str(), fi);
733 continue;
734 }
735 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
736 info.policy->forward(parts, ChannelIndex{fi}, registry);
737 }
738
739 auto& asyncQueue = registry.get<AsyncQueue>();
740 auto& decongestion = registry.get<DecongestionService>();
741 O2_SIGNPOST_ID_GENERATE(aid, async_queue);
742 O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
743 AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate}
744 .user<DecongestionContext>({.ref = registry, .oldestTimeslice = oldestTimeslice}));
745 O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
746};
747
748extern volatile int region_read_global_dummy_variable;
750
752void handleRegionCallbacks(ServiceRegistryRef registry, std::vector<fair::mq::RegionInfo>& infos)
753{
754 if (infos.empty() == false) {
755 std::vector<fair::mq::RegionInfo> toBeNotified;
756 toBeNotified.swap(infos); // avoid any MT issue.
757 static bool dummyRead = getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS") && atoi(getenv("DPL_DEBUG_MAP_ALL_SHM_REGIONS"));
758 for (auto const& info : toBeNotified) {
759 if (dummyRead) {
760 for (size_t i = 0; i < info.size / sizeof(region_read_global_dummy_variable); i += 4096 / sizeof(region_read_global_dummy_variable)) {
761 region_read_global_dummy_variable = ((int*)info.ptr)[i];
762 }
763 }
764 registry.get<CallbackService>().call<CallbackService::Id::RegionInfoCallback>(info);
765 }
766 }
767}
768
769namespace
770{
772{
773 auto* state = (DeviceState*)handle->data;
775}
776} // namespace
777
778void DataProcessingDevice::initPollers()
779{
780 auto ref = ServiceRegistryRef{mServiceRegistry};
781 auto& deviceContext = ref.get<DeviceContext>();
782 auto& context = ref.get<DataProcessorContext>();
783 auto& spec = ref.get<DeviceSpec const>();
784 auto& state = ref.get<DeviceState>();
785 // We add a timer only in case a channel poller is not there.
786 if ((context.statefulProcess != nullptr) || (context.statelessProcess != nullptr)) {
787 for (auto& [channelName, channel] : GetChannels()) {
788 InputChannelInfo* channelInfo;
789 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
790 auto& channelSpec = spec.inputChannels[ci];
791 channelInfo = &state.inputChannelInfos[ci];
792 if (channelSpec.name != channelName) {
793 continue;
794 }
795 channelInfo->channel = &this->GetChannel(channelName, 0);
796 break;
797 }
798 if ((channelName.rfind("from_internal-dpl", 0) == 0) &&
799 (channelName.rfind("from_internal-dpl-aod", 0) != 0) &&
800 (channelName.rfind("from_internal-dpl-ccdb-backend", 0) != 0) &&
801 (channelName.rfind("from_internal-dpl-injected", 0)) != 0) {
802 LOGP(detail, "{} is an internal channel. Skipping as no input will come from there.", channelName);
803 continue;
804 }
805 // We only watch receiving sockets.
806 if (channelName.rfind("from_" + spec.name + "_", 0) == 0) {
807 LOGP(detail, "{} is to send data. Not polling.", channelName);
808 continue;
809 }
810
811 if (channelName.rfind("from_", 0) != 0) {
812 LOGP(detail, "{} is not a DPL socket. Not polling.", channelName);
813 continue;
814 }
815
816 // We assume there is always a ZeroMQ socket behind.
817 int zmq_fd = 0;
818 size_t zmq_fd_len = sizeof(zmq_fd);
819 // FIXME: I should probably save those somewhere... ;-)
820 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
821 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
822 if (zmq_fd == 0) {
823 LOG(error) << "Cannot get file descriptor for channel." << channelName;
824 continue;
825 }
826 LOGP(detail, "Polling socket for {}", channelName);
827 auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
828 pCtx->name = strdup(channelName.c_str());
829 pCtx->loop = state.loop;
830 pCtx->device = this;
831 pCtx->state = &state;
832 pCtx->fd = zmq_fd;
833 assert(channelInfo != nullptr);
834 pCtx->channelInfo = channelInfo;
835 pCtx->socket = &channel[0].GetSocket();
836 pCtx->read = true;
837 poller->data = pCtx;
838 uv_poll_init(state.loop, poller, zmq_fd);
839 if (channelName.rfind("from_", 0) != 0) {
840 LOGP(detail, "{} is an out of band channel.", channelName);
841 state.activeOutOfBandPollers.push_back(poller);
842 } else {
843 channelInfo->pollerIndex = state.activeInputPollers.size();
844 state.activeInputPollers.push_back(poller);
845 }
846 }
847 // In case we do not have any input channel and we do not have
848 // any timers or signal watchers we still wake up whenever we can send data to downstream
849 // devices to allow for enumerations.
850 if (state.activeInputPollers.empty() &&
851 state.activeOutOfBandPollers.empty() &&
852 state.activeTimers.empty() &&
853 state.activeSignals.empty()) {
854 // FIXME: this is to make sure we do not reset the output timer
855 // for readout proxies or similar. In principle this should go once
856 // we move to OutOfBand InputSpec.
857 if (state.inputChannelInfos.empty()) {
858 LOGP(detail, "No input channels. Setting exit transition timeout to 0.");
859 deviceContext.exitTransitionTimeout = 0;
860 }
861 for (auto& [channelName, channel] : GetChannels()) {
862 if (channelName.rfind(spec.channelPrefix + "from_internal-dpl", 0) == 0) {
863 LOGP(detail, "{} is an internal channel. Not polling.", channelName);
864 continue;
865 }
866 if (channelName.rfind(spec.channelPrefix + "from_" + spec.name + "_", 0) == 0) {
867 LOGP(detail, "{} is an out of band channel. Not polling for output.", channelName);
868 continue;
869 }
870 // We assume there is always a ZeroMQ socket behind.
871 int zmq_fd = 0;
872 size_t zmq_fd_len = sizeof(zmq_fd);
873 // FIXME: I should probably save those somewhere... ;-)
874 auto* poller = (uv_poll_t*)malloc(sizeof(uv_poll_t));
875 channel[0].GetSocket().GetOption("fd", &zmq_fd, &zmq_fd_len);
876 if (zmq_fd == 0) {
877 LOGP(error, "Cannot get file descriptor for channel {}", channelName);
878 continue;
879 }
880 LOG(detail) << "Polling socket for " << channel[0].GetName();
881 // FIXME: leak
882 auto* pCtx = (PollerContext*)malloc(sizeof(PollerContext));
883 pCtx->name = strdup(channelName.c_str());
884 pCtx->loop = state.loop;
885 pCtx->device = this;
886 pCtx->state = &state;
887 pCtx->fd = zmq_fd;
888 pCtx->read = false;
889 poller->data = pCtx;
890 uv_poll_init(state.loop, poller, zmq_fd);
891 state.activeOutputPollers.push_back(poller);
892 }
893 }
894 } else {
895 LOGP(detail, "This is a fake device so we exit after the first iteration.");
896 deviceContext.exitTransitionTimeout = 0;
897 // This is a fake device, so we can request to exit immediately
898 ServiceRegistryRef ref{mServiceRegistry};
899 ref.get<ControlService>().readyToQuit(QuitRequest::Me);
900 // A two second timer to stop internal devices which do not want to
901 auto* timer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
902 uv_timer_init(state.loop, timer);
903 timer->data = &state;
904 uv_update_time(state.loop);
905 uv_timer_start(timer, on_idle_timer, 2000, 2000);
906 state.activeTimers.push_back(timer);
907 }
908}
909
910void DataProcessingDevice::startPollers()
911{
912 auto ref = ServiceRegistryRef{mServiceRegistry};
913 auto& deviceContext = ref.get<DeviceContext>();
914 auto& state = ref.get<DeviceState>();
915
916 for (auto* poller : state.activeInputPollers) {
917 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
918 O2_SIGNPOST_START(device, sid, "socket_state", "Input socket waiting for connection.");
919 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
920 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
921 }
922 for (auto& poller : state.activeOutOfBandPollers) {
923 uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
924 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
925 }
926 for (auto* poller : state.activeOutputPollers) {
927 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
928 O2_SIGNPOST_START(device, sid, "socket_state", "Output socket waiting for connection.");
929 uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
930 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
931 }
932
933 deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
934 deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
935 uv_timer_init(state.loop, deviceContext.gracePeriodTimer);
936
937 deviceContext.dataProcessingGracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
938 deviceContext.dataProcessingGracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
939 uv_timer_init(state.loop, deviceContext.dataProcessingGracePeriodTimer);
940}
941
942void DataProcessingDevice::stopPollers()
943{
944 auto ref = ServiceRegistryRef{mServiceRegistry};
945 auto& deviceContext = ref.get<DeviceContext>();
946 auto& state = ref.get<DeviceState>();
947 LOGP(detail, "Stopping {} input pollers", state.activeInputPollers.size());
948 for (auto* poller : state.activeInputPollers) {
949 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
950 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
951 uv_poll_stop(poller);
952 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
953 }
954 LOGP(detail, "Stopping {} out of band pollers", state.activeOutOfBandPollers.size());
955 for (auto* poller : state.activeOutOfBandPollers) {
956 uv_poll_stop(poller);
957 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
958 }
959 LOGP(detail, "Stopping {} output pollers", state.activeOutOfBandPollers.size());
960 for (auto* poller : state.activeOutputPollers) {
961 O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
962 O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
963 uv_poll_stop(poller);
964 ((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
965 }
966
967 uv_timer_stop(deviceContext.gracePeriodTimer);
968 delete (ServiceRegistryRef*)deviceContext.gracePeriodTimer->data;
969 free(deviceContext.gracePeriodTimer);
970 deviceContext.gracePeriodTimer = nullptr;
971
972 uv_timer_stop(deviceContext.dataProcessingGracePeriodTimer);
973 delete (ServiceRegistryRef*)deviceContext.dataProcessingGracePeriodTimer->data;
974 free(deviceContext.dataProcessingGracePeriodTimer);
975 deviceContext.dataProcessingGracePeriodTimer = nullptr;
976}
977
979{
// InitTask: one-time preparation of the device before it starts running.
// Visible steps, in order:
//  1. build an ExpirationHandler for every distinct input route that has a configurator;
//  2. create the async handle used to wake up the main loop (only if not present yet);
//  3. read expected-region-callbacks, exit-transition-timeout and data-processing-timeout
//     from the configuration (std::stoi throws if a value is not an integer);
//  4. subscribe to memory region events (fair::mq::RegionInfo) on every channel transport;
//  5. install the SIGUSR1 handler (only if not present yet) and create the pollers;
//  6. fill the DataProcessorContext, then run the loop until every expected region
//     callback has been received and handled.
980 auto ref = ServiceRegistryRef{mServiceRegistry};
981 auto& deviceContext = ref.get<DeviceContext>();
982 auto& context = ref.get<DataProcessorContext>();
983
984 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
985 O2_SIGNPOST_START(device, cid, "InitTask", "Entering InitTask callback.");
986 auto& spec = getRunningDevice(mRunningDevice, mServiceRegistry);
987 auto distinct = DataRelayerHelpers::createDistinctRouteIndex(spec.inputs);
988 auto& state = ref.get<DeviceState>();
// `i` is the position of the route within `distinct`. It is advanced for every
// entry, including the skipped ones, and becomes the handler's RouteIndex.
989 int i = 0;
990 for (auto& di : distinct) {
991 auto& route = spec.inputs[di];
992 if (route.configurator.has_value() == false) {
993 i++;
994 continue;
995 }
996 ExpirationHandler handler{
997 .name = route.configurator->name,
998 .routeIndex = RouteIndex{i++},
999 .lifetime = route.matcher.lifetime,
1000 .creator = route.configurator->creatorConfigurator(state, mServiceRegistry, *mConfigRegistry),
1001 .checker = route.configurator->danglingConfigurator(state, *mConfigRegistry),
1002 .handler = route.configurator->expirationConfigurator(state, *mConfigRegistry)};
1003 context.expirationHandlers.emplace_back(std::move(handler));
1004 }
1005
// The wake-up handle is only created if it does not already exist.
1006 if (state.awakeMainThread == nullptr) {
1007 state.awakeMainThread = (uv_async_t*)malloc(sizeof(uv_async_t));
1008 state.awakeMainThread->data = &state;
1009 uv_async_init(state.loop, state.awakeMainThread, on_awake_main_thread);
1010 }
1011
1012 deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue<std::string>("expected-region-callbacks"));
1013 deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue<std::string>("exit-transition-timeout"));
1014 deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue<std::string>("data-processing-timeout"));
1015
// The subscription callback only queues the event and decrements the number of
// expected callbacks under mRegionInfoMutex. The events themselves are handled
// on the main loop by handleRegionCallbacks().
// NOTE(review): presumably invoked from a transport-owned thread, hence the mutex
// and the uv_async_send() wake-up; confirm against the FairMQ contract.
1016 for (auto& channel : GetChannels()) {
1017 channel.second.at(0).Transport()->SubscribeToRegionEvents([&context = deviceContext,
1018 &registry = mServiceRegistry,
1019 &pendingRegionInfos = mPendingRegionInfos,
1020 &regionInfoMutex = mRegionInfoMutex](fair::mq::RegionInfo info) {
1021 std::lock_guard<std::mutex> lock(regionInfoMutex);
1022 LOG(detail) << ">>> Region info event" << info.event;
1023 LOG(detail) << "id: " << info.id;
1024 LOG(detail) << "ptr: " << info.ptr;
1025 LOG(detail) << "size: " << info.size;
1026 LOG(detail) << "flags: " << info.flags;
1027 // The mutex is held for the whole callback, so queuing the event and
1028 // decrementing the expected count below happen atomically.
1029 pendingRegionInfos.push_back(info);
1030 context.expectedRegionCallbacks -= 1;
1031 // We always want to handle these on the main loop,
1032 // so we wake it up.
1033 ServiceRegistryRef ref{registry};
1034 uv_async_send(ref.get<DeviceState>().awakeMainThread);
1035 });
1036 }
1037
1038 // Add a signal manager for SIGUSR1 so that we can force
1039 // an event from the outside, making sure that the event loop can
1040 // be unblocked (e.g. by a quitting DPL driver) even when there
1041 // is no data pending to be processed.
1042 if (deviceContext.sigusr1Handle == nullptr) {
1043 deviceContext.sigusr1Handle = (uv_signal_t*)malloc(sizeof(uv_signal_t));
1044 deviceContext.sigusr1Handle->data = &mServiceRegistry;
1045 uv_signal_init(state.loop, deviceContext.sigusr1Handle);
1046 uv_signal_start(deviceContext.sigusr1Handle, on_signal_callback, SIGUSR1);
1047 }
1048 // If there is any signal, we want to make sure they are active
1049 for (auto& handle : state.activeSignals) {
1050 handle->data = &state;
1051 }
1052 // When we start, we must make sure that we do listen to the signal
1053 deviceContext.sigusr1Handle->data = &mServiceRegistry;
1054
1056 DataProcessingDevice::initPollers();
1057
1058 // Whenever we InitTask, we consider as if the previous iteration
1059 // was successful, so that even if there is no timer or receiving
1060 // channel, we can still start an enumeration.
// The sentinel (DataProcessorContext*)-1 is stored only if no processor is recorded yet.
// NOTE(review): if the CAS fails because the stored value already is that -1 sentinel
// (e.g. a second InitTask before the Run loop reset it to nullptr), the error message
// below dereferences the sentinel; consider guarding against it.
1061 DataProcessorContext* initialContext = nullptr;
1062 bool idle = state.lastActiveDataProcessor.compare_exchange_strong(initialContext, (DataProcessorContext*)-1);
1063 if (!idle) {
1064 LOG(error) << "DataProcessor " << state.lastActiveDataProcessor.load()->spec->name << " was unexpectedly active";
1065 }
1066
1067 // We should be ready to run here. Therefore we copy all the
1068 // required parts in the DataProcessorContext. Eventually we should
1069 // do so on a per thread basis, with fine grained locks.
1070 // FIXME: this should not use ServiceRegistry::threadSalt, but
1071 // more a ServiceRegistry::globalDataProcessorSalt(N) where
1072 // N is the number of the multiplexed data processor.
1073 // We will get there.
// NOTE(review): the call below currently uses ServiceRegistry::globalDeviceSalt().
1074 this->fillContext(mServiceRegistry.get<DataProcessorContext>(ServiceRegistry::globalDeviceSalt()), deviceContext);
1075
1076 O2_SIGNPOST_END(device, cid, "InitTask", "Exiting InitTask callback waiting for the remaining region callbacks.");
1077
// True while region events are queued or more region callbacks are still expected.
// It is evaluated under the same mutex the subscription callback takes.
1078 auto hasPendingEvents = [&mutex = mRegionInfoMutex, &pendingRegionInfos = mPendingRegionInfos](DeviceContext& deviceContext) {
1079 std::lock_guard<std::mutex> lock(mutex);
1080 return (pendingRegionInfos.empty() == false) || deviceContext.expectedRegionCallbacks > 0;
1081 };
// NOTE(review): "registation" in the message below is a typo in a runtime string.
1082 O2_SIGNPOST_START(device, cid, "InitTask", "Waiting for registation events.");
1087 while (hasPendingEvents(deviceContext)) {
1088 // Wait for the callback to signal it is done, so that we do not busy wait.
1089 uv_run(state.loop, UV_RUN_ONCE);
1090 // Handle callbacks if any
1091 {
// NOTE(review): emitted on every wake-up, even if uv_run returned for another reason.
1092 O2_SIGNPOST_EVENT_EMIT(device, cid, "InitTask", "Memory registration event received.");
1093 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1094 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1095 }
1096 }
1097 O2_SIGNPOST_END(device, cid, "InitTask", "Done waiting for registration events.");
1098}
1099
1101{
// fillContext: derive per-processor settings on `context` from the DeviceSpec,
// the configuration and `deviceContext`:
//  - isSink: true only when rate limiting is enabled and an output is bound to "dpl-summary";
//  - balancingInputs: taken from the completion policy, but disabled for the injected
//    dummy sink unless rate limiting is enabled;
//  - errorHandling: wraps the user error callback when one is set, otherwise applies
//    processingPolicies.error;
//  - canForwardEarly: the value returned by decideEarlyForward below.
1102 context.isSink = false;
1103 // If nothing is a sink, the rate limiting simply does not trigger.
// Any non-zero integer enables rate limiting; std::stoi throws on a non-integer value.
1104 bool enableRateLimiting = std::stoi(fConfig->GetValue<std::string>("timeframes-rate-limit"));
1105
1106 auto ref = ServiceRegistryRef{mServiceRegistry};
1107 auto& spec = ref.get<DeviceSpec const>();
1108
1109 // The policy is now allowed to state the default.
1110 context.balancingInputs = spec.completionPolicy.balanceChannels;
1111 // This is needed because the internal injected dummy sink should not
1112 // try to balance inputs unless the rate limiting is requested.
1113 if (enableRateLimiting == false && spec.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) {
1114 context.balancingInputs = false;
1115 }
1116 if (enableRateLimiting) {
// NOTE(review): the loop variable shadows the outer `spec`. The range expression
// still refers to the outer DeviceSpec, but a distinct name (e.g. `output`) would be clearer.
1117 for (auto& spec : spec.outputs) {
1118 if (spec.matcher.binding.value == "dpl-summary") {
1119 context.isSink = true;
1120 break;
1121 }
1122 }
1123 }
1124
1125 context.registry = &mServiceRegistry;
// With a user-provided error callback: report the error, dump the backtrace to
// stderr and forward an ErrorContext to the callback.
1128 if (context.error != nullptr) {
1129 context.errorHandling = [&errorCallback = context.error,
1130 &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1134 auto& err = error_from_ref(e);
1135 auto& context = ref.get<DataProcessorContext>();
1136 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1137 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Invoking callback.", err.what);
1138 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1139 auto& stats = ref.get<DataProcessingStats>();
1141 ErrorContext errorContext{record, ref, e};
1142 errorCallback(errorContext);
1143 };
1144 } else {
// Without a callback: dump the backtrace, then, depending on
// processingPolicies.error, either rethrow or skip to the next timeframe (default).
1145 context.errorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
1146 auto& err = error_from_ref(e);
1150 auto& context = ref.get<DataProcessorContext>();
1151 auto& deviceContext = ref.get<DeviceContext>();
1152 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
1153 BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
1154 auto& stats = ref.get<DataProcessingStats>();
1156 switch (deviceContext.processingPolicies.error) {
1158 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Rethrowing.", err.what);
1159 throw e;
1160 default:
1161 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Skipping to next timeframe.", err.what);
1162 break;
1163 }
1164 };
1165 }
1166
// decideEarlyForward: early forwarding is only possible when something is forwarded
// and the early-forward policy is not NEVER.
// NOTE(review): the overrides inside this lambda (AOD / RAWDATA / Optional -> false,
// only-conditions -> true) write context.canForwardEarly. The lambda, however, returns
// the local `canForwardEarly`, which the visible code never modifies, and the call
// below assigns that return value to context.canForwardEarly. As far as the visible
// code shows, the overrides are therefore discarded; they probably should update the
// local instead. `this` is captured but not used in the visible lines.
1167 auto decideEarlyForward = [&context, &deviceContext, &spec, this]() -> bool {
1170 bool canForwardEarly = (spec.forwards.empty() == false) && deviceContext.processingPolicies.earlyForward != EarlyForwardPolicy::NEVER;
1171 bool onlyConditions = true;
1172 bool overriddenEarlyForward = false;
1173 for (auto& forwarded : spec.forwards) {
1174 if (forwarded.matcher.lifetime != Lifetime::Condition) {
1175 onlyConditions = false;
1176 }
1177#if !__has_include(<fairmq/shmem/Message.h>)
1178 if (strncmp(DataSpecUtils::asConcreteOrigin(forwarded.matcher).str, "AOD", 3) == 0) {
1179 context.canForwardEarly = false;
1180 overriddenEarlyForward = true;
1181 LOG(detail) << "Cannot forward early because of AOD input: " << DataSpecUtils::describe(forwarded.matcher);
1182 break;
1183 }
1184#endif
1186 context.canForwardEarly = false;
1187 overriddenEarlyForward = true;
1188 LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher);
1189 break;
1190 }
1191 if (forwarded.matcher.lifetime == Lifetime::Optional) {
1192 context.canForwardEarly = false;
1193 overriddenEarlyForward = true;
1194 LOG(detail) << "Cannot forward early because of Optional input: " << DataSpecUtils::describe(forwarded.matcher);
1195 break;
1196 }
1197 }
1198 if (!overriddenEarlyForward && onlyConditions) {
1199 context.canForwardEarly = true;
1200 LOG(detail) << "Enabling early forwarding because only conditions to be forwarded";
1201 }
1202 return canForwardEarly;
1203 };
1204 context.canForwardEarly = decideEarlyForward();
1205}
1206
1208{
// PreRun: prepare the device for a new run.
// - Clears quitRequested and marks every non-Pull input channel as Running.
// - Invokes the data processor preStart callbacks and each stream's preStart callbacks,
//   then the Start callbacks.
// - Arms the pollers and publishes device_state = 1.
// Exceptions from the preStart callbacks are logged, the "PreRun" signpost interval
// is closed, and the exception is rethrown.
1209 auto ref = ServiceRegistryRef{mServiceRegistry};
1210 auto& state = ref.get<DeviceState>();
1211
1212 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.loop);
1213 O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
1214 state.quitRequested = false;
1216 state.allowedProcessing = DeviceState::Any;
// Pull channels keep their state; every other input channel is (re)activated.
1217 for (auto& info : state.inputChannelInfos) {
1218 if (info.state != InputChannelState::Pull) {
1219 info.state = InputChannelState::Running;
1220 }
1221 }
1222
1223 // Catch callbacks which fail before we start.
1224 // Notice that when running multiple dataprocessors
1225 // we should probably allow expendable ones to fail.
1226 try {
1227 auto& dpContext = ref.get<DataProcessorContext>();
1228 dpContext.preStartCallbacks(ref);
// Stream salts are offset by one: stream i uses globalStreamSalt(i + 1).
1229 for (size_t i = 0; i < mStreams.size(); ++i) {
1230 auto streamRef = ServiceRegistryRef{mServiceRegistry, ServiceRegistry::globalStreamSalt(i + 1)};
1231 auto& context = streamRef.get<StreamContext>();
1232 context.preStartStreamCallbacks(streamRef);
1233 }
1234 } catch (std::exception& e) {
1235 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type std::exception caught in PreRun: %{public}s. Rethrowing.", e.what());
1236 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1237 throw;
1238 } catch (o2::framework::RuntimeErrorRef& e) {
1239 auto& err = error_from_ref(e);
1240 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "PreRun", "Exception of type o2::framework::RuntimeErrorRef caught in PreRun: %{public}s. Rethrowing.", err.what);
1241 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun due to exception thrown.");
1242 throw;
1243 } catch (...) {
1244 O2_SIGNPOST_END(device, cid, "PreRun", "Unknown exception being thrown. Rethrowing.");
1245 throw;
1246 }
1247
1248 ref.get<CallbackService>().call<CallbackService::Id::Start>();
1249 startPollers();
1250
1251 // Raise to 1 when we are ready to start processing
1252 using o2::monitoring::Metric;
1253 using o2::monitoring::Monitoring;
1254 using o2::monitoring::tags::Key;
1255 using o2::monitoring::tags::Value;
1256
1257 auto& monitoring = ref.get<Monitoring>();
1258 monitoring.send(Metric{(uint64_t)1, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1259 O2_SIGNPOST_END(device, cid, "PreRun", "Exiting PreRun callback.");
1260}
1261
1263{
// PostRun: counterpart of PreRun. Publishes device_state = 0, stops the pollers,
// then invokes the Stop callbacks and the data processor postStop callbacks.
1264 ServiceRegistryRef ref{mServiceRegistry};
1265 // Lower device_state to 0 now that we stop processing.
1266 using o2::monitoring::Metric;
1267 using o2::monitoring::Monitoring;
1268 using o2::monitoring::tags::Key;
1269 using o2::monitoring::tags::Value;
1270
1271 auto& monitoring = ref.get<Monitoring>();
1272 monitoring.send(Metric{(uint64_t)0, "device_state"}.addTag(Key::Subsystem, Value::DPL));
1273
1274 stopPollers();
1275 ref.get<CallbackService>().call<CallbackService::Id::Stop>();
1276 auto& dpContext = ref.get<DataProcessorContext>();
1277 dpContext.postStopCallbacks(ref);
1278}
1279
1281{
// Reset: only dispatches the Reset callbacks registered in the CallbackService.
1282 ServiceRegistryRef ref{mServiceRegistry};
1283 ref.get<CallbackService>().call<CallbackService::Id::Reset>();
1284}
1285
1287{
// Run: the device main loop. It repeats until transitionHandling becomes Expired.
// Each iteration:
//  - applies one queued FairMQ state change and services region callbacks;
//  - flushes pending driver messages and decides whether uv_run may block;
//  - prunes (and forwards) dropped messages and runs the async queue;
//  - polls the libuv loop once;
//  - tries to schedule work on a free stream if the ComputingQuotaEvaluator grants an offer.
// On exit it clears the buffered parts of every input channel and resets
// transitionHandling to NoTransition.
1288 ServiceRegistryRef ref{mServiceRegistry};
1289 auto& state = ref.get<DeviceState>();
1291 bool firstLoop = true;
1292 O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
1293 O2_SIGNPOST_START(device, lid, "device_state", "First iteration of the device loop");
1294
// Only the presence of DPL_THREADPOOL_SIZE is checked, not its value. When it is set,
// work is dispatched with uv_queue_work (see below) and UV_THREADPOOL_SIZE is forced to 1.
// NOTE(review): libuv reads UV_THREADPOOL_SIZE when its threadpool is first created, so
// this only takes effect if no work was queued earlier in the process.
1295 bool dplEnableMultithreding = getenv("DPL_THREADPOOL_SIZE") != nullptr;
1296 if (dplEnableMultithreding) {
1297 setenv("UV_THREADPOOL_SIZE", "1", 1);
1298 }
1299
1300 while (state.transitionHandling != TransitionHandlingState::Expired) {
// At most one queued state change is applied per iteration, taken from the back.
1301 if (state.nextFairMQState.empty() == false) {
1302 (void)this->ChangeState(state.nextFairMQState.back());
1303 state.nextFairMQState.pop_back();
1304 }
1305 // Notify on the main thread the new region callbacks, making sure
1306 // no callback is issued if there is something still processing.
1307 {
1308 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1309 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1310 }
1311 // This will block for the correct delay (or until we get data
1312 // on a socket). We also do not block on the first iteration
1313 // so that devices which do not have a timer can still start an
1314 // enumeration.
1315 {
// NOTE(review): this `ref` shadows the function-level `ref` (same registry).
1316 ServiceRegistryRef ref{mServiceRegistry};
1317 ref.get<DriverClient>().flushPending(mServiceRegistry);
1318 DataProcessorContext* lastActive = state.lastActiveDataProcessor.load();
1319 // Reset to nullptr unless some other DataProcessorContext completed in the meanwhile.
1320 // In such case we will take care of it at next iteration.
1321 state.lastActiveDataProcessor.compare_exchange_strong(lastActive, nullptr);
1322
1323 auto shouldNotWait = (lastActive != nullptr &&
1324 (state.streaming != StreamingState::Idle) && (state.activeSignals.empty())) ||
1326 if (firstLoop) {
1327 shouldNotWait = true;
1328 firstLoop = false;
1329 }
1330 if (lastActive != nullptr) {
1332 }
1333 if (NewStatePending()) {
1334 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "New state pending. Waiting for it to be handled.");
1335 shouldNotWait = true;
1337 }
1339 // If we are Idle, we can then consider the transition to be expired.
1340 if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
1341 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
1342 state.transitionHandling = TransitionHandlingState::Expired;
1343 }
1344 if (state.severityStack.empty() == false) {
1345 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1346 state.severityStack.pop_back();
1347 }
1348 // for (auto &info : mDeviceContext.state->inputChannelInfos) {
1349 // shouldNotWait |= info.readPolled;
1350 // }
1351 state.loopReason = DeviceState::NO_REASON;
1352 state.firedTimers.clear();
1353 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_CALLBACKS) != 0) {
1354 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1355 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1356 }
1357 // Run the asynchronous queue just before sleeping again, so that:
1358 // - we can trigger further events from the queue
1359 // - we can guarantee this is the last thing we do in the loop (
1360 // assuming no one else is adding to the queue before this point).
// onDrop is invoked by prunePending for dropped slots and forwards their inputs.
// NOTE(review): the three bare ref.get<...>() calls discard their results; presumably
// they only make sure the services exist. Confirm.
1361 auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1362 O2_SIGNPOST_START(device, lid, "run_loop", "Dropping message from slot %" PRIu64 ". Forwarding as needed.", (uint64_t)slot.index);
1363 ServiceRegistryRef ref{registry};
1364 ref.get<AsyncQueue>();
1365 ref.get<DecongestionService>();
1366 ref.get<DataRelayer>();
1367 // Get the current timeslice for the slot.
1368 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
1370 forwardInputs(registry, slot, dropped, oldestOutputInfo, false, true);
1371 };
1372 auto& relayer = ref.get<DataRelayer>();
1373 relayer.prunePending(onDrop);
1374 auto& queue = ref.get<AsyncQueue>();
1375 auto oldestPossibleTimeslice = relayer.getOldestPossibleOutput();
1376 AsyncQueueHelpers::run(queue, {oldestPossibleTimeslice.timeslice.value});
1377 if (shouldNotWait == false) {
1378 auto& dpContext = ref.get<DataProcessorContext>();
1379 dpContext.preLoopCallbacks(ref);
1380 }
// NOTE(review): "%{}s" has an empty qualifier; every other signpost format here uses
// "%{public}s", which this one most likely should use too.
1381 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. %{}s", shouldNotWait ? "Will immediately schedule a new one" : "Waiting for next event.");
1382 uv_run(state.loop, shouldNotWait ? UV_RUN_NOWAIT : UV_RUN_ONCE);
1383 O2_SIGNPOST_START(device, lid, "run_loop", "Run loop started. Loop reason %d.", state.loopReason);
1384 if ((state.loopReason & state.tracingFlags) != 0) {
1385 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
1386 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
1387 } else if (state.severityStack.empty() == false) {
1388 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
1389 state.severityStack.pop_back();
1390 }
1391 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Loop reason mask %x & %x = %x", state.loopReason, state.tracingFlags, state.loopReason & state.tracingFlags);
1392
1393 if ((state.loopReason & DeviceState::LoopReason::OOB_ACTIVITY) != 0) {
1394 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Out of band activity detected. Rescanning everything.");
1395 relayer.rescan();
1396 }
1397
1398 if (!state.pendingOffers.empty()) {
1399 O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "Pending %" PRIu64 " offers. updating the ComputingQuotaEvaluator.", (uint64_t)state.pendingOffers.size());
1400 ref.get<ComputingQuotaEvaluator>().updateOffers(state.pendingOffers, uv_now(state.loop));
1401 }
1402 }
1403
1404 // Notify on the main thread the new region callbacks, making sure
1405 // no callback is issued if there is something still processing.
1406 // Notice that we still need to perform callbacks also after
1407 // the socket is polled, because otherwise we would end up serving
1408 // the callback after the first data arrives if the system is too
1409 // fast to transition from Init to Run.
1410 {
1411 std::lock_guard<std::mutex> lock(mRegionInfoMutex);
1412 handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
1413 }
1414
1415 assert(mStreams.size() == mHandles.size());
// Look for a stream that is not running. There is no `break`, so the highest-index
// free stream wins; streamRef.index stays -1 if all streams are busy.
1417 TaskStreamRef streamRef{-1};
1418 for (size_t ti = 0; ti < mStreams.size(); ti++) {
1419 auto& taskInfo = mStreams[ti];
1420 if (taskInfo.running) {
1421 continue;
1422 }
1423 // Stream 0 is for when we run in
// NOTE(review): the comment above is truncated in the original; its intent is unknown.
1424 streamRef.index = ti;
1425 }
1426 using o2::monitoring::Metric;
1427 using o2::monitoring::Monitoring;
1428 using o2::monitoring::tags::Key;
1429 using o2::monitoring::tags::Value;
1430 // We have an empty stream, let's check if we have enough
1431 // resources for it to run something
1432 if (streamRef.index != -1) {
1433 // Synchronous execution of the callbacks (unless DPL_THREADPOOL_SIZE is set).
1434 // This will be moved into on_socket_polled once we have threading in place.
1435 uv_work_t& handle = mHandles[streamRef.index];
1436 TaskStreamInfo& stream = mStreams[streamRef.index];
1437 handle.data = &mStreams[streamRef.index];
1438
// NOTE(review): function-local static. It is initialised once, so it captures the
// mServiceRegistry of the first instance that reaches this line.
1439 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const& stats)> reportExpiredOffer = [&registry = mServiceRegistry](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
1440 ServiceRegistryRef ref{registry};
1441 auto& dpStats = ref.get<DataProcessingStats>();
1442 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCE_OFFER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredOffers});
1443 dpStats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredBytes});
1444 dpStats.updateStats({static_cast<short>(ProcessingStatsId::TIMESLICE_NUMBER_EXPIRED), DataProcessingStats::Op::Set, stats.totalExpiredTimeslices});
1445 dpStats.processCommandQueue();
1446 };
1447 auto ref = ServiceRegistryRef{mServiceRegistry};
1448
1449 // Deciding whether to run or not can be done by passing a request to
1450 // the evaluator. In this case, the request is always satisfied and
1451 // we run on whatever resource is available.
1452 auto& spec = ref.get<DeviceSpec const>();
1453 bool enough = ref.get<ComputingQuotaEvaluator>().selectOffer(streamRef.index, spec.resourcePolicy.request, uv_now(state.loop));
1454
// Scheduling counters. They are static, so they persist across iterations and calls.
// Timestamps come from uv_now() (milliseconds).
1455 struct SchedulingStats {
1456 std::atomic<size_t> lastScheduled = 0;
1457 std::atomic<size_t> numberOfUnscheduledSinceLastScheduled = 0;
1458 std::atomic<size_t> numberOfUnscheduled = 0;
1459 std::atomic<size_t> numberOfScheduled = 0;
1460 };
1461 static SchedulingStats schedulingStats;
1462 O2_SIGNPOST_ID_GENERATE(sid, scheduling);
1463 if (enough) {
1464 stream.id = streamRef;
1465 stream.running = true;
1466 stream.registry = &mServiceRegistry;
1467 schedulingStats.lastScheduled = uv_now(state.loop);
1468 schedulingStats.numberOfScheduled++;
1469 schedulingStats.numberOfUnscheduledSinceLastScheduled = 0;
1470 O2_SIGNPOST_EVENT_EMIT(scheduling, sid, "Run", "Enough resources to schedule computation on stream %d", streamRef.index);
1471 if (dplEnableMultithreding) [[unlikely]] {
1472 stream.task = &handle;
1473 uv_queue_work(state.loop, stream.task, run_callback, run_completion);
1474 } else {
1475 run_callback(&handle);
1476 run_completion(&handle, 0);
1477 }
1478 } else {
// Warn (instead of a plain event) after more than 100 consecutive misses, or when
// nothing has been scheduled for more than 30000 ms.
1479 if (schedulingStats.numberOfUnscheduledSinceLastScheduled > 100 ||
1480 (uv_now(state.loop) - schedulingStats.lastScheduled) > 30000) {
1481 O2_SIGNPOST_EVENT_EMIT_WARN(scheduling, sid, "Run",
1482 "Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu.",
1483 schedulingStats.numberOfUnscheduledSinceLastScheduled.load(),
1484 schedulingStats.lastScheduled.load());
1485 } else {
1486 O2_SIGNPOST_EVENT_EMIT(scheduling, sid, "Run",
1487 "Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu.",
1488 schedulingStats.numberOfUnscheduledSinceLastScheduled.load(),
1489 schedulingStats.lastScheduled.load());
1490 }
1491 schedulingStats.numberOfUnscheduled++;
1492 schedulingStats.numberOfUnscheduledSinceLastScheduled++;
1493 auto ref = ServiceRegistryRef{mServiceRegistry};
1494 ref.get<ComputingQuotaEvaluator>().handleExpired(reportExpiredOffer);
1495 }
1496 }
1497 }
1498
1499 O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", (int)state.transitionHandling);
1500 auto& spec = ref.get<DeviceSpec const>();
// Drop any parts still buffered on the input channels.
// NOTE(review): inputChannelInfos is indexed with the bounds of spec.inputChannels,
// which assumes it has at least as many entries. Confirm.
1502 for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
1503 auto& info = state.inputChannelInfos[ci];
1504 info.parts.fParts.clear();
1505 }
1506 state.transitionHandling = TransitionHandlingState::NoTransition;
1507}
1508
1512{
1513 auto& context = ref.get<DataProcessorContext>();
1514 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1515 O2_SIGNPOST_START(device, dpid, "do_prepare", "Starting DataProcessorContext::doPrepare.");
1516
1517 {
1518 ref.get<CallbackService>().call<CallbackService::Id::ClockTick>();
1519 }
1520 // Whether or not we had something to do.
1521
1522 // Initialise the value for context.allDone. It will possibly be updated
1523 // below if any of the channels is not done.
1524 //
1525 // Notice that fake input channels (InputChannelState::Pull) cannot possibly
1526 // expect to receive an EndOfStream signal. Thus we do not wait for these
1527 // to be completed. In the case of data source devices, as they do not have
1528 // real data input channels, they have to signal EndOfStream themselves.
1529 auto& state = ref.get<DeviceState>();
1530 auto& spec = ref.get<DeviceSpec const>();
1531 O2_SIGNPOST_ID_FROM_POINTER(cid, device, state.inputChannelInfos.data());
1532 O2_SIGNPOST_START(device, cid, "do_prepare", "Reported channel states.");
1533 context.allDone = std::any_of(state.inputChannelInfos.begin(), state.inputChannelInfos.end(), [cid](const auto& info) {
1534 if (info.channel) {
1535 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "Input channel %{public}s%{public}s has %zu parts left and is in state %d.",
1536 info.channel->GetName().c_str(), (info.id.value == ChannelIndex::INVALID ? " (non DPL)" : ""), info.parts.fParts.size(), (int)info.state);
1537 } else {
1538 O2_SIGNPOST_EVENT_EMIT(device, cid, "do_prepare", "External channel %d is in state %d.", info.id.value, (int)info.state);
1539 }
1540 return (info.parts.fParts.empty() == true && info.state != InputChannelState::Pull);
1541 });
1542 O2_SIGNPOST_END(device, cid, "do_prepare", "End report.");
1543 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Processing %zu input channels.", spec.inputChannels.size());
1546 static std::vector<int> pollOrder;
1547 pollOrder.resize(state.inputChannelInfos.size());
1548 std::iota(pollOrder.begin(), pollOrder.end(), 0);
1549 std::sort(pollOrder.begin(), pollOrder.end(), [&infos = state.inputChannelInfos](int a, int b) {
1550 return infos[a].oldestForChannel.value < infos[b].oldestForChannel.value;
1551 });
1552
1553 // Nothing to poll...
1554 if (pollOrder.empty()) {
1555 O2_SIGNPOST_END(device, dpid, "do_prepare", "Nothing to poll. Waiting for next iteration.");
1556 return;
1557 }
1558 auto currentOldest = state.inputChannelInfos[pollOrder.front()].oldestForChannel;
1559 auto currentNewest = state.inputChannelInfos[pollOrder.back()].oldestForChannel;
1560 auto delta = currentNewest.value - currentOldest.value;
1561 O2_SIGNPOST_EVENT_EMIT(device, dpid, "do_prepare", "Oldest possible timeframe range %" PRIu64 " => %" PRIu64 " delta %" PRIu64,
1562 (int64_t)currentOldest.value, (int64_t)currentNewest.value, (int64_t)delta);
1563 auto& infos = state.inputChannelInfos;
1564
1565 if (context.balancingInputs) {
1566 static int pipelineLength = DefaultsHelpers::pipelineLength();
1567 static uint64_t ahead = getenv("DPL_MAX_CHANNEL_AHEAD") ? std::atoll(getenv("DPL_MAX_CHANNEL_AHEAD")) : std::max(8, std::min(pipelineLength - 48, pipelineLength / 2));
1568 auto newEnd = std::remove_if(pollOrder.begin(), pollOrder.end(), [&infos, limitNew = currentOldest.value + ahead](int a) -> bool {
1569 return infos[a].oldestForChannel.value > limitNew;
1570 });
1571 for (auto it = pollOrder.begin(); it < pollOrder.end(); it++) {
1572 const auto& channelInfo = state.inputChannelInfos[*it];
1573 if (channelInfo.pollerIndex != -1) {
1574 auto& poller = state.activeInputPollers[channelInfo.pollerIndex];
1575 auto& pollerContext = *(PollerContext*)(poller->data);
1576 if (pollerContext.pollerState == PollerContext::PollerState::Connected || pollerContext.pollerState == PollerContext::PollerState::Suspended) {
1577 bool running = pollerContext.pollerState == PollerContext::PollerState::Connected;
1578 bool shouldBeRunning = it < newEnd;
1579 if (running != shouldBeRunning) {
1580 uv_poll_start(poller, shouldBeRunning ? UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED : 0, &on_socket_polled);
1581 pollerContext.pollerState = shouldBeRunning ? PollerContext::PollerState::Connected : PollerContext::PollerState::Suspended;
1582 }
1583 }
1584 }
1585 }
1586 pollOrder.erase(newEnd, pollOrder.end());
1587 }
1588 O2_SIGNPOST_END(device, dpid, "do_prepare", "%zu channels pass the channel inbalance balance check.", pollOrder.size());
1589
1590 for (auto sci : pollOrder) {
1591 auto& info = state.inputChannelInfos[sci];
1592 auto& channelSpec = spec.inputChannels[sci];
1593 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1594 O2_SIGNPOST_START(device, cid, "channels", "Processing channel %s", channelSpec.name.c_str());
1595
1596 if (info.state != InputChannelState::Completed && info.state != InputChannelState::Pull) {
1597 context.allDone = false;
1598 }
1599 if (info.state != InputChannelState::Running) {
1600 // Remember to flush data if we are not running
1601 // and there is some message pending.
1602 if (info.parts.Size()) {
1604 }
1605 O2_SIGNPOST_END(device, cid, "channels", "Flushing channel %s which is in state %d and has %zu parts still pending.",
1606 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1607 continue;
1608 }
1609 if (info.channel == nullptr) {
1610 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is nullptr and has %zu parts still pending.",
1611 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1612 continue;
1613 }
1614 // Only poll DPL channels for now.
1616 O2_SIGNPOST_END(device, cid, "channels", "Channel %s which is in state %d is not a DPL channel and has %zu parts still pending.",
1617 channelSpec.name.c_str(), (int)info.state, info.parts.Size());
1618 continue;
1619 }
1620 auto& socket = info.channel->GetSocket();
1621 // If we have pending events from a previous iteration,
1622 // we do receive in any case.
1623 // Otherwise we check if there is any pending event and skip
1624 // this channel in case there is none.
1625 if (info.hasPendingEvents == 0) {
1626 socket.Events(&info.hasPendingEvents);
1627 // If we do not read, we can continue.
1628 if ((info.hasPendingEvents & 1) == 0 && (info.parts.Size() == 0)) {
1629 O2_SIGNPOST_END(device, cid, "channels", "No pending events and no remaining parts to process for channel %{public}s", channelSpec.name.c_str());
1630 continue;
1631 }
1632 }
1633 // We can reset this, because it means we have seen at least 1
1634 // message after the UV_READABLE was raised.
1635 info.readPolled = false;
1636 // Notice that there seems to be a difference between the documentation
1637 // of zeromq and the observed behavior. The fact that ZMQ_POLLIN
1638 // is raised does not mean that a message is immediately available to
1639 // read, just that it will be available soon, so the receive can
1640 // still return -2. To avoid this we keep receiving on the socket until
1641 // we get a message. In order not to overflow the DPL queue we process
1642 // one message at the time and we keep track of wether there were more
1643 // to process.
1644 bool newMessages = false;
1645 while (true) {
1646 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Receiving loop called for channel %{public}s (%d) with oldest possible timeslice %zu",
1647 channelSpec.name.c_str(), info.id.value, info.oldestForChannel.value);
1648 if (info.parts.Size() < 64) {
1649 fair::mq::Parts parts;
1650 info.channel->Receive(parts, 0);
1651 if (parts.Size()) {
1652 O2_SIGNPOST_EVENT_EMIT(device, cid, "channels", "Received %zu parts from channel %{public}s (%d).", parts.Size(), channelSpec.name.c_str(), info.id.value);
1653 }
1654 for (auto&& part : parts) {
1655 info.parts.fParts.emplace_back(std::move(part));
1656 }
1657 newMessages |= true;
1658 }
1659
1660 if (info.parts.Size() >= 0) {
1662 // Receiving data counts as activity now, so that
1663 // We can make sure we process all the pending
1664 // messages without hanging on the uv_run.
1665 break;
1666 }
1667 }
1668 // We check once again for pending events, keeping track if this was the
1669 // case so that we can immediately repeat this loop and avoid remaining
1670 // stuck in uv_run. This is because we will not get notified on the socket
1671 // if more events are pending due to zeromq level triggered approach.
1672 socket.Events(&info.hasPendingEvents);
1673 if (info.hasPendingEvents) {
1674 info.readPolled = false;
1675 // In case there were messages, we consider it as activity
1676 if (newMessages) {
1677 state.lastActiveDataProcessor.store(&context);
1678 }
1679 }
1680 O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).",
1681 channelSpec.name.c_str(), info.id.value);
1682 }
1683}
1684
1686{
1687 auto& context = ref.get<DataProcessorContext>();
1688 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
1689 auto& state = ref.get<DeviceState>();
1690 auto& spec = ref.get<DeviceSpec const>();
1691
1692 if (state.streaming == StreamingState::Idle) {
1693 return;
1694 }
1695
1696 context.completed.clear();
1697 context.completed.reserve(16);
1698 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1699 state.lastActiveDataProcessor.store(&context);
1700 }
1701 DanglingContext danglingContext{*context.registry};
1702
1703 context.preDanglingCallbacks(danglingContext);
1704 if (state.lastActiveDataProcessor.load() == nullptr) {
1705 ref.get<CallbackService>().call<CallbackService::Id::Idle>();
1706 }
1707 auto activity = ref.get<DataRelayer>().processDanglingInputs(context.expirationHandlers, *context.registry, true);
1708 if (activity.expiredSlots > 0) {
1709 state.lastActiveDataProcessor = &context;
1710 }
1711
1712 context.completed.clear();
1713 if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
1714 state.lastActiveDataProcessor = &context;
1715 }
1716
1717 context.postDanglingCallbacks(danglingContext);
1718
1719 // If we got notified that all the sources are done, we call the EndOfStream
1720 // callback and return false. Notice that what happens next is actually
1721 // dependent on the callback, not something which is controlled by the
1722 // framework itself.
1723 if (context.allDone == true && state.streaming == StreamingState::Streaming) {
1725 state.lastActiveDataProcessor = &context;
1726 }
1727
1728 if (state.streaming == StreamingState::EndOfStreaming) {
1729 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "We are in EndOfStreaming. Flushing queues.");
1730 // We keep processing data until we are Idle.
1731 // FIXME: not sure this is the correct way to drain the queues, but
1732 // I guess we will see.
1735 auto& relayer = ref.get<DataRelayer>();
1736
1737 bool shouldProcess = DataProcessingHelpers::hasOnlyGenerated(spec) == false;
1738
1739 while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) {
1740 relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false);
1741 }
1742
1743 auto& timingInfo = ref.get<TimingInfo>();
1744 // We should keep the data generated at end of stream only for those
1745 // which are not sources.
1746 timingInfo.keepAtEndOfStream = shouldProcess;
1747 // Fill timinginfo with some reasonable values for data sent with endOfStream
1748 timingInfo.timeslice = relayer.getOldestPossibleOutput().timeslice.value;
1749 timingInfo.tfCounter = -1;
1750 timingInfo.firstTForbit = -1;
1751 // timingInfo.runNumber = ; // Not sure where to get this if not already set
1752 timingInfo.creation = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
1753 O2_SIGNPOST_EVENT_EMIT(calibration, dpid, "calibration", "TimingInfo.keepAtEndOfStream %d", timingInfo.keepAtEndOfStream);
1754
1755 EndOfStreamContext eosContext{*context.registry, ref.get<DataAllocator>()};
1756
1757 context.preEOSCallbacks(eosContext);
1758 auto& streamContext = ref.get<StreamContext>();
1759 streamContext.preEOSCallbacks(eosContext);
1760 ref.get<CallbackService>().call<CallbackService::Id::EndOfStream>(eosContext);
1761 streamContext.postEOSCallbacks(eosContext);
1762 context.postEOSCallbacks(eosContext);
1763
1764 for (auto& channel : spec.outputChannels) {
1765 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Sending end of stream to %{public}s.", channel.name.c_str());
1767 }
1768 // This is needed because the transport is deleted before the device.
1769 relayer.clear();
1771 // In case we should process, note the data processor responsible for it
1772 if (shouldProcess) {
1773 state.lastActiveDataProcessor = &context;
1774 }
1775 // On end of stream we shut down all output pollers.
1776 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1777 for (auto& poller : state.activeOutputPollers) {
1778 uv_poll_stop(poller);
1779 }
1780 return;
1781 }
1782
1783 if (state.streaming == StreamingState::Idle) {
1784 // On end of stream we shut down all output pollers.
1785 O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
1786 for (auto& poller : state.activeOutputPollers) {
1787 uv_poll_stop(poller);
1788 }
1789 }
1790
1791 return;
1792}
1793
1795{
1796 ServiceRegistryRef ref{mServiceRegistry};
1797 ref.get<DataRelayer>().clear();
1798 auto& deviceContext = ref.get<DeviceContext>();
1799 // If the signal handler is there, we should
1800 // hide the registry from it, so that we do not
1801 // end up calling the signal handler on something
1802 // which might not be there anymore.
1803 if (deviceContext.sigusr1Handle) {
1804 deviceContext.sigusr1Handle->data = nullptr;
1805 }
1806 // Makes sure we do not have a working context on
1807 // shutdown.
1808 for (auto& handle : ref.get<DeviceState>().activeSignals) {
1809 handle->data = nullptr;
1810 }
1811}
1812
1815 {
1816 }
1817};
1818
1824{
1825 using InputInfo = DataRelayer::InputInfo;
1827
1828 auto& context = ref.get<DataProcessorContext>();
1829 // This is the same id as the upper level function, so we get the events
1830 // associated with the same interval. We will simply use "handle_data" as
1831 // the category.
1832 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1833
1834 // This is how we validate inputs. I.e. we try to enforce the O2 Data model
1835 // and we do a few stats. We bind parts as a lambda captured variable, rather
1836 // than an input, because we do not want the outer loop actually be exposed
1837 // to the implementation details of the messaging layer.
1838 auto getInputTypes = [&info, &context]() -> std::optional<std::vector<InputInfo>> {
1839 O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
1840 auto ref = ServiceRegistryRef{*context.registry};
1841 auto& stats = ref.get<DataProcessingStats>();
1842 auto& state = ref.get<DeviceState>();
1843 auto& parts = info.parts;
1844 stats.updateStats({(int)ProcessingStatsId::TOTAL_INPUTS, DataProcessingStats::Op::Set, (int64_t)parts.Size()});
1845
1846 std::vector<InputInfo> results;
1847 // we can reserve the upper limit
1848 results.reserve(parts.Size() / 2);
1849 size_t nTotalPayloads = 0;
1850
1851 auto insertInputInfo = [&results, &nTotalPayloads](size_t position, size_t length, InputType type, ChannelIndex index) {
1852 results.emplace_back(position, length, type, index);
1853 if (type != InputType::Invalid && length > 1) {
1854 nTotalPayloads += length - 1;
1855 }
1856 };
1857
1858 for (size_t pi = 0; pi < parts.Size(); pi += 2) {
1859 auto* headerData = parts.At(pi)->GetData();
1860 auto sih = o2::header::get<SourceInfoHeader*>(headerData);
1861 if (sih) {
1862 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got SourceInfoHeader with state %d", (int)sih->state);
1863 info.state = sih->state;
1864 insertInputInfo(pi, 2, InputType::SourceInfo, info.id);
1865 state.lastActiveDataProcessor = &context;
1866 continue;
1867 }
1868 auto dih = o2::header::get<DomainInfoHeader*>(headerData);
1869 if (dih) {
1870 O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got DomainInfoHeader with oldestPossibleTimeslice %d", (int)dih->oldestPossibleTimeslice);
1871 insertInputInfo(pi, 2, InputType::DomainInfo, info.id);
1872 state.lastActiveDataProcessor = &context;
1873 continue;
1874 }
1875 auto dh = o2::header::get<DataHeader*>(headerData);
1876 if (!dh) {
1877 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1878 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header is not a DataHeader?");
1879 continue;
1880 }
1881 if (dh->payloadSize > parts.At(pi + 1)->GetSize()) {
1882 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1883 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader payloadSize mismatch");
1884 continue;
1885 }
1886 auto dph = o2::header::get<DataProcessingHeader*>(headerData);
1887 // We only deal with the tracking of parts if the log is enabled.
1888 // This is because in principle we should track the size of each of
1889 // the parts and sum it up. Not for now.
1890 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, headerData);
1891 O2_SIGNPOST_START(parts, pid, "parts", "Processing DataHeader %{public}-4s/%{public}-16s/%d with splitPayloadParts %d and splitPayloadIndex %d",
1892 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->splitPayloadParts, dh->splitPayloadIndex);
1893 if (!dph) {
1894 insertInputInfo(pi, 2, InputType::Invalid, info.id);
1895 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "Header stack does not contain DataProcessingHeader");
1896 continue;
1897 }
1898 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
1899 // this is indicating a sequence of payloads following the header
1900 // FIXME: we will probably also set the DataHeader version
1901 insertInputInfo(pi, dh->splitPayloadParts + 1, InputType::Data, info.id);
1902 pi += dh->splitPayloadParts - 1;
1903 } else {
1904 // We can set the type for the next splitPayloadParts
1905 // because we are guaranteed they are all the same.
1906 // If splitPayloadParts = 0, we assume that means there is only one (header, payload)
1907 // pair.
1908 size_t finalSplitPayloadIndex = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
1909 if (finalSplitPayloadIndex > parts.Size()) {
1910 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "DataHeader::splitPayloadParts invalid");
1911 insertInputInfo(pi, 0, InputType::Invalid, info.id);
1912 continue;
1913 }
1914 insertInputInfo(pi, 2, InputType::Data, info.id);
1915 for (; pi + 2 < finalSplitPayloadIndex; pi += 2) {
1916 insertInputInfo(pi + 2, 2, InputType::Data, info.id);
1917 }
1918 }
1919 }
1920 if (results.size() + nTotalPayloads != parts.Size()) {
1921 O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "handle_data", "inconsistent number of inputs extracted. %zu vs parts (%zu)", results.size() + nTotalPayloads, parts.Size());
1922 return std::nullopt;
1923 }
1924 return results;
1925 };
1926
1927 auto reportError = [ref](const char* message) {
1928 auto& stats = ref.get<DataProcessingStats>();
1930 };
1931
1932 auto handleValidMessages = [&info, ref, &reportError](std::vector<InputInfo> const& inputInfos) {
1933 auto& relayer = ref.get<DataRelayer>();
1934 auto& state = ref.get<DeviceState>();
1935 static WaitBackpressurePolicy policy;
1936 auto& parts = info.parts;
1937 // We relay execution to make sure we have a complete set of parts
1938 // available.
1939 bool hasBackpressure = false;
1940 size_t minBackpressureTimeslice = -1;
1941 bool hasData = false;
1942 size_t oldestPossibleTimeslice = -1;
1943 static std::vector<int> ordering;
1944 // Same as inputInfos but with iota.
1945 ordering.resize(inputInfos.size());
1946 std::iota(ordering.begin(), ordering.end(), 0);
1947 // stable sort orderings by type and position
1948 std::stable_sort(ordering.begin(), ordering.end(), [&inputInfos](int const& a, int const& b) {
1949 auto const& ai = inputInfos[a];
1950 auto const& bi = inputInfos[b];
1951 if (ai.type != bi.type) {
1952 return ai.type < bi.type;
1953 }
1954 return ai.position < bi.position;
1955 });
1956 for (size_t ii = 0; ii < inputInfos.size(); ++ii) {
1957 auto const& input = inputInfos[ordering[ii]];
1958 switch (input.type) {
1959 case InputType::Data: {
1960 hasData = true;
1961 auto headerIndex = input.position;
1962 auto nMessages = 0;
1963 auto nPayloadsPerHeader = 0;
1964 if (input.size > 2) {
1965 // header and multiple payload sequence
1966 nMessages = input.size;
1967 nPayloadsPerHeader = nMessages - 1;
1968 } else {
1969 // multiple header-payload pairs
1970 auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
1971 nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2;
1972 nPayloadsPerHeader = 1;
1973 ii += (nMessages / 2) - 1;
1974 }
1975 auto onDrop = [ref](TimesliceSlot slot, std::vector<MessageSet>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1976 O2_SIGNPOST_ID_GENERATE(cid, async_queue);
1977 O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "onDrop", "Dropping message from slot %zu. Forwarding as needed. Timeslice %zu",
1978 slot.index, oldestOutputInfo.timeslice.value);
1979 ref.get<AsyncQueue>();
1980 ref.get<DecongestionService>();
1981 ref.get<DataRelayer>();
1982 // Get the current timeslice for the slot.
1983 auto& variables = ref.get<TimesliceIndex>().getVariablesForSlot(slot);
1985 forwardInputs(ref, slot, dropped, oldestOutputInfo, false, true);
1986 };
1987 auto relayed = relayer.relay(parts.At(headerIndex)->GetData(),
1988 &parts.At(headerIndex),
1989 input,
1990 nMessages,
1991 nPayloadsPerHeader,
1992 onDrop);
1993 switch (relayed.type) {
1995 if (info.normalOpsNotified == true && info.backpressureNotified == false) {
1996 LOGP(alarm, "Backpressure on channel {}. Waiting.", info.channel->GetName());
1997 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
1998 monitoring.send(o2::monitoring::Metric{1, fmt::format("backpressure_{}", info.channel->GetName())});
1999 info.backpressureNotified = true;
2000 info.normalOpsNotified = false;
2001 }
2002 policy.backpressure(info);
2003 hasBackpressure = true;
2004 minBackpressureTimeslice = std::min<size_t>(minBackpressureTimeslice, relayed.timeslice.value);
2005 break;
2009 if (info.normalOpsNotified == false && info.backpressureNotified == true) {
2010 LOGP(info, "Back to normal on channel {}.", info.channel->GetName());
2011 auto& monitoring = ref.get<o2::monitoring::Monitoring>();
2012 monitoring.send(o2::monitoring::Metric{0, fmt::format("backpressure_{}", info.channel->GetName())});
2013 info.normalOpsNotified = true;
2014 info.backpressureNotified = false;
2015 }
2016 break;
2017 }
2018 } break;
2019 case InputType::SourceInfo: {
2020 LOGP(detail, "Received SourceInfo");
2021 auto& context = ref.get<DataProcessorContext>();
2022 state.lastActiveDataProcessor = &context;
2023 auto headerIndex = input.position;
2024 auto payloadIndex = input.position + 1;
2025 assert(payloadIndex < parts.Size());
2026 // FIXME: the message with the end of stream cannot contain
2027 // split parts.
2028 parts.At(headerIndex).reset(nullptr);
2029 parts.At(payloadIndex).reset(nullptr);
2030 // for (size_t i = 0; i < dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 - 1 : 1; ++i) {
2031 // parts.At(headerIndex + 1 + i).reset(nullptr);
2032 // }
2033 // pi += dh->splitPayloadParts > 0 ? dh->splitPayloadParts - 1 : 0;
2034
2035 } break;
2036 case InputType::DomainInfo: {
2039 auto& context = ref.get<DataProcessorContext>();
2040 state.lastActiveDataProcessor = &context;
2041 auto headerIndex = input.position;
2042 auto payloadIndex = input.position + 1;
2043 assert(payloadIndex < parts.Size());
2044 // FIXME: the message with the end of stream cannot contain
2045 // split parts.
2046
2047 auto dih = o2::header::get<DomainInfoHeader*>(parts.At(headerIndex)->GetData());
2048 if (hasBackpressure && dih->oldestPossibleTimeslice >= minBackpressureTimeslice) {
2049 break;
2050 }
2051 oldestPossibleTimeslice = std::min(oldestPossibleTimeslice, dih->oldestPossibleTimeslice);
2052 LOGP(debug, "Got DomainInfoHeader, new oldestPossibleTimeslice {} on channel {}", oldestPossibleTimeslice, info.id.value);
2053 parts.At(headerIndex).reset(nullptr);
2054 parts.At(payloadIndex).reset(nullptr);
2055 } break;
2056 case InputType::Invalid: {
2057 reportError("Invalid part found.");
2058 } break;
2059 }
2060 }
2063 if (oldestPossibleTimeslice != (size_t)-1) {
2064 info.oldestForChannel = {oldestPossibleTimeslice};
2065 auto& context = ref.get<DataProcessorContext>();
2066 context.domainInfoUpdatedCallback(*context.registry, oldestPossibleTimeslice, info.id);
2067 ref.get<CallbackService>().call<CallbackService::Id::DomainInfoUpdated>((ServiceRegistryRef)*context.registry, (size_t)oldestPossibleTimeslice, (ChannelIndex)info.id);
2068 state.lastActiveDataProcessor = &context;
2069 }
2070 auto it = std::remove_if(parts.fParts.begin(), parts.fParts.end(), [](auto& msg) -> bool { return msg.get() == nullptr; });
2071 parts.fParts.erase(it, parts.end());
2072 if (parts.fParts.size()) {
2073 LOG(debug) << parts.fParts.size() << " messages backpressured";
2074 }
2075 };
2076
2077 // Second part. This is the actual outer loop we want to obtain, with
2078 // implementation details which can be read. Notice how most of the state
2079 // is actually hidden. For example we do not expose what "input" is. This
2080 // will allow us to keep the same toplevel logic even if the actual meaning
2081 // of input is changed (for example we might move away from multipart
2082 // messages). Notice also that we need to act diffently depending on the
2083 // actual CompletionOp we want to perform. In particular forwarding inputs
2084 // also gets rid of them from the cache.
2085 auto inputTypes = getInputTypes();
2086 if (bool(inputTypes) == false) {
2087 reportError("Parts should come in couples. Dropping it.");
2088 return;
2089 }
2090 handleValidMessages(*inputTypes);
2091 return;
2092}
2093
2094namespace
2095{
2096struct InputLatency {
2097 uint64_t minLatency = std::numeric_limits<uint64_t>::max();
2098 uint64_t maxLatency = std::numeric_limits<uint64_t>::min();
2099};
2100
2101auto calculateInputRecordLatency(InputRecord const& record, uint64_t currentTime) -> InputLatency
2102{
2103 InputLatency result;
2104
2105 for (auto& item : record) {
2106 auto* header = o2::header::get<DataProcessingHeader*>(item.header);
2107 if (header == nullptr) {
2108 continue;
2109 }
2110 int64_t partLatency = (0x7fffffffffffffff & currentTime) - (0x7fffffffffffffff & header->creation);
2111 if (partLatency < 0) {
2112 partLatency = 0;
2113 }
2114 result.minLatency = std::min(result.minLatency, (uint64_t)partLatency);
2115 result.maxLatency = std::max(result.maxLatency, (uint64_t)partLatency);
2116 }
2117 return result;
2118};
2119
2120auto calculateTotalInputRecordSize(InputRecord const& record) -> int
2121{
2122 size_t totalInputSize = 0;
2123 for (auto& item : record) {
2124 auto* header = o2::header::get<DataHeader*>(item.header);
2125 if (header == nullptr) {
2126 continue;
2127 }
2128 totalInputSize += header->payloadSize;
2129 }
2130 return totalInputSize;
2131};
2132
2133template <typename T>
2134void update_maximum(std::atomic<T>& maximum_value, T const& value) noexcept
2135{
2136 T prev_value = maximum_value;
2137 while (prev_value < value &&
2138 !maximum_value.compare_exchange_weak(prev_value, value)) {
2139 }
2140}
2141} // namespace
2142
2143bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::vector<DataRelayer::RecordAction>& completed)
2144{
2145 auto& context = ref.get<DataProcessorContext>();
2146 LOGP(debug, "DataProcessingDevice::tryDispatchComputation");
2147 // This is the actual hidden state for the outer loop. In case we decide we
2148 // want to support multithreaded dispatching of operations, I can simply
2149 // move these to some thread local store and the rest of the lambdas
2150 // should work just fine.
2151 std::vector<MessageSet> currentSetOfInputs;
2152
2153 //
2154 auto getInputSpan = [ref, &currentSetOfInputs](TimesliceSlot slot, bool consume = true) {
2155 auto& relayer = ref.get<DataRelayer>();
2156 if (consume) {
2157 currentSetOfInputs = relayer.consumeAllInputsForTimeslice(slot);
2158 } else {
2159 currentSetOfInputs = relayer.consumeExistingInputsForTimeslice(slot);
2160 }
2161 auto getter = [&currentSetOfInputs](size_t i, size_t partindex) -> DataRef {
2162 if (currentSetOfInputs[i].getNumberOfPairs() > partindex) {
2163 const char* headerptr = nullptr;
2164 const char* payloadptr = nullptr;
2165 size_t payloadSize = 0;
2166 // - each input can have multiple parts
2167 // - "part" denotes a sequence of messages belonging together, the first message of the
2168 // sequence is the header message
2169 // - each part has one or more payload messages
2170 // - InputRecord provides all payloads as header-payload pair
2171 auto const& headerMsg = currentSetOfInputs[i].associatedHeader(partindex);
2172 auto const& payloadMsg = currentSetOfInputs[i].associatedPayload(partindex);
2173 headerptr = static_cast<char const*>(headerMsg->GetData());
2174 payloadptr = payloadMsg ? static_cast<char const*>(payloadMsg->GetData()) : nullptr;
2175 payloadSize = payloadMsg ? payloadMsg->GetSize() : 0;
2176 return DataRef{nullptr, headerptr, payloadptr, payloadSize};
2177 }
2178 return DataRef{};
2179 };
2180 auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
2181 return currentSetOfInputs[i].getNumberOfPairs();
2182 };
2183#if __has_include(<fairmq/shmem/Message.h>)
2184 auto refCountGetter = [&currentSetOfInputs](size_t idx) -> int {
2185 auto& header = static_cast<const fair::mq::shmem::Message&>(*currentSetOfInputs[idx].header(0));
2186 return header.GetRefCount();
2187 };
2188#else
2189 std::function<int(size_t)> refCountGetter = nullptr;
2190#endif
2191 return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.size()};
2192 };
2193
2194 auto markInputsAsDone = [ref](TimesliceSlot slot) -> void {
2195 auto& relayer = ref.get<DataRelayer>();
2197 };
2198
2199 // I need a preparation step which gets the current timeslice id and
2200 // propagates it to the various contextes (i.e. the actual entities which
2201 // create messages) because the messages need to have the timeslice id into
2202 // it.
2203 auto prepareAllocatorForCurrentTimeSlice = [ref](TimesliceSlot i) -> void {
2204 auto& relayer = ref.get<DataRelayer>();
2205 auto& timingInfo = ref.get<TimingInfo>();
2206 auto timeslice = relayer.getTimesliceForSlot(i);
2207
2208 timingInfo.timeslice = timeslice.value;
2209 timingInfo.tfCounter = relayer.getFirstTFCounterForSlot(i);
2210 timingInfo.firstTForbit = relayer.getFirstTFOrbitForSlot(i);
2211 timingInfo.runNumber = relayer.getRunNumberForSlot(i);
2212 timingInfo.creation = relayer.getCreationTimeForSlot(i);
2213 };
2214 auto updateRunInformation = [ref](TimesliceSlot i) -> void {
2215 auto& dataProcessorContext = ref.get<DataProcessorContext>();
2216 auto& relayer = ref.get<DataRelayer>();
2217 auto& timingInfo = ref.get<TimingInfo>();
2218 auto timeslice = relayer.getTimesliceForSlot(i);
2219 // We report wether or not this timing info refers to a new Run.
2220 timingInfo.globalRunNumberChanged = !TimingInfo::timesliceIsTimer(timeslice.value) && dataProcessorContext.lastRunNumberProcessed != timingInfo.runNumber;
2221 // A switch to runNumber=0 should not appear and thus does not set globalRunNumberChanged, unless it is seen in the first processed timeslice
2222 timingInfo.globalRunNumberChanged &= (dataProcessorContext.lastRunNumberProcessed == -1 || timingInfo.runNumber != 0);
2223 // FIXME: for now there is only one stream, however we
2224 // should calculate this correctly once we finally get the
2225 // the StreamContext in.
2226 timingInfo.streamRunNumberChanged = timingInfo.globalRunNumberChanged;
2227 };
2228
2229 // When processing them, timers will have to be cleaned up
2230 // to avoid double counting them.
2231 // This was actually the easiest solution we could find for
2232 // O2-646.
2233 auto cleanTimers = [&currentSetOfInputs](TimesliceSlot slot, InputRecord& record) {
2234 assert(record.size() == currentSetOfInputs.size());
2235 for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) {
2236 // assuming that for timer inputs we do have exactly one PartRef object
2237 // in the MessageSet, multiple PartRef Objects are only possible for either
2238 // split payload messages of wildcard matchers, both for data inputs
2239 DataRef input = record.getByPos(ii);
2240 if (input.spec->lifetime != Lifetime::Timer) {
2241 continue;
2242 }
2243 if (input.header == nullptr) {
2244 continue;
2245 }
2246 // This will hopefully delete the message.
2247 currentSetOfInputs[ii].clear();
2248 }
2249 };
2250
2251 // Function to cleanup record. For the moment we
2252 // simply use it to keep track of input messages
2253 // which are not needed, to display them in the GUI.
2254 auto cleanupRecord = [](InputRecord& record) {
2255 if (O2_LOG_ENABLED(parts) == false) {
2256 return;
2257 }
2258 for (size_t pi = 0, pe = record.size(); pi < pe; ++pi) {
2259 DataRef input = record.getByPos(pi);
2260 if (input.header == nullptr) {
2261 continue;
2262 }
2263 auto sih = o2::header::get<SourceInfoHeader*>(input.header);
2264 if (sih) {
2265 continue;
2266 }
2267
2268 auto dh = o2::header::get<DataHeader*>(input.header);
2269 if (!dh) {
2270 continue;
2271 }
2272 // We use the address of the first header of a split payload
2273 // to identify the interval.
2274 O2_SIGNPOST_ID_FROM_POINTER(pid, parts, dh);
2275 O2_SIGNPOST_END(parts, pid, "parts", "Cleaning up parts associated to %p", dh);
2276
2277 // No split parts, we simply skip the payload
2278 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
2279 // this is indicating a sequence of payloads following the header
2280 // FIXME: we will probably also set the DataHeader version
2281 pi += dh->splitPayloadParts - 1;
2282 } else {
2283 size_t pi = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
2284 }
2285 }
2286 };
2287
2288 ref.get<DataRelayer>().getReadyToProcess(completed);
2289 if (completed.empty() == true) {
2290 LOGP(debug, "No computations available for dispatching.");
2291 return false;
2292 }
2293
2294 auto postUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t tStart, uint64_t tStartMilli) {
2295 auto& stats = ref.get<DataProcessingStats>();
2296 auto& states = ref.get<DataProcessingStates>();
2297 std::atomic_thread_fence(std::memory_order_release);
2298 char relayerSlotState[1024];
2299 int written = snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2300 char* buffer = relayerSlotState + written;
2301 for (size_t ai = 0; ai != record.size(); ai++) {
2302 buffer[ai] = record.isValid(ai) ? '3' : '0';
2303 }
2304 buffer[record.size()] = 0;
2305 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index),
2306 .size = (int)(record.size() + buffer - relayerSlotState),
2307 .data = relayerSlotState});
2308 uint64_t tEnd = uv_hrtime();
2309 // tEnd and tStart are in nanoseconds according to https://docs.libuv.org/en/v1.x/misc.html#c.uv_hrtime
2310 int64_t wallTimeMs = (tEnd - tStart) / 1000000;
2312 // Sum up the total wall time, in milliseconds.
2314 // The time interval is in seconds while tEnd - tStart is in nanoseconds, so we divide by 1000000 to get the fraction in ms/s.
2316 stats.updateStats({(int)ProcessingStatsId::LAST_PROCESSED_SIZE, DataProcessingStats::Op::Set, calculateTotalInputRecordSize(record)});
2317 stats.updateStats({(int)ProcessingStatsId::TOTAL_PROCESSED_SIZE, DataProcessingStats::Op::Add, calculateTotalInputRecordSize(record)});
2318 auto latency = calculateInputRecordLatency(record, tStartMilli);
2319 stats.updateStats({(int)ProcessingStatsId::LAST_MIN_LATENCY, DataProcessingStats::Op::Set, (int)latency.minLatency});
2320 stats.updateStats({(int)ProcessingStatsId::LAST_MAX_LATENCY, DataProcessingStats::Op::Set, (int)latency.maxLatency});
2321 static int count = 0;
2323 count++;
2324 };
2325
2326 auto preUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t) {
2327 auto& states = ref.get<DataProcessingStates>();
2328 std::atomic_thread_fence(std::memory_order_release);
2329 char relayerSlotState[1024];
2330 snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2331 char* buffer = strchr(relayerSlotState, ' ') + 1;
2332 for (size_t ai = 0; ai != record.size(); ai++) {
2333 buffer[ai] = record.isValid(ai) ? '2' : '0';
2334 }
2335 buffer[record.size()] = 0;
2336 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + action.slot.index), .size = (int)(record.size() + buffer - relayerSlotState), .data = relayerSlotState});
2337 };
2338
2339 // This is the main dispatching loop
2340 auto& state = ref.get<DeviceState>();
2341 auto& spec = ref.get<DeviceSpec const>();
2342
2343 auto& dpContext = ref.get<DataProcessorContext>();
2344 auto& streamContext = ref.get<StreamContext>();
2345 O2_SIGNPOST_ID_GENERATE(sid, device);
2346 O2_SIGNPOST_START(device, sid, "device", "Start processing ready actions");
2347
2348 auto& stats = ref.get<DataProcessingStats>();
2349 auto& relayer = ref.get<DataRelayer>();
2350 using namespace o2::framework;
2351 stats.updateStats({(int)ProcessingStatsId::PENDING_INPUTS, DataProcessingStats::Op::Set, static_cast<int64_t>(relayer.getParallelTimeslices() - completed.size())});
2352 stats.updateStats({(int)ProcessingStatsId::INCOMPLETE_INPUTS, DataProcessingStats::Op::Set, completed.empty() ? 1 : 0});
2353 switch (spec.completionPolicy.order) {
2355 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.timeslice.value < b.timeslice.value; });
2356 break;
2358 std::sort(completed.begin(), completed.end(), [](auto const& a, auto const& b) { return a.slot.index < b.slot.index; });
2359 break;
2361 default:
2362 break;
2363 }
2364
2365 for (auto action : completed) {
2366 O2_SIGNPOST_ID_GENERATE(aid, device);
2367 O2_SIGNPOST_START(device, aid, "device", "Processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2369 O2_SIGNPOST_END(device, aid, "device", "Waiting for more data.");
2370 continue;
2371 }
2372
2373 bool shouldConsume = action.op == CompletionPolicy::CompletionOp::Consume ||
2375 prepareAllocatorForCurrentTimeSlice(TimesliceSlot{action.slot});
2379 updateRunInformation(TimesliceSlot{action.slot});
2380 }
2381 InputSpan span = getInputSpan(action.slot, shouldConsume);
2382 auto& spec = ref.get<DeviceSpec const>();
2383 InputRecord record{spec.inputs,
2384 span,
2385 *context.registry};
2386 ProcessingContext processContext{record, ref, ref.get<DataAllocator>()};
2387 {
2388 // Notice this should be thread safe and reentrant
2389 // as it is called from many threads.
2390 streamContext.preProcessingCallbacks(processContext);
2391 dpContext.preProcessingCallbacks(processContext);
2392 }
2394 context.postDispatchingCallbacks(processContext);
2395 if (spec.forwards.empty() == false) {
2396 auto& timesliceIndex = ref.get<TimesliceIndex>();
2397 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false);
2398 O2_SIGNPOST_END(device, aid, "device", "Forwarding inputs consume: %d.", false);
2399 continue;
2400 }
2401 }
2402 // If there is no optional inputs we canForwardEarly
2403 // the messages to that parallel processing can happen.
2404 // In this case we pass true to indicate that we want to
2405 // copy the messages to the subsequent data processor.
2406 bool hasForwards = spec.forwards.empty() == false;
2408
2409 if (context.canForwardEarly && hasForwards && consumeSomething) {
2410 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Early forwainding: %{public}s.", fmt::format("{}", action.op).c_str());
2411 auto& timesliceIndex = ref.get<TimesliceIndex>();
2412 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), true, action.op == CompletionPolicy::CompletionOp::Consume);
2413 }
2414 markInputsAsDone(action.slot);
2415
2416 uint64_t tStart = uv_hrtime();
2417 uint64_t tStartMilli = TimingHelpers::getRealtimeSinceEpochStandalone();
2418 preUpdateStats(action, record, tStart);
2419
2420 static bool noCatch = getenv("O2_NO_CATCHALL_EXCEPTIONS") && strcmp(getenv("O2_NO_CATCHALL_EXCEPTIONS"), "0");
2421
2422 auto runNoCatch = [&context, ref, &processContext](DataRelayer::RecordAction& action) mutable {
2423 auto& state = ref.get<DeviceState>();
2424 auto& spec = ref.get<DeviceSpec const>();
2425 auto& streamContext = ref.get<StreamContext>();
2426 auto& dpContext = ref.get<DataProcessorContext>();
2427 auto shouldProcess = [](DataRelayer::RecordAction& action) -> bool {
2428 switch (action.op) {
2433 return true;
2434 break;
2435 default:
2436 return false;
2437 }
2438 };
2439 if (state.quitRequested == false) {
2440 {
2441 // Callbacks from services
2442 dpContext.preProcessingCallbacks(processContext);
2443 streamContext.preProcessingCallbacks(processContext);
2444 dpContext.preProcessingCallbacks(processContext);
2445 // Callbacks from users
2446 ref.get<CallbackService>().call<CallbackService::Id::PreProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2447 }
2448 O2_SIGNPOST_ID_FROM_POINTER(pcid, device, &processContext);
2449 if (context.statefulProcess && shouldProcess(action)) {
2450 // This way, usercode can use the the same processing context to identify
2451 // its signposts and we can map user code to device iterations.
2452 O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
2453 (context.statefulProcess)(processContext);
2454 O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
2455 } else if (context.statelessProcess && shouldProcess(action)) {
2456 O2_SIGNPOST_START(device, pcid, "device", "Stateful process");
2457 (context.statelessProcess)(processContext);
2458 O2_SIGNPOST_END(device, pcid, "device", "Stateful process");
2459 } else if (context.statelessProcess || context.statefulProcess) {
2460 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Skipping processing because we are discarding.");
2461 } else {
2462 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "No processing callback provided. Switching to %{public}s.", "Idle");
2464 }
2465 if (shouldProcess(action)) {
2466 auto& timingInfo = ref.get<TimingInfo>();
2467 if (timingInfo.globalRunNumberChanged) {
2468 context.lastRunNumberProcessed = timingInfo.runNumber;
2469 }
2470 }
2471
2472 // Notify the sink we just consumed some timeframe data
2473 if (context.isSink && action.op == CompletionPolicy::CompletionOp::Consume) {
2474 O2_SIGNPOST_EVENT_EMIT(device, pcid, "device", "Sending dpl-summary");
2475 auto& allocator = ref.get<DataAllocator>();
2476 allocator.make<int>(OutputRef{"dpl-summary", runtime_hash(spec.name.c_str())}, 1);
2477 }
2478
2479 // Extra callback which allows a service to add extra outputs.
2480 // This is needed e.g. to ensure that injected CCDB outputs are added
2481 // before an end of stream.
2482 {
2483 ref.get<CallbackService>().call<CallbackService::Id::FinaliseOutputs>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2484 dpContext.finaliseOutputsCallbacks(processContext);
2485 streamContext.finaliseOutputsCallbacks(processContext);
2486 }
2487
2488 {
2489 ref.get<CallbackService>().call<CallbackService::Id::PostProcessing>(o2::framework::ServiceRegistryRef{ref}, (int)action.op);
2490 dpContext.postProcessingCallbacks(processContext);
2491 streamContext.postProcessingCallbacks(processContext);
2492 }
2493 }
2494 };
2495
2496 if ((state.tracingFlags & DeviceState::LoopReason::TRACE_USERCODE) != 0) {
2497 state.severityStack.push_back((int)fair::Logger::GetConsoleSeverity());
2498 fair::Logger::SetConsoleSeverity(fair::Severity::trace);
2499 }
2500 if (noCatch) {
2501 try {
2502 runNoCatch(action);
2503 } catch (o2::framework::RuntimeErrorRef e) {
2504 (context.errorHandling)(e, record);
2505 }
2506 } else {
2507 try {
2508 runNoCatch(action);
2509 } catch (std::exception& ex) {
2513 auto e = runtime_error(ex.what());
2514 (context.errorHandling)(e, record);
2515 } catch (o2::framework::RuntimeErrorRef e) {
2516 (context.errorHandling)(e, record);
2517 }
2518 }
2519 if (state.severityStack.empty() == false) {
2520 fair::Logger::SetConsoleSeverity((fair::Severity)state.severityStack.back());
2521 state.severityStack.pop_back();
2522 }
2523
2524 postUpdateStats(action, record, tStart, tStartMilli);
2525 // We forward inputs only when we consume them. If we simply Process them,
2526 // we keep them for next message arriving.
2528 cleanupRecord(record);
2529 context.postDispatchingCallbacks(processContext);
2530 ref.get<CallbackService>().call<CallbackService::Id::DataConsumed>(o2::framework::ServiceRegistryRef{ref});
2531 }
2532 if ((context.canForwardEarly == false) && hasForwards && consumeSomething) {
2533 O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Late forwarding");
2534 auto& timesliceIndex = ref.get<TimesliceIndex>();
2535 forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false, action.op == CompletionPolicy::CompletionOp::Consume);
2536 }
2537 context.postForwardingCallbacks(processContext);
2539 cleanTimers(action.slot, record);
2540 }
2541 O2_SIGNPOST_END(device, aid, "device", "Done processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
2542 }
2543 O2_SIGNPOST_END(device, sid, "device", "Start processing ready actions");
2544
2545 // We now broadcast the end of stream if it was requested
2546 if (state.streaming == StreamingState::EndOfStreaming) {
2547 LOGP(detail, "Broadcasting end of stream");
2548 for (auto& channel : spec.outputChannels) {
2550 }
2552 }
2553
2554 return true;
2555}
2556
2558{
2559 LOG(error) << msg;
2560 ServiceRegistryRef ref{mServiceRegistry};
2561 auto& stats = ref.get<DataProcessingStats>();
2563}
2564
2565std::unique_ptr<ConfigParamStore> DeviceConfigurationHelpers::getConfiguration(ServiceRegistryRef registry, const char* name, std::vector<ConfigParamSpec> const& options)
2566{
2567
2568 if (registry.active<ConfigurationInterface>()) {
2569 auto& cfg = registry.get<ConfigurationInterface>();
2570 try {
2571 cfg.getRecursive(name);
2572 std::vector<std::unique_ptr<ParamRetriever>> retrievers;
2573 retrievers.emplace_back(std::make_unique<ConfigurationOptionsRetriever>(&cfg, name));
2574 auto configStore = std::make_unique<ConfigParamStore>(options, std::move(retrievers));
2575 configStore->preload();
2576 configStore->activate();
2577 return configStore;
2578 } catch (...) {
2579 // No overrides...
2580 }
2581 }
2582 return {nullptr};
2583}
2584
2585} // namespace o2::framework
benchmark::State & state
struct uv_timer_s uv_timer_t
struct uv_signal_s uv_signal_t
struct uv_async_s uv_async_t
struct uv_poll_s uv_poll_t
struct uv_loop_s uv_loop_t
o2::monitoring::Metric Metric
uint64_t maxLatency
o2::configuration::ConfigurationInterface ConfigurationInterface
constexpr int DEFAULT_MAX_CHANNEL_AHEAD
uint64_t minLatency
std::ostringstream debug
int32_t i
std::enable_if_t< std::is_signed< T >::value, bool > hasData(const CalArray< T > &cal)
Definition Painter.cxx:599
uint16_t pid
Definition RawData.h:2
uint32_t res
Definition RawData.h:0
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
Definition Signpost.h:553
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_LOG_ENABLED(log)
Definition Signpost.h:110
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
Definition Signpost.h:563
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
constexpr uint32_t runtime_hash(char const *str)
o2::monitoring::Monitoring Monitoring
StringRef key
@ DeviceStateChanged
Invoked the device undergoes a state change.
decltype(auto) make(const Output &spec, Args... args)
static void doRun(ServiceRegistryRef)
void fillContext(DataProcessorContext &context, DeviceContext &deviceContext)
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry &)
static void doPrepare(ServiceRegistryRef)
static bool tryDispatchComputation(ServiceRegistryRef ref, std::vector< DataRelayer::RecordAction > &completed)
static void handleData(ServiceRegistryRef, InputChannelInfo &)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associate to a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associate to a given slot.
A service API to communicate with the driver.
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
size_t size() const
Number of elements in the InputSpan.
Definition InputSpan.h:82
bool active() const
Check if service of type T is currently active.
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLuint entry
Definition glcorearb.h:5735
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLuint GLuint stream
Definition glcorearb.h:1806
GLint ref
Definition glcorearb.h:291
GLuint * states
Definition glcorearb.h:4932
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
ServiceKind
The kind of service we are asking for.
void on_idle_timer(uv_timer_t *handle)
@ DPL
The channel is a normal input channel.
void run_completion(uv_work_t *handle, int status)
void on_socket_polled(uv_poll_t *poller, int status, int events)
void run_callback(uv_work_t *handle)
volatile int region_read_global_dummy_variable
void handleRegionCallbacks(ServiceRegistryRef registry, std::vector< fair::mq::RegionInfo > &infos)
Invoke the callbacks for the mPendingRegionInfos.
void on_out_of_band_polled(uv_poll_t *poller, int status, int events)
DeviceSpec const & getRunningDevice(RunningDeviceRef const &running, ServiceRegistryRef const &services)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Streaming
Data is being processed.
@ Idle
End of streaming notified.
void on_communication_requested(uv_async_t *s)
@ Expired
A transition needs to be fulfilled ASAP.
@ NoTransition
No pending transitions.
@ Requested
A transition was notified to be requested.
RuntimeError & error_from_ref(RuntimeErrorRef)
void on_awake_main_thread(uv_async_t *handle)
@ Completed
The channel was signaled it will not receive any data.
@ Running
The channel is actively receiving data.
void on_signal_callback(uv_signal_t *handle, int signum)
@ Me
Only quit this data processor.
constexpr const char * channelName(int channel)
Definition Constants.h:318
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
Defining DataPointCompositeObject explicitly as copiable.
static void run(AsyncQueue &queue, TimesliceId oldestPossibleTimeslice)
static void post(AsyncQueue &queue, AsyncTask const &task)
An actual task to be executed.
Definition AsyncQueue.h:32
static void demangled_backtrace_symbols(void **backtrace, unsigned int total, int fd)
static constexpr int INVALID
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
int64_t timeslices
How many timeslices it can process without giving back control.
int64_t sharedMemory
How much shared memory it can allocate.
Statistics on the offers consumed, expired.
static bool hasOnlyGenerated(DeviceSpec const &spec)
Check if spec is a source device.
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const &ref, ProcessingPolicies const &policies)
Starts the EoS timers and returns the new TransitionHandlingState in case a new state is requested.
static void switchState(ServiceRegistryRef const &ref, StreamingState newState)
change the device StreamingState to newState
static void sendEndOfStream(ServiceRegistryRef const &ref, OutputChannelSpec const &channel)
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
Helper struct to hold statistics about the data processing happening.
@ CumulativeRate
Set the value to the specified value if it is positive.
@ Add
Update the rate of the metric given the amount since the last time.
std::function< void(o2::framework::RuntimeErrorRef e, InputRecord &record)> errorHandling
AlgorithmSpec::InitErrorCallback initError
void preLoopCallbacks(ServiceRegistryRef)
Invoke callbacks before we enter the event loop.
void postStopCallbacks(ServiceRegistryRef)
Invoke callbacks on stop.
void preProcessingCallbacks(ProcessingContext &)
Invoke callbacks to be executed before every process method invocation.
bool canForwardEarly
Whether or not the associated DataProcessor can forward things early.
void preStartCallbacks(ServiceRegistryRef)
Invoke callbacks to be executed in PreRun(), before the User Start callbacks.
AlgorithmSpec::ProcessCallback statefulProcess
const char * header
Definition DataRef.h:27
const InputSpec * spec
Definition DataRef.h:26
static std::vector< size_t > createDistinctRouteIndex(std::vector< InputRoute > const &)
CompletionPolicy::CompletionOp op
Definition DataRelayer.h:81
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static header::DataOrigin asConcreteOrigin(InputSpec const &spec)
TimesliceIndex::OldestOutputInfo oldestTimeslice
static unsigned int pipelineLength()
get max number of timeslices in the queue
static std::unique_ptr< ConfigParamStore > getConfiguration(ServiceRegistryRef registry, const char *name, std::vector< ConfigParamSpec > const &options)
ProcessingPolicies & processingPolicies
Running state information of a given device.
Definition DeviceState.h:34
std::atomic< int64_t > cleanupCount
Definition DeviceState.h:82
Forward channel information.
Definition ChannelInfo.h:88
ChannelAccountingType channelType
Whether or not it's a DPL internal channel.
Definition ChannelInfo.h:92
std::string name
The name of the channel.
Definition ChannelInfo.h:90
ForwardingPolicy const * policy
Definition ChannelInfo.h:94
fair::mq::Channel * channel
Definition ChannelInfo.h:51
TimesliceId oldestForChannel
Oldest possible timeslice for the given channel.
Definition ChannelInfo.h:65
enum Lifetime lifetime
Definition InputSpec.h:73
enum EarlyForwardPolicy earlyForward
Information about the running workflow.
static constexpr ServiceKind kind
static Salt streamSalt(short streamId, short dataProcessorId)
void lateBindStreamServices(DeviceState &state, fair::mq::ProgOptions &options, ServiceRegistry::Salt salt)
static Salt globalStreamSalt(short streamId)
void * get(ServiceTypeHash typeHash, Salt salt, ServiceKind kind, char const *name=nullptr) const
void finaliseOutputsCallbacks(ProcessingContext &)
Invoke callbacks to be executed after every process method invokation.
void preProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed before every process method invokation.
void preEOSCallbacks(EndOfStreamContext &eosContext)
Invoke callbacks to be executed before every EOS user callback invokation.
void postProcessingCallbacks(ProcessingContext &pcx)
Invoke callbacks to be executed after every process method invokation.
static int64_t getRealtimeSinceEpochStandalone()
bool keepAtEndOfStream
Whether this kind of data should be flushed during end of stream.
Definition TimingInfo.h:44
static bool timesliceIsTimer(size_t timeslice)
Definition TimingInfo.h:46
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
void backpressure(InputChannelInfo const &)
locked_execution(ServiceRegistryRef &ref_)
the main header struct
Definition DataHeader.h:619
constexpr size_t min
constexpr size_t max
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
vec clear()
const std::string str
uint64_t const void const *restrict const msg
Definition x9.h:153