Project
Loading...
Searching...
No Matches
DataProcessingHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
18#include "Headers/DataHeader.h"
19#include "Headers/Stack.h"
20#include "Framework/Logger.h"
23
24#include <fairmq/Device.h>
25#include <fairmq/Channel.h>
26
27namespace o2::framework
28{
30{
31 fair::mq::Device* device = ref.get<RawDeviceService>().device();
32 fair::mq::Parts parts;
33 fair::mq::MessagePtr payload(device->NewMessage());
36 auto channelAlloc = o2::pmr::getTransportAllocator(device->GetChannel(channel.name, 0).Transport());
37 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, sih});
38 // sigh... See if we can avoid having it const by not
39 // exposing it to the user in the first place.
40 parts.AddPart(std::move(header));
41 parts.AddPart(std::move(payload));
42 device->Send(parts, channel.name, 0);
43 LOGP(info, "Sending end-of-stream message to channel {}", channel.name);
44}
45
46void doSendOldestPossibleTimeframe(ServiceRegistryRef ref, fair::mq::TransportFactory* transport, ChannelIndex index, SendingPolicy::SendingCallback const& callback, size_t timeslice)
47{
48 fair::mq::Parts parts;
49 fair::mq::MessagePtr payload(transport->CreateMessage());
51 dih.oldestPossibleTimeslice = timeslice;
52 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
53 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dih});
54 // sigh... See if we can avoid having it const by not
55 // exposing it to the user in the first place.
56 parts.AddPart(std::move(header));
57 parts.AddPart(std::move(payload));
58
59 callback(parts, index, ref);
60}
61
63{
64 if (state.oldestForChannel.value >= timeslice) {
65 return false;
66 }
67 doSendOldestPossibleTimeframe(ref, info.channel.Transport(), info.index, info.policy->forward, timeslice);
68 state.oldestForChannel = {timeslice};
69 return true;
70}
71
73{
74 if (state.oldestForChannel.value >= timeslice) {
75 return false;
76 }
77 doSendOldestPossibleTimeframe(ref, info.channel.Transport(), info.index, info.policy->send, timeslice);
78 state.oldestForChannel = {timeslice};
79 return true;
80}
81
83{
84 auto& proxy = ref.get<FairMQDeviceProxy>();
85 for (int ci = 0; ci < proxy.getNumOutputChannels(); ++ci) {
86 auto& info = proxy.getOutputChannelInfo({ci});
87 auto& state = proxy.getOutputChannelState({ci});
88 sendOldestPossibleTimeframe(ref, info, state, timeslice);
89 }
90}
91
92} // namespace o2::framework
benchmark::State & state
OutputChannelInfo const & getOutputChannelInfo(ChannelIndex channelIndex) const
Retrieve information associated to a given forward by ChannelIndex.
GLuint index
Definition glcorearb.h:781
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
void doSendOldestPossibleTimeframe(ServiceRegistryRef ref, fair::mq::TransportFactory *transport, ChannelIndex index, SendingPolicy::SendingCallback const &callback, size_t timeslice)
@ Completed
The channel was signaled it will not receive any data.
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static void sendEndOfStream(ServiceRegistryRef const &ref, OutputChannelSpec const &channel)
static bool sendOldestPossibleTimeframe(ServiceRegistryRef const &ref, ForwardChannelInfo const &info, ForwardChannelState &state, size_t timeslice)
static void broadcastOldestPossibleTimeslice(ServiceRegistryRef const &ref, size_t timeslice)
Broadcast the oldest possible timeslice to all channels in output.
a BaseHeader with domain information from the source
Forward channel information.
Definition ChannelInfo.h:88
ForwardingPolicy const * policy
Definition ChannelInfo.h:94
Output channel information.
Definition ChannelInfo.h:73
fair::mq::Channel & channel
Definition ChannelInfo.h:76
SendingPolicy const * policy
Definition ChannelInfo.h:77
std::function< void(fair::mq::Parts &, ChannelIndex channelIndex, ServiceRegistryRef registry)> SendingCallback
a BaseHeader with state information from the source
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36