Project
Loading...
Searching...
No Matches
CommonMessageBackends.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
20#include "Framework/Tracing.h"
23
25
26#include <Monitoring/Monitoring.h>
27#include <Headers/DataHeader.h>
28
29#include <fairmq/ProgOptions.h>
30#include <fairmq/Device.h>
31
32#include <uv.h>
33#include <boost/program_options/variables_map.hpp>
34#include <csignal>
35
36namespace o2::framework
37{
38
39class EndOfStreamContext;
40class ProcessingContext;
41
43{
44 return ServiceSpec{
45 .name = "fairmq-device-proxy",
// Init callback of the "fairmq-device-proxy" service.
// Allocates a FairMQDeviceProxy and returns it wrapped in a ServiceHandle.
// The first two parameters are unnamed and `options` is never read in this body.
46 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
// The raw pointer is stored in the handle's `instance` field below.
// NOTE(review): nothing visible here deletes it; presumably the service
// registry owns the instance from now on -- confirm.
47 auto* proxy = new FairMQDeviceProxy();
// The handle is tagged with the type id of FairMQDeviceProxy and ServiceKind::Serial.
48 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<FairMQDeviceProxy>(), .instance = proxy, .kind = ServiceKind::Serial};
49 },
// Start callback of the "fairmq-device-proxy" service.
// Casts `instance` back to FairMQDeviceProxy and binds it to the outputs,
// inputs and forwards taken from the DeviceSpec, using two callbacks that
// wrap the raw device.
50 .start = [](ServiceRegistryRef services, void* instance) {
// `instance` arrives as void*; it is cast to the proxy type used by this service.
51 auto* proxy = static_cast<FairMQDeviceProxy*>(instance);
// The three route collections are all read from the same `DeviceSpec const` service.
52 auto& outputs = services.get<DeviceSpec const>().outputs;
53 auto& inputs = services.get<DeviceSpec const>().inputs;
54 auto& forwards = services.get<DeviceSpec const>().forwards;
// Device pointer obtained through RawDeviceService; both lambdas below copy it.
55 auto* device = services.get<RawDeviceService>().device();
// bindByName: looks `channelName` up in device->GetChannels() and returns
// element 0 of the matching entry by reference.
// NOTE(review): both lambdas keep the raw `device` pointer and are passed to
// proxy->bind(); presumably the device outlives the proxy -- confirm.
60 std::function<fair::mq::Channel&(std::string const&)> bindByName = [device](std::string const& channelName) -> fair::mq::Channel& {
61 auto channel = device->GetChannels().find(channelName);
62 if (channel == device->GetChannels().end()) {
// NOTE(review): there is no return or throw after this log call, so if
// LOGP(fatal, ...) ever returned, the end iterator would be dereferenced
// below. Presumably the fatal severity terminates the process -- confirm.
63 LOGP(fatal, "Expected channel {} not configured.", channelName);
64 }
// Only the first element (index 0) of the entry for that name is handed out.
65 return channel->second.at(0);
66 };
67
// newStateCallback: forwards to device->NewStatePending() and returns its result.
68 std::function<bool()> newStateCallback = [device]() -> bool {
69 return device->NewStatePending();
70 };
// Hand the routes and both callbacks to the proxy.
71 proxy->bind(outputs, inputs, forwards, bindByName, newStateCallback); },
72 };
73}
74
76{
77 return ServiceSpec{
78 .name = "fairmq-backend",
79 .uniqueId = CommonServices::simpleServiceId<MessageContext>(),
// Init callback of the "fairmq-backend" service.
// Creates a MessageContext on top of the FairMQDeviceProxy service and, when
// the device's dispatch policy action is WhenReady, initializes it with a
// DispatchControl built from the dispatcher and matcher lambdas below.
// The DeviceState and ProgOptions parameters are unnamed and unused.
80 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
81 auto& proxy = services.get<FairMQDeviceProxy>();
// The raw pointer ends up in the returned handle's `instance` field.
// NOTE(review): nothing visible here deletes it; presumably the service
// registry owns it -- confirm.
82 auto context = new MessageContext(proxy);
83 auto& spec = services.get<DeviceSpec const>();
84 auto& dataSender = services.get<DataSender>();
85
// dispatcher: passes the message parts and the channel index to
// dataSender.send(). The third (unsigned int) argument is unnamed and ignored.
// `parts` is declared as an rvalue reference but is passed on as an lvalue.
// NOTE(review): the closure holds `dataSender` by reference; presumably the
// DataSender service outlives the MessageContext -- confirm.
86 auto dispatcher = [&dataSender](fair::mq::Parts&& parts, ChannelIndex channelIndex, unsigned int) {
87 dataSender.send(parts, channelIndex);
88 };
89
// matcher: holds its own copy of spec.dispatchPolicy (init-capture by value).
// Returns true when the policy has no triggerMatcher; otherwise returns the
// result of calling triggerMatcher on an Output constructed from the header.
90 auto matcher = [policy = spec.dispatchPolicy](o2::header::DataHeader const& header) {
91 if (policy.triggerMatcher == nullptr) {
92 return true;
93 }
94 return policy.triggerMatcher(Output{header});
95 };
96
// context->init() is called only for DispatchOp::WhenReady; for any other
// action the context is returned without this init call.
97 if (spec.dispatchPolicy.action == DispatchPolicy::DispatchOp::WhenReady) {
98 context->init(DispatchControl{dispatcher, matcher});
99 }
// The handle is tagged with the type id of MessageContext and ServiceKind::Stream.
100 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<MessageContext>(), .instance = context, .kind = ServiceKind::Stream};
101 },
102 .configure = CommonServices::noConfiguration(),
107 .kind = ServiceKind::Stream};
108}
109
123
124} // namespace o2::framework
Allow injecting policies on send.
Definition DataSender.h:34
Defining PrimaryVertex explicitly as messageable.
static ServiceConfigureCallback noConfiguration()
Running state information of a given device.
Definition DeviceState.h:34
Control for the message dispatching within message context. Depending on dispatching policy,...
unsigned int hash
Unique hash associated to the type of service.
std::string name
Name of the service.
the main header struct
Definition DataHeader.h:619