Project
Loading...
Searching...
No Matches
DataProcessingHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
18#include "Headers/DataHeader.h"
19#include "Headers/Stack.h"
20#include "Framework/Logger.h"
26#include "Framework/Signpost.h"
34
35#include <fairmq/Device.h>
36#include <fairmq/Channel.h>
37
38#include <uv.h>
39
40// A log to use for general device logging
42// Stream which keeps track of the calibration lifetime logic
44
45namespace o2::framework
46{
48{
49 fair::mq::Device* device = ref.get<RawDeviceService>().device();
50 fair::mq::Parts parts;
51 fair::mq::MessagePtr payload(device->NewMessage());
54 auto channelAlloc = o2::pmr::getTransportAllocator(device->GetChannel(channel.name, 0).Transport());
55 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, sih});
56 // sigh... See if we can avoid having it const by not
57 // exposing it to the user in the first place.
58 parts.AddPart(std::move(header));
59 parts.AddPart(std::move(payload));
60 device->Send(parts, channel.name, 0);
61 LOGP(info, "Sending end-of-stream message to channel {}", channel.name);
62}
63
64void doSendOldestPossibleTimeframe(ServiceRegistryRef ref, fair::mq::TransportFactory* transport, ChannelIndex index, SendingPolicy::SendingCallback const& callback, size_t timeslice)
65{
66 fair::mq::Parts parts;
67 fair::mq::MessagePtr payload(transport->CreateMessage());
69 dih.oldestPossibleTimeslice = timeslice;
70 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
71 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dih});
72 // sigh... See if we can avoid having it const by not
73 // exposing it to the user in the first place.
74 parts.AddPart(std::move(header));
75 parts.AddPart(std::move(payload));
76
77 callback(parts, index, ref);
78}
79
81{
82 if (state.oldestForChannel.value >= timeslice) {
83 return false;
84 }
85 doSendOldestPossibleTimeframe(ref, info.channel.Transport(), info.index, info.policy->forward, timeslice);
86 state.oldestForChannel = {timeslice};
87 return true;
88}
89
91{
92 if (state.oldestForChannel.value >= timeslice) {
93 return false;
94 }
95 doSendOldestPossibleTimeframe(ref, info.channel.Transport(), info.index, info.policy->send, timeslice);
96 state.oldestForChannel = {timeslice};
97 return true;
98}
99
101{
102 auto& proxy = ref.get<FairMQDeviceProxy>();
103 for (int ci = 0; ci < proxy.getNumOutputChannels(); ++ci) {
104 auto& info = proxy.getOutputChannelInfo({ci});
105 auto& state = proxy.getOutputChannelState({ci});
106 sendOldestPossibleTimeframe(ref, info, state, timeslice);
107 }
108}
109
111{
112 auto& state = ref.get<DeviceState>();
113 auto& context = ref.get<DataProcessorContext>();
114 O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
115 O2_SIGNPOST_END(device, dpid, "state", "End of processing state %d", (int)state.streaming);
116 O2_SIGNPOST_START(device, dpid, "state", "Starting processing state %d", (int)newState);
117 state.streaming = newState;
118 ref.get<ControlService>().notifyStreamingState(state.streaming);
119};
120
121bool hasOnlyTimers(DeviceSpec const& spec)
122{
123 return std::all_of(spec.inputs.cbegin(), spec.inputs.cend(), [](InputRoute const& route) -> bool { return route.matcher.lifetime == Lifetime::Timer; });
124}
125
127{
128 return (spec.inputChannels.size() == 1) && (spec.inputs[0].matcher.lifetime == Lifetime::Timer || spec.inputs[0].matcher.lifetime == Lifetime::Enumeration);
129}
130
132{
133 auto* ref = (ServiceRegistryRef*)handle->data;
134 auto& state = ref->get<DeviceState>();
135 auto& spec = ref->get<DeviceSpec const>();
136 state.loopReason |= DeviceState::TIMER_EXPIRED;
137
138 // Check if this is a source device
139 O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, handle);
140
142 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Switching to EndOfStreaming.");
144 } else {
145 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
146 state.allowedProcessing = DeviceState::CalibrationOnly;
147 }
148}
149
151{
152 auto* ref = (ServiceRegistryRef*)handle->data;
153 auto& state = ref->get<DeviceState>();
154 state.loopReason |= DeviceState::TIMER_EXPIRED;
155 // Check if this is a source device
156 O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, handle);
157 auto& spec = ref->get<DeviceSpec const>();
158 std::string messageOnExpire = DataProcessingHelpers::hasOnlyGenerated(spec) ? "DPL exit transition grace period for source expired. Exiting." : fmt::format("DPL exit transition grace period for {} expired. Exiting.", state.allowedProcessing == DeviceState::CalibrationOnly ? "calibration" : "data & calibration").c_str();
159 if (!ref->get<RawDeviceService>().device()->GetConfig()->GetValue<bool>("error-on-exit-transition-timeout")) {
160 O2_SIGNPOST_EVENT_EMIT_WARN(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
161 } else {
162 O2_SIGNPOST_EVENT_EMIT_ERROR(calibration, cid, "callback", "%{public}s", messageOnExpire.c_str());
163 }
164 state.transitionHandling = TransitionHandlingState::Expired;
165}
166
168{
169 auto& state = ref.get<DeviceState>();
170 auto& deviceProxy = ref.get<FairMQDeviceProxy>();
171 if (state.transitionHandling != TransitionHandlingState::NoTransition || deviceProxy.newStateRequested() == false) {
172 return state.transitionHandling;
173 }
174 O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
175 O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, state.loop);
176 auto& deviceContext = ref.get<DeviceContext>();
177 // Check if we only have timers
178 auto& spec = ref.get<DeviceSpec const>();
179 if (hasOnlyTimers(spec)) {
181 }
182
183 // We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
184 if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
185 uv_update_time(state.loop);
186 O2_SIGNPOST_EVENT_EMIT(calibration, cid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
187 uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
188 }
189 if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
190 ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
191 uv_update_time(state.loop);
192 O2_SIGNPOST_EVENT_EMIT(calibration, cid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
193 deviceContext.exitTransitionTimeout);
194 uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
195 bool onlyGenerated = DataProcessingHelpers::hasOnlyGenerated(spec);
196 int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
198 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
199 } else {
200 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
201 "New state requested. Waiting for %d seconds before %{public}s",
202 timeout,
203 onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
204 }
206 } else {
207 if (deviceContext.exitTransitionTimeout == 0 && policies.termination == TerminationPolicy::QUIT) {
208 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
209 } else if (deviceContext.exitTransitionTimeout == 0 && policies.termination != TerminationPolicy::QUIT) {
210 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
211 } else if (policies.termination == TerminationPolicy::QUIT) {
212 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
213 } else {
214 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
215 }
217 }
218}
219
220} // namespace o2::framework
benchmark::State & state
struct uv_timer_s uv_timer_t
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
Definition Signpost.h:553
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
Definition Signpost.h:563
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
OutputChannelInfo const & getOutputChannelInfo(ChannelIndex channelIndex) const
Retrieve information associated to a given forward by ChannelIndex.
virtual fair::mq::Device * device()=0
GLuint index
Definition glcorearb.h:781
GLbitfield GLuint64 timeout
Definition glcorearb.h:1573
Defining PrimaryVertex explicitly as messageable.
void on_transition_requested_expired(uv_timer_t *handle)
void doSendOldestPossibleTimeframe(ServiceRegistryRef ref, fair::mq::TransportFactory *transport, ChannelIndex index, SendingPolicy::SendingCallback const &callback, size_t timeslice)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Idle
End of streaming notified.
@ Expired
A transition needs to be fullfilled ASAP.
@ NoTransition
No pending transitions.
@ Requested
A transition was notified to be requested.
@ Completed
The channel was signaled it will not receive any data.
void on_data_processing_expired(uv_timer_t *handle)
bool hasOnlyTimers(DeviceSpec const &spec)
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static bool hasOnlyGenerated(DeviceSpec const &spec)
check if spec is a source devide
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const &ref, ProcessingPolicies const &policies)
starts the EoS timers and returns the new TransitionHandlingState in case as new state is requested
static void switchState(ServiceRegistryRef const &ref, StreamingState newState)
change the device StreamingState to newState
static void sendEndOfStream(ServiceRegistryRef const &ref, OutputChannelSpec const &channel)
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
static bool onlineDeploymentMode()
@true if running online
std::vector< InputRoute > inputs
Definition DeviceSpec.h:62
std::vector< InputChannelSpec > inputChannels
Definition DeviceSpec.h:54
Running state information of a given device.
Definition DeviceState.h:34
a BaseHeader with domain information from the source
Forward channel information.
Definition ChannelInfo.h:88
ForwardingPolicy const * policy
Definition ChannelInfo.h:94
Output channel information.
Definition ChannelInfo.h:73
fair::mq::Channel & channel
Definition ChannelInfo.h:76
SendingPolicy const * policy
Definition ChannelInfo.h:77
std::function< void(fair::mq::Parts &, ChannelIndex channelIndex, ServiceRegistryRef registry)> SendingCallback
a BaseHeader with state information from the source
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33