24#include <fairmq/Device.h>
25#include <fairmq/Channel.h>
32 fair::mq::Parts parts;
33 fair::mq::MessagePtr payload(device->NewMessage());
36 auto channelAlloc = o2::pmr::getTransportAllocator(device->GetChannel(channel.
name, 0).Transport());
40 parts.AddPart(std::move(header));
41 parts.AddPart(std::move(payload));
42 device->Send(parts, channel.
name, 0);
43 LOGP(info,
"Sending end-of-stream message to channel {}", channel.
name);
48 fair::mq::Parts parts;
49 fair::mq::MessagePtr payload(transport->CreateMessage());
52 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
56 parts.AddPart(std::move(header));
57 parts.AddPart(std::move(payload));
64 if (
state.oldestForChannel.value >= timeslice) {
68 state.oldestForChannel = {timeslice};
74 if (
state.oldestForChannel.value >= timeslice) {
78 state.oldestForChannel = {timeslice};
85 for (
int ci = 0; ci < proxy.getNumOutputChannels(); ++ci) {
87 auto&
state = proxy.getOutputChannelState({ci});
OutputChannelInfo const & getOutputChannelInfo(ChannelIndex channelIndex) const
Retrieve information associated with a given output channel by ChannelIndex.
Defining PrimaryVertex explicitly as messageable.
void doSendOldestPossibleTimeframe(ServiceRegistryRef ref, fair::mq::TransportFactory *transport, ChannelIndex index, SendingPolicy::SendingCallback const &callback, size_t timeslice)
@ Completed
The channel was signaled that it will not receive any data.
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static void sendEndOfStream(ServiceRegistryRef const &ref, OutputChannelSpec const &channel)
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
a BaseHeader with domain information from the source
size_t oldestPossibleTimeslice
Forward channel information.
fair::mq::Channel & channel
ForwardingPolicy const * policy
ForwardingCallback forward
Output channel information.
fair::mq::Channel & channel
SendingPolicy const * policy
std::function< void(fair::mq::Parts &, ChannelIndex channelIndex, ServiceRegistryRef registry)> SendingCallback