35#include <fairmq/Device.h>
36#include <fairmq/Channel.h>
50 fair::mq::Parts parts;
51 fair::mq::MessagePtr payload(device->NewMessage());
54 auto channelAlloc = o2::pmr::getTransportAllocator(device->GetChannel(channel.
name, 0).Transport());
58 parts.AddPart(std::move(header));
59 parts.AddPart(std::move(payload));
60 device->Send(parts, channel.
name, 0);
61 LOGP(info,
"Sending end-of-stream message to channel {}", channel.
name);
66 fair::mq::Parts parts;
67 fair::mq::MessagePtr payload(transport->CreateMessage());
70 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
74 parts.AddPart(std::move(header));
75 parts.AddPart(std::move(payload));
82 if (
state.oldestForChannel.value >= timeslice) {
86 state.oldestForChannel = {timeslice};
92 if (
state.oldestForChannel.value >= timeslice) {
96 state.oldestForChannel = {timeslice};
103 for (
int ci = 0; ci < proxy.getNumOutputChannels(); ++ci) {
105 auto&
state = proxy.getOutputChannelState({ci});
116 O2_SIGNPOST_START(device, dpid,
"state",
"Starting processing state %d", (
int)newState);
117 state.streaming = newState;
123 return std::all_of(spec.
inputs.cbegin(), spec.
inputs.cend(), [](
InputRoute const& route) ->
bool { return route.matcher.lifetime == Lifetime::Timer; });
128 return (spec.
inputChannels.size() == 1) && (spec.
inputs[0].matcher.lifetime == Lifetime::Timer || spec.
inputs[0].matcher.lifetime == Lifetime::Enumeration);
142 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid,
"callback",
"Grace period for data processing expired. Switching to EndOfStreaming.");
145 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid,
"callback",
"Grace period for data processing expired. Only calibrations from this point onwards.");
172 return state.transitionHandling;
184 if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
185 uv_update_time(
state.loop);
186 O2_SIGNPOST_EVENT_EMIT(calibration, cid,
"timer_setup",
"Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
187 uv_timer_start(deviceContext.dataProcessingGracePeriodTimer,
on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
191 uv_update_time(
state.loop);
193 deviceContext.exitTransitionTimeout);
196 int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
201 "New state requested. Waiting for %d seconds before %{public}s",
203 onlyGenerated ?
"dropping remaining input and switching to READY state." :
"switching to READY state.");
208 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid,
"run_loop",
"New state requested. No timeout set, quitting immediately as per --completion-policy");
212 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid,
"run_loop",
"New state pending and we are already idle, quitting immediately as per --completion-policy");
214 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid,
"run_loop",
"New state pending and we are already idle, switching to READY immediately.");
struct uv_timer_s uv_timer_t
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
OutputChannelInfo const & getOutputChannelInfo(ChannelIndex channelIndex) const
Retrieve information associated to a given forward by ChannelIndex.
virtual fair::mq::Device * device()=0
GLbitfield GLuint64 timeout
Defining PrimaryVertex explicitly as messageable.
void on_transition_requested_expired(uv_timer_t *handle)
void doSendOldestPossibleTimeframe(ServiceRegistryRef ref, fair::mq::TransportFactory *transport, ChannelIndex index, SendingPolicy::SendingCallback const &callback, size_t timeslice)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Idle
End of streaming notified.
@ Expired
A transition needs to be fulfilled ASAP.
@ NoTransition
No pending transitions.
@ Requested
A transition was notified to be requested.
@ Completed
The channel was signaled it will not receive any data.
void on_data_processing_expired(uv_timer_t *handle)
bool hasOnlyTimers(DeviceSpec const &spec)
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static bool hasOnlyGenerated(DeviceSpec const &spec)
check if spec is a source device
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const &ref, ProcessingPolicies const &policies)
starts the EoS timers and returns the new TransitionHandlingState in case a new state is requested
static void switchState(ServiceRegistryRef const &ref, StreamingState newState)
change the device StreamingState to newState
static void sendEndOfStream(ServiceRegistryRef const &ref, OutputChannelSpec const &channel)
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
static bool onlineDeploymentMode()
Returns true if running online.
std::vector< InputRoute > inputs
std::vector< InputChannelSpec > inputChannels
Running state information of a given device.
a BaseHeader with domain information from the source
size_t oldestPossibleTimeslice
Forward channel information.
fair::mq::Channel & channel
ForwardingPolicy const * policy
ForwardingCallback forward
Output channel information.
fair::mq::Channel & channel
SendingPolicy const * policy
enum TerminationPolicy termination
std::function< void(fair::mq::Parts &, ChannelIndex channelIndex, ServiceRegistryRef registry)> SendingCallback