Project
Loading...
Searching...
No Matches
DataProcessingHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
18#include "Headers/DataHeader.h"
20#include "Headers/Stack.h"
21#include "Framework/Logger.h"
27#include "Framework/Signpost.h"
35#include "Headers/DataHeader.h"
37#include "DecongestionService.h"
38
39#include <fairmq/Device.h>
40#include <fairmq/Channel.h>
41
42#include <uv.h>
43
44// A log to use for general device logging
46// Stream which keeps track of the calibration lifetime logic
49
50namespace o2::framework
51{
53{
54 fair::mq::Device* device = ref.get<RawDeviceService>().device();
55 fair::mq::Parts parts;
56 fair::mq::MessagePtr payload(device->NewMessage());
59 auto channelAlloc = o2::pmr::getTransportAllocator(device->GetChannel(channel.name, 0).Transport());
60 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, sih});
61 // sigh... See if we can avoid having it const by not
62 // exposing it to the user in the first place.
63 parts.AddPart(std::move(header));
64 parts.AddPart(std::move(payload));
65 device->Send(parts, channel.name, 0);
66 LOGP(info, "Sending end-of-stream message to channel {}", channel.name);
67}
68
// Build a two-part message (header stack followed by a payload) that carries
// `timeslice` as the oldest possible timeslice, and hand it to `callback`.
//
// ref       - service registry reference, passed through to `callback`
// transport - used to create the payload message and to obtain the allocator
//             for the header stack
// index     - channel index, passed through to `callback`
// callback  - invoked as callback(parts, index, ref); callers in this file pass
//             a policy's `send` or `forward` callback
// timeslice - value stored in dih.oldestPossibleTimeslice
//
// NOTE(review): the declaration of `dih` (listing line 73) is not visible in
// this extract; presumably a DomainInfoHeader - confirm against the real file.
69void doSendOldestPossibleTimeframe(ServiceRegistryRef ref, fair::mq::TransportFactory* transport, ChannelIndex index, SendingPolicy::SendingCallback const& callback, size_t timeslice)
70{
71 fair::mq::Parts parts;
// The payload message is created with no arguments; only the header carries information.
72 fair::mq::MessagePtr payload(transport->CreateMessage());
74 dih.oldestPossibleTimeslice = timeslice;
// Serialise `dih` into a header stack allocated through the given transport.
75 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
76 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dih});
77 // sigh... See if we can avoid having it const by not
78 // exposing it to the user in the first place.
// Order matters: the header part is added first, then the payload part.
79 parts.AddPart(std::move(header));
80 parts.AddPart(std::move(payload));
81
// The callback receives the assembled parts together with the channel index and registry.
82 callback(parts, index, ref);
83}
84
86{
88 return false;
89 }
90 if (state.oldestForChannel.value >= timeslice) {
91 return false;
92 }
93 doSendOldestPossibleTimeframe(ref, info.channel.Transport(), info.index, info.policy->forward, timeslice);
94 state.oldestForChannel = {timeslice};
95 return true;
96}
97
99{
101 return false;
102 }
103 if (state.oldestForChannel.value >= timeslice) {
104 return false;
105 }
106 doSendOldestPossibleTimeframe(ref, info.channel.Transport(), info.index, info.policy->send, timeslice);
107 state.oldestForChannel = {timeslice};
108 return true;
109}
110
112{
113 auto& proxy = ref.get<FairMQDeviceProxy>();
114 for (int ci = 0; ci < proxy.getNumOutputChannels(); ++ci) {
115 auto& info = proxy.getOutputChannelInfo({ci});
116 auto& state = proxy.getOutputChannelState({ci});
117 sendOldestPossibleTimeframe(ref, info, state, timeslice);
118 }
119}
120
122{
123 auto& state = ref.get<DeviceState>();
124 auto& context = ref.get<DataProcessorContext>();
125 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
126 O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
127 O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
128 state.streaming = newState;
129 ref.get<ControlService>().notifyStreamingState(state.streaming);
130};
131
132bool hasOnlyTimers(DeviceSpec const& spec)
133{
134 return std::all_of(spec.inputs.cbegin(), spec.inputs.cend(), [](InputRoute const& route) -> bool { return route.matcher.lifetime == Lifetime::Timer; });
135}
136
138{
139 return (spec.inputChannels.size() == 1) && (spec.inputs[0].matcher.lifetime == Lifetime::Timer || spec.inputs[0].matcher.lifetime == Lifetime::Enumeration);
140}
141
143{
144 auto* ref = (ServiceRegistryRef*)handle->data;
145 auto& state = ref->get<DeviceState>();
146 auto& spec = ref->get<DeviceSpec const>();
147 state.loopReason |= DeviceState::TIMER_EXPIRED;
148
149 // Check if this is a source device
150 O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, handle);
151
153 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Switching to EndOfStreaming.");
155 } else {
156 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
157 state.allowedProcessing = DeviceState::CalibrationOnly;
158 }
159}
160
162{
163 auto* ref = (ServiceRegistryRef*)handle->data;
164 auto& state = ref->get<DeviceState>();
165 state.loopReason |= DeviceState::TIMER_EXPIRED;
166 // Check if this is a source device
167 O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, handle);
168 auto& spec = ref->get<DeviceSpec const>();
169 std::string messageOnExpire = DataProcessingHelpers::hasOnlyGenerated(spec) ? "DPL exit transition grace period for source expired. Exiting." : fmt::format("DPL exit transition grace period for {} expired. Exiting.", state.allowedProcessing == DeviceState::CalibrationOnly ? "calibration" : "data & calibration").c_str();
170 if (!ref->get<RawDeviceService>().device()->GetConfig()->GetValue<bool>("error-on-exit-transition-timeout")) {
171 O2_SIGNPOST_EVENT_EMIT_WARN(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
172 } else {
173 O2_SIGNPOST_EVENT_EMIT_ERROR(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
174 }
175 state.transitionHandling = TransitionHandlingState::Expired;
176}
177
179{
180 auto& state = ref.get<DeviceState>();
181 auto& deviceProxy = ref.get<FairMQDeviceProxy>();
182 if (state.transitionHandling != TransitionHandlingState::NoTransition || deviceProxy.newStateRequested() == false) {
183 return state.transitionHandling;
184 }
185 O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
186 O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, state.loop);
187 auto& deviceContext = ref.get<DeviceContext>();
188 // Check if we only have timers
189 auto& spec = ref.get<DeviceSpec const>();
190 if (hasOnlyTimers(spec)) {
192 }
193
194 // We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
195 if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
196 uv_update_time(state.loop);
197 O2_SIGNPOST_EVENT_EMIT(calibration, cid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
198 uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
199 }
200 if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
201 ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
202 uv_update_time(state.loop);
203 O2_SIGNPOST_EVENT_EMIT(calibration, cid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
204 deviceContext.exitTransitionTimeout);
205 uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
206 bool onlyGenerated = DataProcessingHelpers::hasOnlyGenerated(spec);
207 int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
209 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
210 } else {
211 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
212 "New state requested. Waiting for %d seconds before %{public}s",
213 timeout,
214 onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
215 }
217 } else {
218 if (deviceContext.exitTransitionTimeout == 0 && policies.termination == TerminationPolicy::QUIT) {
219 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
220 } else if (deviceContext.exitTransitionTimeout == 0 && policies.termination != TerminationPolicy::QUIT) {
221 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
222 } else if (policies.termination == TerminationPolicy::QUIT) {
223 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
224 } else {
225 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
226 }
228 }
229}
230
231void DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, std::span<fair::mq::MessagePtr>& messages, std::vector<fair::mq::Parts>& forwardedParts,
232 const bool copyByDefault, bool consume)
233{
234 O2_SIGNPOST_ID_GENERATE(sid, forwarding);
235 std::vector<ChannelIndex> forwardingChoices{};
236 size_t pi = 0;
237 while (pi < messages.size()) {
238 auto& header = messages[pi];
239
240 // If is now possible that the record is not complete when
241 // we forward it, because of a custom completion policy.
242 // this means that we need to skip the empty entries in the
243 // record for being forwarded.
244 if (header->GetData() == nullptr) {
245 pi += 2;
246 continue;
247 }
248 auto dih = o2::header::get<DomainInfoHeader*>(header->GetData());
249 if (dih) {
250 pi += 2;
251 continue;
252 }
253 auto sih = o2::header::get<SourceInfoHeader*>(header->GetData());
254 if (sih) {
255 pi += 2;
256 continue;
257 }
258
259 auto dph = o2::header::get<DataProcessingHeader*>(header->GetData());
260 auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData());
261
262 if (dph == nullptr || dh == nullptr) {
263 // Complain only if this is not an out-of-band message
264 LOGP(error, "Data is missing {}{}{}",
265 dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : "");
266 pi += 2;
267 continue;
268 }
269
270 // At least one payload.
271 auto& payload = messages[pi + 1];
272 // Calculate the number of messages which should be handled together
273 // all in one go.
274 size_t numberOfMessages = 0;
275 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
276 // Sequence of (header, payload[0], ... , payload[splitPayloadParts - 1]) pairs belonging together.
277 numberOfMessages = dh->splitPayloadParts + 1; // one is for the header
278 } else {
279 // Sequence of splitPayloadParts (header, payload) pairs belonging together.
280 // In case splitPayloadParts = 0, we consider this as a single message pair
281 numberOfMessages = (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
282 }
283
284 if (payload.get() == nullptr && consume == true) {
285 // If the payload is not there, it means we already
286 // processed it with ConsumeExisiting. Therefore we
287 // need to do something only if this is the last consume.
288 header.reset(nullptr);
289 pi += numberOfMessages;
290 continue;
291 }
292
293 // We need to find the forward route only for the first
294 // part of a split payload. All the others will use the same.
295 // Therefore, we reset and recompute the forwarding choice:
296 //
297 // - If this is the first payload of a [header0][payload0][header0][payload1]... sequence,
298 // which is actually always created and handled together. Notice that in this
299 // case we have splitPayloadParts == splitPayloadIndex
300 // - If this is the first payload of a [header0][payload0][header1][payload1]... sequence
301 // belonging to the same multipart message (and therefore we are guaranteed that they
302 // need to be routed together).
303 // - If the message is not a multipart (splitPayloadParts 0) or has only one part
304 // - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore
305 // we will already use the same choice in the for loop below.
306 //
307
308 forwardingChoices.clear();
309 proxy.getMatchingForwardChannelIndexes(forwardingChoices, *dh, dph->startTime);
310
311 if (forwardingChoices.empty()) {
312 // Nothing to forward go to the next messageset
313 pi += numberOfMessages;
314 continue;
315 }
316
317 // In case of more than one forward route, we need to copy the message.
318 // This will eventually use the same memory if running with the same backend.
319 if (copyByDefault || forwardingChoices.size() > 1) {
320 for (auto& choice : forwardingChoices) {
321 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
322 fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), choice.value);
323
324 for (size_t ppi = pi; ppi < pi + numberOfMessages; ++ppi) {
325 auto&& newMsg = header->GetTransport()->CreateMessage();
326 newMsg->Copy(*messages[ppi]);
327 forwardedParts[choice.value].AddPart(std::move(newMsg));
328 }
329 }
330 } else {
331 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
332 fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), forwardingChoices.back().value);
333 for (size_t ppi = pi; ppi < pi + numberOfMessages; ++ppi) {
334 forwardedParts[forwardingChoices.back().value].AddPart(std::move(messages[ppi]));
335 }
336 }
337 pi += numberOfMessages;
338 }
339}
340
342 std::vector<MessageSet>& currentSetOfInputs,
343 const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
344{
345 // we collect all messages per forward in a map and send them together
346 std::vector<fair::mq::Parts> forwardedParts;
347 forwardedParts.resize(proxy.getNumForwards());
348 std::vector<ChannelIndex> forwardingChoices{};
349
350 for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
351 auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii].messages);
352 routeForwardedMessages(proxy, span, forwardedParts, copyByDefault, consume);
353 }
354 return forwardedParts;
355};
356
357} // namespace o2::framework
benchmark::State & state
struct uv_timer_s uv_timer_t
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
Definition Signpost.h:553
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
Definition Signpost.h:563
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
void getMatchingForwardChannelIndexes(std::vector< ChannelIndex > &result, header::DataHeader const &header, size_t timeslice) const
Retrieve the channel index from a given OutputSpec and the associated timeslice.
OutputChannelInfo const & getOutputChannelInfo(ChannelIndex channelIndex) const
Retrieve information associated to a given forward by ChannelIndex.
virtual fair::mq::Device * device()=0
GLuint index
Definition glcorearb.h:781
GLbitfield GLuint64 timeout
Definition glcorearb.h:1573
Defining PrimaryVertex explicitly as messageable.
void on_transition_requested_expired(uv_timer_t *handle)
void doSendOldestPossibleTimeframe(ServiceRegistryRef ref, fair::mq::TransportFactory *transport, ChannelIndex index, SendingPolicy::SendingCallback const &callback, size_t timeslice)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Idle
End of streaming notified.
@ Expired
A transition needs to be fulfilled ASAP.
@ NoTransition
No pending transitions.
@ Requested
A transition was notified to be requested.
@ Completed
The channel was signaled it will not receive any data.
void on_data_processing_expired(uv_timer_t *handle)
bool hasOnlyTimers(DeviceSpec const &spec)
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static bool hasOnlyGenerated(DeviceSpec const &spec)
check if spec is a source device
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const &ref, ProcessingPolicies const &policies)
starts the EoS timers and returns the new TransitionHandlingState in case a new state is requested
static std::vector< fair::mq::Parts > routeForwardedMessageSet(FairMQDeviceProxy &proxy, std::vector< MessageSet > &currentSetOfInputs, bool copy, bool consume)
Helper to route messages for forwarding.
static void switchState(ServiceRegistryRef const &ref, StreamingState newState)
change the device StreamingState to newState
static void sendEndOfStream(ServiceRegistryRef const &ref, OutputChannelSpec const &channel)
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
static void routeForwardedMessages(FairMQDeviceProxy &proxy, std::span< fair::mq::MessagePtr > &currentSetOfInputs, std::vector< fair::mq::Parts > &forwardedParts, bool copy, bool consume)
Helper to route messages for forwarding.
bool suppressDomainInfo
do not advertise/forward DomainInfoHeader from this device
static bool onlineDeploymentMode()
Returns true if running online
std::vector< InputRoute > inputs
Definition DeviceSpec.h:62
std::vector< InputChannelSpec > inputChannels
Definition DeviceSpec.h:54
Running state information of a given device.
Definition DeviceState.h:34
a BaseHeader with domain information from the source
Forward channel information.
Definition ChannelInfo.h:88
ForwardingPolicy const * policy
Definition ChannelInfo.h:94
Output channel information.
Definition ChannelInfo.h:73
fair::mq::Channel & channel
Definition ChannelInfo.h:76
SendingPolicy const * policy
Definition ChannelInfo.h:77
std::function< void(fair::mq::Parts &, ChannelIndex channelIndex, ServiceRegistryRef registry)> SendingCallback
a BaseHeader with state information from the source
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33