39#include <fairmq/Device.h>
40#include <fairmq/Channel.h>
55 fair::mq::Parts parts;
56 fair::mq::MessagePtr payload(device->NewMessage());
59 auto channelAlloc = o2::pmr::getTransportAllocator(device->GetChannel(channel.
name, 0).Transport());
63 parts.AddPart(std::move(header));
64 parts.AddPart(std::move(payload));
65 device->Send(parts, channel.
name, 0);
66 LOGP(info,
"Sending end-of-stream message to channel {}", channel.
name);
71 fair::mq::Parts parts;
72 fair::mq::MessagePtr payload(transport->CreateMessage());
75 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
79 parts.AddPart(std::move(header));
80 parts.AddPart(std::move(payload));
90 if (
state.oldestForChannel.value >= timeslice) {
94 state.oldestForChannel = {timeslice};
103 if (
state.oldestForChannel.value >= timeslice) {
107 state.oldestForChannel = {timeslice};
114 for (
int ci = 0; ci < proxy.getNumOutputChannels(); ++ci) {
116 auto&
state = proxy.getOutputChannelState({ci});
127 O2_SIGNPOST_START(device, dpid,
"state",
"Starting processing state %d", (
int)newState);
128 state.streaming = newState;
134 return std::all_of(spec.
inputs.cbegin(), spec.
inputs.cend(), [](
InputRoute const& route) ->
bool { return route.matcher.lifetime == Lifetime::Timer; });
139 return (spec.
inputChannels.size() == 1) && (spec.
inputs[0].matcher.lifetime == Lifetime::Timer || spec.
inputs[0].matcher.lifetime == Lifetime::Enumeration);
153 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid,
"callback",
"Grace period for data processing expired. Switching to EndOfStreaming.");
156 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid,
"callback",
"Grace period for data processing expired. Only calibrations from this point onwards.");
183 return state.transitionHandling;
195 if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
196 uv_update_time(
state.loop);
197 O2_SIGNPOST_EVENT_EMIT(calibration, cid,
"timer_setup",
"Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
198 uv_timer_start(deviceContext.dataProcessingGracePeriodTimer,
on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
202 uv_update_time(
state.loop);
204 deviceContext.exitTransitionTimeout);
207 int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
212 "New state requested. Waiting for %d seconds before %{public}s",
214 onlyGenerated ?
"dropping remaining input and switching to READY state." :
"switching to READY state.");
219 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid,
"run_loop",
"New state requested. No timeout set, quitting immediately as per --completion-policy");
223 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid,
"run_loop",
"New state pending and we are already idle, quitting immediately as per --completion-policy");
225 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid,
"run_loop",
"New state pending and we are already idle, switching to READY immediately.");
232 const bool copyByDefault,
bool consume)
235 std::vector<ChannelIndex> forwardingChoices{};
237 while (pi < messages.size()) {
238 auto& header = messages[pi];
244 if (header->GetData() ==
nullptr) {
248 auto dih = o2::header::get<DomainInfoHeader*>(header->GetData());
253 auto sih = o2::header::get<SourceInfoHeader*>(header->GetData());
259 auto dph = o2::header::get<DataProcessingHeader*>(header->GetData());
260 auto dh = o2::header::get<o2::header::DataHeader*>(header->GetData());
262 if (dph ==
nullptr || dh ==
nullptr) {
264 LOGP(error,
"Data is missing {}{}{}",
265 dph ?
"DataProcessingHeader" :
"", dph || dh ?
"and" :
"", dh ?
"DataHeader" :
"");
271 auto& payload = messages[pi + 1];
274 size_t numberOfMessages = 0;
275 if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
277 numberOfMessages = dh->splitPayloadParts + 1;
281 numberOfMessages = (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
284 if (payload.get() ==
nullptr && consume ==
true) {
288 header.reset(
nullptr);
289 pi += numberOfMessages;
308 forwardingChoices.clear();
311 if (forwardingChoices.empty()) {
313 pi += numberOfMessages;
319 if (copyByDefault || forwardingChoices.size() > 1) {
320 for (
auto& choice : forwardingChoices) {
322 fmt::format(
"{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), choice.value);
324 for (
size_t ppi = pi; ppi < pi + numberOfMessages; ++ppi) {
325 auto&& newMsg = header->GetTransport()->CreateMessage();
326 newMsg->Copy(*messages[ppi]);
327 forwardedParts[choice.value].AddPart(std::move(newMsg));
332 fmt::format(
"{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), forwardingChoices.back().value);
333 for (
size_t ppi = pi; ppi < pi + numberOfMessages; ++ppi) {
334 forwardedParts[forwardingChoices.back().value].AddPart(std::move(messages[ppi]));
337 pi += numberOfMessages;
342 std::vector<MessageSet>& currentSetOfInputs,
343 const bool copyByDefault,
bool consume) -> std::vector<fair::mq::Parts>
346 std::vector<fair::mq::Parts> forwardedParts;
347 forwardedParts.resize(proxy.getNumForwards());
348 std::vector<ChannelIndex> forwardingChoices{};
350 for (
size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
351 auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii].messages);
352 routeForwardedMessages(proxy, span, forwardedParts, copyByDefault, consume);
354 return forwardedParts;
struct uv_timer_s uv_timer_t
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
void getMatchingForwardChannelIndexes(std::vector< ChannelIndex > &result, header::DataHeader const &header, size_t timeslice) const
Retrieve the channel index from a given OutputSpec and the associated timeslice.
OutputChannelInfo const & getOutputChannelInfo(ChannelIndex channelIndex) const
Retrieve information associated to a given forward by ChannelIndex.
virtual fair::mq::Device * device()=0
GLbitfield GLuint64 timeout
Defining PrimaryVertex explicitly as messageable.
void on_transition_requested_expired(uv_timer_t *handle)
void doSendOldestPossibleTimeframe(ServiceRegistryRef ref, fair::mq::TransportFactory *transport, ChannelIndex index, SendingPolicy::SendingCallback const &callback, size_t timeslice)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Idle
End of streaming notified.
@ Expired
A transition needs to be fulfilled ASAP.
@ NoTransition
No pending transitions.
@ Requested
A transition was notified to be requested.
@ Completed
The channel was signaled it will not receive any data.
void on_data_processing_expired(uv_timer_t *handle)
bool hasOnlyTimers(DeviceSpec const &spec)
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static bool hasOnlyGenerated(DeviceSpec const &spec)
check if spec is a source device
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const &ref, ProcessingPolicies const &policies)
starts the EoS timers and returns the new TransitionHandlingState in case a new state is requested
static std::vector< fair::mq::Parts > routeForwardedMessageSet(FairMQDeviceProxy &proxy, std::vector< MessageSet > &currentSetOfInputs, bool copy, bool consume)
Helper to route messages for forwarding.
static void switchState(ServiceRegistryRef const &ref, StreamingState newState)
change the device StreamingState to newState
static void sendEndOfStream(ServiceRegistryRef const &ref, OutputChannelSpec const &channel)
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
static void routeForwardedMessages(FairMQDeviceProxy &proxy, std::span< fair::mq::MessagePtr > &currentSetOfInputs, std::vector< fair::mq::Parts > &forwardedParts, bool copy, bool consume)
Helper to route messages for forwarding.
bool suppressDomainInfo
do not advertise/forward DomainInfoHeader from this device
static bool onlineDeploymentMode()
@return true if running online
std::vector< InputRoute > inputs
std::vector< InputChannelSpec > inputChannels
Running state information of a given device.
a BaseHeader with domain information from the source
size_t oldestPossibleTimeslice
Forward channel information.
fair::mq::Channel & channel
ForwardingPolicy const * policy
ForwardingCallback forward
Output channel information.
fair::mq::Channel & channel
SendingPolicy const * policy
enum TerminationPolicy termination
std::function< void(fair::mq::Parts &, ChannelIndex channelIndex, ServiceRegistryRef registry)> SendingCallback