Project
Loading...
Searching...
No Matches
CommonMessageBackends.cxx
Go to the documentation of this file.
1
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3
// All rights not expressly granted are reserved.
4
//
5
// This software is distributed under the terms of the GNU General Public
6
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7
//
8
// In applying this license CERN does not waive the privileges and immunities
9
// granted to it by virtue of its status as an Intergovernmental Organization
10
// or submit itself to any jurisdiction.
11
#include "
Framework/CommonMessageBackends.h
"
12
#include "
Framework/MessageContext.h
"
13
#include "
Framework/ArrowContext.h
"
14
#include "
Framework/StringContext.h
"
15
#include "
Framework/DataProcessor.h
"
16
#include "
Framework/ServiceRegistry.h
"
17
#include "
Framework/RawDeviceService.h
"
18
#include "
Framework/DeviceSpec.h
"
19
#include "
Framework/EndOfStreamContext.h
"
20
#include "
Framework/Tracing.h
"
21
#include "
Framework/DeviceMetricsInfo.h
"
22
#include "
Framework/DeviceInfo.h
"
23
24
#include "
CommonMessageBackendsHelpers.h
"
25
26
#include <Monitoring/Monitoring.h>
27
#include <
Headers/DataHeader.h
>
28
29
#include <fairmq/ProgOptions.h>
30
#include <fairmq/Device.h>
31
32
#include <uv.h>
33
#include <boost/program_options/variables_map.hpp>
34
#include <csignal>
35
36
namespace
o2::framework
37
{
38
39
// Forward declarations of two framework types. Neither name is referenced by
// the function definitions visible in this file.
class
EndOfStreamContext;
40
class
ProcessingContext;
41
42
/// Describe the "fairmq-device-proxy" service.
///
/// - init:  allocates a FairMQDeviceProxy and returns it wrapped in a
///          ServiceHandle registered with ServiceKind::Serial.
/// - start: binds that proxy to the outputs, inputs and forwards found in the
///          DeviceSpec, together with the device exposed by RawDeviceService.
///
/// @return the ServiceSpec aggregate carrying the two callbacks above.
o2::framework::ServiceSpec CommonMessageBackends::fairMQDeviceProxy()
{
  return ServiceSpec{
    .name = "fairmq-device-proxy",
    .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
      // The raw pointer is handed over through the handle's `instance` field.
      auto* deviceProxy = new FairMQDeviceProxy();
      return ServiceHandle{.hash = TypeIdHelpers::uniqueId<FairMQDeviceProxy>(), .instance = deviceProxy, .kind = ServiceKind::Serial};
    },
    .start = [](ServiceRegistryRef services, void* instance) {
      // Collect everything bind() needs from the registry first.
      auto const& specOutputs = services.get<DeviceSpec const>().outputs;
      auto const& specInputs = services.get<DeviceSpec const>().inputs;
      auto const& specForwards = services.get<DeviceSpec const>().forwards;
      auto& rawDevice = *services.get<RawDeviceService>().device();
      // `instance` is the type-erased pointer produced by the init callback.
      static_cast<FairMQDeviceProxy*>(instance)->bind(specOutputs, specInputs, specForwards, rawDevice);
    },
  };
}
63
64
/// Describe the "fairmq-backend" service, whose instance is a MessageContext.
///
/// The init callback creates a MessageContext on top of the FairMQDeviceProxy
/// service. When the device's dispatch policy action is
/// DispatchPolicy::DispatchOp::WhenReady, it also installs a DispatchControl
/// made of:
///   - a dispatcher that forwards message parts to the DataSender service, and
///   - a matcher that accepts every header when the policy has no
///     triggerMatcher, and otherwise defers to that triggerMatcher.
///
/// The remaining callbacks are taken from
/// CommonMessageBackendsHelpers<MessageContext> and CommonServices.
///
/// @return the ServiceSpec aggregate, registered with ServiceKind::Stream.
o2::framework::ServiceSpec CommonMessageBackends::fairMQBackendSpec()
{
  using Helpers = CommonMessageBackendsHelpers<MessageContext>;

  return ServiceSpec{
    .name = "fairmq-backend",
    .uniqueId = CommonServices::simpleServiceId<MessageContext>(),
    .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
      auto& deviceProxy = services.get<FairMQDeviceProxy>();
      auto* messageContext = new MessageContext(deviceProxy);
      auto const& deviceSpec = services.get<DeviceSpec const>();
      auto& sender = services.get<DataSender>();

      // Hands the parts over to the DataSender; the third argument is ignored.
      auto forwardToSender = [&sender](fair::mq::Parts&& parts, ChannelIndex channelIndex, unsigned int) {
        sender.send(parts, channelIndex);
      };

      // The policy is copied into the closure, so the matcher does not refer
      // back to the DeviceSpec.
      auto acceptsHeader = [dispatchPolicy = deviceSpec.dispatchPolicy](o2::header::DataHeader const& header) {
        return dispatchPolicy.triggerMatcher == nullptr || dispatchPolicy.triggerMatcher(Output{header});
      };

      // Dispatch control is only set up for the WhenReady policy.
      if (deviceSpec.dispatchPolicy.action == DispatchPolicy::DispatchOp::WhenReady) {
        messageContext->init(DispatchControl{forwardToSender, acceptsHeader});
      }
      return ServiceHandle{.hash = TypeIdHelpers::uniqueId<MessageContext>(), .instance = messageContext, .kind = ServiceKind::Stream};
    },
    .configure = CommonServices::noConfiguration(),
    .preProcessing = Helpers::clearContext(),
    .postProcessing = Helpers::sendCallback(),
    .preEOS = Helpers::clearContextEOS(),
    .postEOS = Helpers::sendCallbackEOS(),
    .kind = ServiceKind::Stream};
}
98
99
/// Describe the "string-backend" service, built around StringContext.
///
/// Every lifecycle callback except `configure` comes from
/// CommonMessageBackendsHelpers<StringContext>; `configure` uses
/// CommonServices::noConfiguration().
///
/// @return the ServiceSpec aggregate, registered with ServiceKind::Stream.
o2::framework::ServiceSpec CommonMessageBackends::stringBackendSpec()
{
  using Helpers = CommonMessageBackendsHelpers<StringContext>;

  return ServiceSpec{
    .name = "string-backend",
    .uniqueId = CommonServices::simpleServiceId<StringContext>(),
    .init = Helpers::createCallback(),
    .configure = CommonServices::noConfiguration(),
    .preProcessing = Helpers::clearContext(),
    .postProcessing = Helpers::sendCallback(),
    .preEOS = Helpers::clearContextEOS(),
    .postEOS = Helpers::sendCallbackEOS(),
    .kind = ServiceKind::Stream};
}
112
113
}
// namespace o2::framework
ArrowContext.h
CommonMessageBackendsHelpers.h
CommonMessageBackends.h
DataHeader.h
DataProcessor.h
DeviceInfo.h
DeviceMetricsInfo.h
DeviceSpec.h
EndOfStreamContext.h
MessageContext.h
RawDeviceService.h
ServiceRegistry.h
StringContext.h
Tracing.h
int
o2::framework::DataSender
Allow injecting policies on send.
Definition
DataSender.h:34
o2::framework::FairMQDeviceProxy
Definition
FairMQDeviceProxy.h:36
o2::framework::MessageContext
Definition
MessageContext.h:54
o2::framework::RawDeviceService
Definition
RawDeviceService.h:28
o2::framework::ServiceRegistryRef
Definition
ServiceRegistryRef.h:21
o2::framework::ServiceRegistryRef::get
T & get() const
Definition
ServiceRegistryRef.h:85
o2::framework
Defining PrimaryVertex explicitly as messageable.
Definition
TFIDInfo.h:20
o2::framework::ServiceKind::Serial
@ Serial
o2::framework::ServiceKind::Stream
@ Stream
o2::framework::ChannelIndex
Definition
RoutingIndices.h:30
o2::framework::CommonMessageBackendsHelpers::createCallback
static ServiceInit createCallback()
Definition
CommonMessageBackendsHelpers.h:30
o2::framework::CommonMessageBackendsHelpers::sendCallbackEOS
static ServiceEOSCallback sendCallbackEOS()
Definition
CommonMessageBackendsHelpers.h:62
o2::framework::CommonMessageBackendsHelpers::clearContext
static ServiceProcessingCallback clearContext()
Definition
CommonMessageBackendsHelpers.h:46
o2::framework::CommonMessageBackendsHelpers::sendCallback
static ServiceProcessingCallback sendCallback()
Definition
CommonMessageBackendsHelpers.h:38
o2::framework::CommonMessageBackendsHelpers::clearContextEOS
static ServiceEOSCallback clearContextEOS()
Definition
CommonMessageBackendsHelpers.h:54
o2::framework::CommonMessageBackends::fairMQDeviceProxy
static ServiceSpec fairMQDeviceProxy()
Definition
CommonMessageBackends.cxx:42
o2::framework::CommonMessageBackends::fairMQBackendSpec
static ServiceSpec fairMQBackendSpec()
Definition
CommonMessageBackends.cxx:64
o2::framework::CommonMessageBackends::stringBackendSpec
static ServiceSpec stringBackendSpec()
Definition
CommonMessageBackends.cxx:99
o2::framework::CommonServices::noConfiguration
static ServiceConfigureCallback noConfiguration()
Definition
CommonServices.h:54
o2::framework::DeviceSpec
Definition
DeviceSpec.h:48
o2::framework::DeviceState
Running state information of a given device.
Definition
DeviceState.h:34
o2::framework::DispatchControl
Control for the message dispatching within message context. Depending on dispatching policy,...
Definition
DispatchControl.h:35
o2::framework::DispatchPolicy::DispatchOp::WhenReady
@ WhenReady
o2::framework::Output
Definition
Output.h:27
o2::framework::ServiceHandle
Definition
ServiceHandle.h:43
o2::framework::ServiceHandle::hash
unsigned int hash
Unique hash associated to the type of service.
Definition
ServiceHandle.h:45
o2::framework::ServiceSpec
Definition
ServiceSpec.h:138
o2::framework::ServiceSpec::name
std::string name
Name of the service.
Definition
ServiceSpec.h:140
o2::header::DataHeader
the main header struct
Definition
DataHeader.h:618
Framework
Core
src
CommonMessageBackends.cxx
Generated on Tue Feb 25 2025 23:16:39 for Project by
1.9.8