38#include <fairmq/Device.h>
39#include <fairmq/Channel.h>
54 fair::mq::Parts parts;
55 fair::mq::MessagePtr payload(device->NewMessage());
58 auto channelAlloc = o2::pmr::getTransportAllocator(device->GetChannel(channel.
name, 0).Transport());
62 parts.AddPart(std::move(header));
63 parts.AddPart(std::move(payload));
64 device->Send(parts, channel.
name, 0);
65 LOGP(info,
"Sending end-of-stream message to channel {}", channel.
name);
70 fair::mq::Parts parts;
71 fair::mq::MessagePtr payload(transport->CreateMessage());
74 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
78 parts.AddPart(std::move(header));
79 parts.AddPart(std::move(payload));
86 if (
state.oldestForChannel.value >= timeslice) {
90 state.oldestForChannel = {timeslice};
96 if (
state.oldestForChannel.value >= timeslice) {
100 state.oldestForChannel = {timeslice};
107 for (
int ci = 0; ci < proxy.getNumOutputChannels(); ++ci) {
109 auto&
state = proxy.getOutputChannelState({ci});
120 O2_SIGNPOST_START(device, dpid,
"state",
"Starting processing state %d", (
int)newState);
121 state.streaming = newState;
127 return std::all_of(spec.
inputs.cbegin(), spec.
inputs.cend(), [](
InputRoute const& route) ->
bool { return route.matcher.lifetime == Lifetime::Timer; });
132 return (spec.
inputChannels.size() == 1) && (spec.
inputs[0].matcher.lifetime == Lifetime::Timer || spec.
inputs[0].matcher.lifetime == Lifetime::Enumeration);
146 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid,
"callback",
"Grace period for data processing expired. Switching to EndOfStreaming.");
149 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid,
"callback",
"Grace period for data processing expired. Only calibrations from this point onwards.");
176 return state.transitionHandling;
188 if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
189 uv_update_time(
state.loop);
190 O2_SIGNPOST_EVENT_EMIT(calibration, cid,
"timer_setup",
"Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
191 uv_timer_start(deviceContext.dataProcessingGracePeriodTimer,
on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
195 uv_update_time(
state.loop);
197 deviceContext.exitTransitionTimeout);
200 int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
205 "New state requested. Waiting for %d seconds before %{public}s",
207 onlyGenerated ?
"dropping remaining input and switching to READY state." :
"switching to READY state.");
212 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid,
"run_loop",
"New state requested. No timeout set, quitting immediately as per --completion-policy");
216 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid,
"run_loop",
"New state pending and we are already idle, quitting immediately as per --completion-policy");
218 O2_SIGNPOST_EVENT_EMIT_INFO(device, lid,
"run_loop",
"New state pending and we are already idle, switching to READY immediately.");
224static auto toBeForwardedHeader = [](
void* header) ->
bool {
229 if (header ==
nullptr) {
232 auto dh = o2::header::get<header::DataHeader*>(header);
236 bool retval = !o2::header::get<SourceInfoHeader*>(header) &&
237 !o2::header::get<DomainInfoHeader*>(header) &&
238 o2::header::get<DataProcessingHeader*>(header);
241 LOGP(error,
"Dropping data because of malformed header structure");
246static auto toBeforwardedMessageSet = [](std::vector<ChannelIndex>& cachedForwardingChoices,
247 FairMQDeviceProxy& proxy,
248 std::unique_ptr<fair::mq::Message>& header,
249 std::unique_ptr<fair::mq::Message>& payload,
252 if (header.get() ==
nullptr) {
259 if (payload.get() ==
nullptr && consume ==
true) {
263 header.reset(
nullptr);
267 auto fdph = o2::header::get<DataProcessingHeader*>(header->GetData());
268 if (fdph ==
nullptr) {
269 LOG(error) <<
"Data is missing DataProcessingHeader";
272 auto fdh = o2::header::get<header::DataHeader*>(header->GetData());
273 if (fdh ==
nullptr) {
274 LOG(error) <<
"Data is missing DataHeader";
281 if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || total > 1) {
282 proxy.getMatchingForwardChannelIndexes(cachedForwardingChoices, *fdh, fdph->startTime);
284 return cachedForwardingChoices.empty() ==
false;
291 std::vector<fair::mq::Parts> forwardedParts;
293 std::vector<ChannelIndex> cachedForwardingChoices{};
295 O2_SIGNPOST_START(forwarding, sid,
"forwardInputs",
"Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
296 slot.
index, oldestTimeslice.
timeslice.
value, copy ?
"with copy" :
"", copy && consume ?
" and " :
"", consume ?
"with consume" :
"");
298 for (
size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
299 auto& messageSet = currentSetOfInputs[ii];
301 if (messageSet.size() == 0) {
304 if (!toBeForwardedHeader(messageSet.header(0)->GetData())) {
307 cachedForwardingChoices.clear();
309 for (
size_t pi = 0; pi < currentSetOfInputs[ii].size(); ++pi) {
310 auto& messageSet = currentSetOfInputs[ii];
311 auto& header = messageSet.header(pi);
312 auto& payload = messageSet.payload(pi);
313 auto total = messageSet.getNumberOfPayloads(pi);
315 if (!toBeforwardedMessageSet(cachedForwardingChoices, proxy, header, payload, total, consume)) {
321 if (cachedForwardingChoices.size() > 1) {
324 auto* dh = o2::header::get<header::DataHeader*>(header->GetData());
325 auto* dph = o2::header::get<DataProcessingHeader*>(header->GetData());
328 for (
auto& cachedForwardingChoice : cachedForwardingChoices) {
329 auto&& newHeader = header->GetTransport()->CreateMessage();
331 fmt::format(
"{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoice.value);
332 newHeader->Copy(*header);
333 forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newHeader));
335 for (
size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
336 auto&& newPayload = header->GetTransport()->CreateMessage();
337 newPayload->Copy(*messageSet.payload(pi, payloadIndex));
338 forwardedParts[cachedForwardingChoice.value].AddPart(std::move(newPayload));
343 fmt::format(
"{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), cachedForwardingChoices.back().value);
344 forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.header(pi)));
345 for (
size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) {
346 forwardedParts[cachedForwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex)));
351 return forwardedParts;
struct uv_timer_s uv_timer_t
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
size_t getNumForwards() const
OutputChannelInfo const & getOutputChannelInfo(ChannelIndex channelIndex) const
Retrieve information associated to a given forward by ChannelIndex.
virtual fair::mq::Device * device()=0
GLbitfield GLuint64 timeout
Defining PrimaryVertex explicitly as messageable.
void on_transition_requested_expired(uv_timer_t *handle)
void doSendOldestPossibleTimeframe(ServiceRegistryRef ref, fair::mq::TransportFactory *transport, ChannelIndex index, SendingPolicy::SendingCallback const &callback, size_t timeslice)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Idle
End of streaming notified.
@ Expired
A transition needs to be fullfilled ASAP.
@ NoTransition
No pending transitions.
@ Requested
A transition was notified to be requested.
@ Completed
The channel was signaled it will not receive any data.
void on_data_processing_expired(uv_timer_t *handle)
bool hasOnlyTimers(DeviceSpec const &spec)
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static bool hasOnlyGenerated(DeviceSpec const &spec)
check if spec is a source devide
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const &ref, ProcessingPolicies const &policies)
starts the EoS timers and returns the new TransitionHandlingState in case as new state is requested
static void switchState(ServiceRegistryRef const &ref, StreamingState newState)
change the device StreamingState to newState
static void sendEndOfStream(ServiceRegistryRef const &ref, OutputChannelSpec const &channel)
static std::vector< fair::mq::Parts > routeForwardedMessages(FairMQDeviceProxy &proxy, TimesliceSlot slot, std::vector< MessageSet > ¤tSetOfInputs, TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume)
Helper to route messages for forwarding.
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
static bool onlineDeploymentMode()
@true if running online
std::vector< InputRoute > inputs
std::vector< InputChannelSpec > inputChannels
Running state information of a given device.
a BaseHeader with domain information from the source
size_t oldestPossibleTimeslice
Forward channel information.
fair::mq::Channel & channel
ForwardingPolicy const * policy
ForwardingCallback forward
Output channel information.
fair::mq::Channel & channel
SendingPolicy const * policy
enum TerminationPolicy termination
std::function< void(fair::mq::Parts &, ChannelIndex channelIndex, ServiceRegistryRef registry)> SendingCallback
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"