Project
Loading...
Searching...
No Matches
DataProcessingHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
18#include "Headers/DataHeader.h"
20#include "Headers/Stack.h"
21#include "Framework/Logger.h"
27#include "Framework/Signpost.h"
35#include "Headers/DataHeader.h"
37
38#include <fairmq/Device.h>
39#include <fairmq/Channel.h>
40
41#include <uv.h>
42
43// A log to use for general device logging
45// Stream which keeps track of the calibration lifetime logic
48
49namespace o2::framework
50{
52{
53 fair::mq::Device* device = ref.get<RawDeviceService>().device();
54 fair::mq::Parts parts;
55 fair::mq::MessagePtr payload(device->NewMessage());
58 auto channelAlloc = o2::pmr::getTransportAllocator(device->GetChannel(channel.name, 0).Transport());
59 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, sih});
60 // sigh... See if we can avoid having it const by not
61 // exposing it to the user in the first place.
62 parts.AddPart(std::move(header));
63 parts.AddPart(std::move(payload));
64 device->Send(parts, channel.name, 0);
65 LOGP(info, "Sending end-of-stream message to channel {}", channel.name);
66}
67
68void doSendOldestPossibleTimeframe(ServiceRegistryRef ref, fair::mq::TransportFactory* transport, ChannelIndex index, SendingPolicy::SendingCallback const& callback, size_t timeslice)
69{
70 fair::mq::Parts parts;
71 fair::mq::MessagePtr payload(transport->CreateMessage());
73 dih.oldestPossibleTimeslice = timeslice;
74 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
75 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dih});
76 // sigh... See if we can avoid having it const by not
77 // exposing it to the user in the first place.
78 parts.AddPart(std::move(header));
79 parts.AddPart(std::move(payload));
80
81 callback(parts, index, ref);
82}
83
85{
86 if (state.oldestForChannel.value >= timeslice) {
87 return false;
88 }
89 doSendOldestPossibleTimeframe(ref, info.channel.Transport(), info.index, info.policy->forward, timeslice);
90 state.oldestForChannel = {timeslice};
91 return true;
92}
93
95{
96 if (state.oldestForChannel.value >= timeslice) {
97 return false;
98 }
99 doSendOldestPossibleTimeframe(ref, info.channel.Transport(), info.index, info.policy->send, timeslice);
100 state.oldestForChannel = {timeslice};
101 return true;
102}
103
105{
106 auto& proxy = ref.get<FairMQDeviceProxy>();
107 for (int ci = 0; ci < proxy.getNumOutputChannels(); ++ci) {
108 auto& info = proxy.getOutputChannelInfo({ci});
109 auto& state = proxy.getOutputChannelState({ci});
110 sendOldestPossibleTimeframe(ref, info, state, timeslice);
111 }
112}
113
115{
116 auto& state = ref.get<DeviceState>();
117 auto& context = ref.get<DataProcessorContext>();
118 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
119 O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
120 O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
121 state.streaming = newState;
122 ref.get<ControlService>().notifyStreamingState(state.streaming);
123};
124
125bool hasOnlyTimers(DeviceSpec const& spec)
126{
127 return std::all_of(spec.inputs.cbegin(), spec.inputs.cend(), [](InputRoute const& route) -> bool { return route.matcher.lifetime == Lifetime::Timer; });
128}
129
131{
132 return (spec.inputChannels.size() == 1) && (spec.inputs[0].matcher.lifetime == Lifetime::Timer || spec.inputs[0].matcher.lifetime == Lifetime::Enumeration);
133}
134
136{
137 auto* ref = (ServiceRegistryRef*)handle->data;
138 auto& state = ref->get<DeviceState>();
139 auto& spec = ref->get<DeviceSpec const>();
140 state.loopReason |= DeviceState::TIMER_EXPIRED;
141
142 // Check if this is a source device
143 O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, handle);
144
146 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Switching to EndOfStreaming.");
148 } else {
149 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
150 state.allowedProcessing = DeviceState::CalibrationOnly;
151 }
152}
153
155{
156 auto* ref = (ServiceRegistryRef*)handle->data;
157 auto& state = ref->get<DeviceState>();
158 state.loopReason |= DeviceState::TIMER_EXPIRED;
159 // Check if this is a source device
160 O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, handle);
161 auto& spec = ref->get<DeviceSpec const>();
162 std::string messageOnExpire = DataProcessingHelpers::hasOnlyGenerated(spec) ? "DPL exit transition grace period for source expired. Exiting." : fmt::format("DPL exit transition grace period for {} expired. Exiting.", state.allowedProcessing == DeviceState::CalibrationOnly ? "calibration" : "data & calibration").c_str();
163 if (!ref->get<RawDeviceService>().device()->GetConfig()->GetValue<bool>("error-on-exit-transition-timeout")) {
164 O2_SIGNPOST_EVENT_EMIT_WARN(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
165 } else {
166 O2_SIGNPOST_EVENT_EMIT_ERROR(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
167 }
168 state.transitionHandling = TransitionHandlingState::Expired;
169}
170
172{
173 auto& state = ref.get<DeviceState>();
174 auto& deviceProxy = ref.get<FairMQDeviceProxy>();
175 if (state.transitionHandling != TransitionHandlingState::NoTransition || deviceProxy.newStateRequested() == false) {
176 return state.transitionHandling;
177 }
178 O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
179 O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, state.loop);
180 auto& deviceContext = ref.get<DeviceContext>();
181 // Check if we only have timers
182 auto& spec = ref.get<DeviceSpec const>();
183 if (hasOnlyTimers(spec)) {
185 }
186
187 // We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
188 if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
189 uv_update_time(state.loop);
190 O2_SIGNPOST_EVENT_EMIT(calibration, cid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
191 uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
192 }
193 if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
194 ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
195 uv_update_time(state.loop);
196 O2_SIGNPOST_EVENT_EMIT(calibration, cid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
197 deviceContext.exitTransitionTimeout);
198 uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
199 bool onlyGenerated = DataProcessingHelpers::hasOnlyGenerated(spec);
200 int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
202 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
203 } else {
204 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
205 "New state requested. Waiting for %d seconds before %{public}s",
206 timeout,
207 onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
208 }
210 } else {
211 if (deviceContext.exitTransitionTimeout == 0 && policies.termination == TerminationPolicy::QUIT) {
212 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
213 } else if (deviceContext.exitTransitionTimeout == 0 && policies.termination != TerminationPolicy::QUIT) {
214 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
215 } else if (policies.termination == TerminationPolicy::QUIT) {
216 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
217 } else {
218 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
219 }
221 }
222}
223
224static auto toBeForwardedHeader = [](void* header) -> bool {
225 // If is now possible that the record is not complete when
226 // we forward it, because of a custom completion policy.
227 // this means that we need to skip the empty entries in the
228 // record for being forwarded.
229 if (header == nullptr) {
230 return false;
231 }
232 auto dh = o2::header::get<header::DataHeader*>(header);
233 if (!dh) {
234 return false;
235 }
236 bool retval = !o2::header::get<SourceInfoHeader*>(header) &&
237 !o2::header::get<DomainInfoHeader*>(header) &&
238 o2::header::get<DataProcessingHeader*>(header);
239 // DataHeader is there. Complain if we have unexpected headers present / missing
240 if (!retval) {
241 LOGP(error, "Dropping data because of malformed header structure");
242 }
243 return retval;
244};
245
246static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
247 FairMQDeviceProxy& proxy,
248 std::unique_ptr<fair::mq::Message>& header,
249 std::unique_ptr<fair::mq::Message>& payload,
250 size_t total,
251 bool consume) {
252 if (header.get() == nullptr) {
253 // Missing an header is not an error anymore.
254 // it simply means that we did not receive the
255 // given input, but we were asked to
256 // consume existing, so we skip it.
257 return false;
258 }
259 if (payload.get() == nullptr && consume == true) {
260 // If the payload is not there, it means we already
261 // processed it with ConsumeExisiting. Therefore we
262 // need to do something only if this is the last consume.
263 header.reset(nullptr);
264 return false;
265 }
266
267 auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
268 if (fdph == nullptr) {
269 LOG(error) << "Data is missing DataProcessingHeader";
270 return false;
271 }
272 auto fdh = o2::header::get<header::DataHeader*>(header->GetData());
273 if (fdh == nullptr) {
274 LOG(error) << "Data is missing DataHeader";
275 return false;
276 }
277
278 // We need to find the forward route only for the first
279 // part of a split payload. All the others will use the same.
280 // but always check if we have a sequence of multiple payloads
281 if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
282 proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
283 }
284 return cachedForwardingChoices.empty() == false;
285};
286
287std::vector<fair::mq::Parts> DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
288 TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume)
289{
290 // we collect all messages per forward in a map and send them together
291 std::vector<fair::mq::Parts> forwardedParts;
292 forwardedParts.resize(proxy.getNumForwards());
293 std::vector<ChannelIndex> cachedForwardingChoices{};
294 O2_SIGNPOST_ID_GENERATE(sid, forwarding);
295 O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
296 slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
297
298 for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
299 auto& messageSet = currentSetOfInputs[ii];
300 // In case the messageSet is empty, there is nothing to be done.
301 if (messageSet.size() == 0) {
302 continue;
303 }
304 if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
305 continue;
306 }
307 cachedForwardingChoices.clear();
308
309 for (size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
310 auto& messageSet = currentSetOfInputs[ii];
311 auto& header = messageSet.header(pi);
312 auto& payload = messageSet.payload(pi);
313 auto total = messageSet.getNumberOfPayloads(pi);
314
315 if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
316 continue;
317 }
318
319 // In case of more than one forward route, we need to copy the message.
320 // This will eventually use the same mamory if running with the same backend.
321 if (cachedForwardingChoices.size() > 1) {
322 copy = true;
323 }
324 auto* dh = o2::header::get<header::DataHeader*>(header->GetData());
325 auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
326
327 if (copy) {
328 for (auto& cachedForwardingChoice : cachedForwardingChoices) {
329 auto&& newHeader = header->GetTransport()->CreateMessage();
330 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.",
331 fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
332 newHeader->Copy(*header);
333 forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
334
335 for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
336 auto&& newPayload = header->GetTransport()->CreateMessage();
337 newPayload->Copy(*messageSet.payload(pi, payloadIndex));
338 forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
339 }
340 }
341 } else {
342 O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.",
343 fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
344 forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
345 for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
346 forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
347 }
348 }
349 }
350 }
351 return forwardedParts;
352};
353
354} // namespace o2::framework
benchmark::State & state
struct uv_timer_s uv_timer_t
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
Definition Signpost.h:553
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
Definition Signpost.h:563
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
OutputChannelInfo const & getOutputChannelInfo(ChannelIndex channelIndex) const
Retrieve information associated to a given forward by ChannelIndex.
virtual fair::mq::Device * device()=0
GLuint index
Definition glcorearb.h:781
GLbitfield GLuint64 timeout
Definition glcorearb.h:1573
Defining PrimaryVertex explicitly as messageable.
void on_transition_requested_expired(uv_timer_t *handle)
void doSendOldestPossibleTimeframe(ServiceRegistryRef ref, fair::mq::TransportFactory *transport, ChannelIndex index, SendingPolicy::SendingCallback const &callback, size_t timeslice)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Idle
End of streaming notified.
@ Expired
A transition needs to be fulfilled ASAP.
@ NoTransition
No pending transitions.
@ Requested
A transition was notified to be requested.
@ Completed
The channel was signaled it will not receive any data.
void on_data_processing_expired(uv_timer_t *handle)
bool hasOnlyTimers(DeviceSpec const &spec)
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static bool hasOnlyGenerated(DeviceSpec const &spec)
check if spec is a source device
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const &ref, ProcessingPolicies const &policies)
starts the EoS timers and returns the new TransitionHandlingState in case a new state is requested
static void switchState(ServiceRegistryRef const &ref, StreamingState newState)
change the device StreamingState to newState
static void sendEndOfStream(ServiceRegistryRef const &ref, OutputChannelSpec const &channel)
static std::vector< fair::mq::Parts > routeForwardedMessages(FairMQDeviceProxy &proxy, TimesliceSlot slot, std::vector< MessageSet > &currentSetOfInputs, TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume)
Helper to route messages for forwarding.
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
static bool onlineDeploymentMode()
@true if running online
std::vector< InputRoute > inputs
Definition DeviceSpec.h:62
std::vector< InputChannelSpec > inputChannels
Definition DeviceSpec.h:54
Running state information of a given device.
Definition DeviceState.h:34
a BaseHeader with domain information from the source
Forward channel information.
Definition ChannelInfo.h:88
ForwardingPolicy const * policy
Definition ChannelInfo.h:94
Output channel information.
Definition ChannelInfo.h:73
fair::mq::Channel & channel
Definition ChannelInfo.h:76
SendingPolicy const * policy
Definition ChannelInfo.h:77
std::function< void(fair::mq::Parts &, ChannelIndex channelIndex, ServiceRegistryRef registry)> SendingCallback
a BaseHeader with state information from the source
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"