Project
Loading...
Searching...
No Matches
CommonMessageBackends.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
20#include "Framework/Tracing.h"
23
25
26#include <Monitoring/Monitoring.h>
27#include <Headers/DataHeader.h>
28
29#include <fairmq/ProgOptions.h>
30#include <fairmq/Device.h>
31
32#include <uv.h>
33#include <boost/program_options/variables_map.hpp>
34#include <csignal>
35
36namespace o2::framework
37{
38
39class EndOfStreamContext;
40class ProcessingContext;
41
43{
// Builds the ServiceSpec registered under the name "fairmq-device-proxy".
// NOTE(review): the function signature (original line 42) and original lines
// 56-59 are not visible in this listing; the comments below describe only the
// code that is shown.
44 return ServiceSpec{
45 .name = "fairmq-device-proxy",
// init: allocates a fresh FairMQDeviceProxy and wraps it in a ServiceHandle
// keyed by the type's unique id, with kind Serial. None of the three callback
// arguments are used here.
// NOTE(review): the proxy is created with a raw `new`; where it is released is
// not visible in this block - confirm who owns `instance`.
46 .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
47 auto* proxy = new FairMQDeviceProxy();
48 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<FairMQDeviceProxy>(), .instance = proxy, .kind = ServiceKind::Serial};
49 },
// start: recovers the proxy from the opaque `instance` pointer, reads the
// outputs, inputs and forwards of the DeviceSpec plus the device held by
// RawDeviceService, and binds them all to the proxy.
50 .start = [](ServiceRegistryRef services, void* instance) {
51 auto* proxy = static_cast<FairMQDeviceProxy*>(instance);
52 auto& outputs = services.get<DeviceSpec const>().outputs;
53 auto& inputs = services.get<DeviceSpec const>().inputs;
54 auto& forwards = services.get<DeviceSpec const>().forwards;
55 auto* device = services.get<RawDeviceService>().device();
// `device` is dereferenced without a null check in the visible code.
60 proxy->bind(outputs, inputs, forwards, *device); },
61 };
62}
63
65{
// Builds the ServiceSpec registered under the name "fairmq-backend", whose
// service instance is a MessageContext.
// NOTE(review): the function signature (original line 64) and original lines
// 91-95 (further designated initializers) are not visible in this listing; the
// comments below describe only the code that is shown.
66 return ServiceSpec{
67 .name = "fairmq-backend",
68 .uniqueId = CommonServices::simpleServiceId<MessageContext>(),
// init: creates the MessageContext on top of the FairMQDeviceProxy service and,
// depending on the device's dispatch policy, installs a DispatchControl.
69 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
70 auto& proxy = services.get<FairMQDeviceProxy>();
// NOTE(review): raw `new`; where the context is released is not visible here.
71 auto context = new MessageContext(proxy);
72 auto& spec = services.get<DeviceSpec const>();
73 auto& dataSender = services.get<DataSender>();
74
// Dispatcher: hands the message parts to the DataSender for the given channel.
// The third (unsigned int) argument is ignored. `dataSender` is captured by
// reference, so the DataSender must outlive every use of this callback -
// presumably guaranteed by the service registry; TODO confirm.
75 auto dispatcher = [&dataSender](fair::mq::Parts&& parts, ChannelIndex channelIndex, unsigned int) {
76 dataSender.send(parts, channelIndex);
77 };
78
// Matcher: holds its own copy of the dispatch policy. With no trigger matcher
// configured every header matches; otherwise the decision is delegated to
// the policy's triggerMatcher, called with an Output built from the header.
79 auto matcher = [policy = spec.dispatchPolicy](o2::header::DataHeader const& header) {
80 if (policy.triggerMatcher == nullptr) {
81 return true;
82 }
83 return policy.triggerMatcher(Output{header});
84 };
85
// The dispatch control is only installed for the WhenReady action; for any
// other action context->init() is not called in this block.
86 if (spec.dispatchPolicy.action == DispatchPolicy::DispatchOp::WhenReady) {
87 context->init(DispatchControl{dispatcher, matcher});
88 }
89 return ServiceHandle{.hash = TypeIdHelpers::uniqueId<MessageContext>(), .instance = context, .kind = ServiceKind::Stream};
90 },
96 .kind = ServiceKind::Stream};
97}
98
112
113} // namespace o2::framework
Allow injecting policies on send.
Definition DataSender.h:34
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
static ServiceConfigureCallback noConfiguration()
Running state information of a given device.
Definition DeviceState.h:34
Control for the message dispatching within message context. Depending on dispatching policy,...
unsigned int hash
Unique hash associated to the type of service.
std::string name
Name of the service.
the main header struct
Definition DataHeader.h:618