Project
Loading...
Searching...
No Matches
ExternalFairMQDeviceProxy.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef FRAMEWORK_RAWDEVICESOURCE_H
12#define FRAMEWORK_RAWDEVICESOURCE_H
13
16#include <fairmq/FwdDecls.h>
17#include <vector>
18#include <functional>
19
20namespace o2::framework
21{
22
25using ChannelRetriever = std::function<std::string const&(OutputSpec const&, DataProcessingHeader::StartTime)>;
35using InjectorFunction = std::function<bool(TimingInfo&, ServiceRegistryRef const& services, fair::mq::Parts& inputs, ChannelRetriever, size_t newTimesliceId, bool& stop)>;
36using ChannelSelector = std::function<std::string(InputSpec const& input, const std::unordered_map<std::string, std::vector<fair::mq::Channel>>& channels)>;
37
38struct InputChannelSpec;
40
43
46
50void sendOnChannel(fair::mq::Device& device, o2::header::Stack&& headerStack, fair::mq::MessagePtr&& payloadMessage, OutputSpec const& spec, ChannelRetriever& channelRetriever);
51
52void sendOnChannel(fair::mq::Device& device, fair::mq::Parts& messages, std::string const& channel, size_t timeSlice);
53
56void appendForSending(fair::mq::Device& device, o2::header::Stack&& headerStack, size_t timeSliceID, fair::mq::MessagePtr&& payloadMessage, OutputSpec const& spec, fair::mq::Parts& messageCache, ChannelRetriever& channelRetriever);
57
61InjectorFunction incrementalConverter(OutputSpec const& spec, o2::header::SerializationMethod method, uint64_t startTime, uint64_t step);
62
69InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime, uint64_t step);
70
77 bool paranoid = false;
79 bool blindForward = false;
80};
81
87 DPLModelAdapterConfig config = DPLModelAdapterConfig{});
88
90inline InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& specs, bool throwOnUnmatchedInputs)
91{
92 return dplModelAdaptor(specs, DPLModelAdapterConfig{throwOnUnmatchedInputs});
93}
94
96static auto gDefaultConverter = incrementalConverter(OutputSpec{"TST", "TEST", 0}, header::gSerializationMethodROOT, 0, 1);
97
99std::string defaultOutputProxyChannelSelector(InputSpec const& input, const std::unordered_map<std::string, std::vector<fair::mq::Channel>>& channels);
100
113 std::vector<OutputSpec> const& outputs,
114 const char* defaultChannelConfig,
115 InjectorFunction converter,
116 uint64_t minSHM = 0,
117 bool sendTFcounter = false,
118 bool doInjectMissingData = false,
119 unsigned int doPrintSizes = 0);
120
122 Inputs const& inputSpecs,
123 const char* defaultChannelConfig);
143 Inputs const& inputSpecs,
144 const char* defaultChannelConfig,
146
147} // namespace o2::framework
148
149#endif // FRAMEWORK_RAWDEVICESOURCE_H
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
constexpr o2::header::DataOrigin gDataOriginAny
Definition DataHeader.h:560
constexpr o2::header::DataDescription gDataDescriptionAny
Definition DataHeader.h:595
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
DataProcessorSpec specifyExternalFairMQDeviceProxy(char const *label, std::vector< OutputSpec > const &outputs, const char *defaultChannelConfig, InjectorFunction converter, uint64_t minSHM=0, bool sendTFcounter=false, bool doInjectMissingData=false, unsigned int doPrintSizes=0)
std::function< std::string(InputSpec const &input, const std::unordered_map< std::string, std::vector< fair::mq::Channel > > &channels)> ChannelSelector
std::function< bool(TimingInfo &, ServiceRegistryRef const &services, fair::mq::Parts &inputs, ChannelRetriever, size_t newTimesliceId, bool &stop)> InjectorFunction
DataProcessorSpec specifyFairMQDeviceMultiOutputProxy(char const *label, Inputs const &inputSpecs, const char *defaultChannelConfig, ChannelSelector channelSelector=defaultOutputProxyChannelSelector)
InjectorFunction dplModelAdaptor(std::vector< OutputSpec > const &specs={{header::gDataOriginAny, header::gDataDescriptionAny}}, DPLModelAdapterConfig config=DPLModelAdapterConfig{})
InjectorFunction o2DataModelAdaptor(OutputSpec const &spec, uint64_t startTime, uint64_t step)
DataProcessorSpec specifyFairMQDeviceOutputProxy(char const *label, Inputs const &inputSpecs, const char *defaultChannelConfig)
void sendOnChannel(fair::mq::Device &device, o2::header::Stack &&headerStack, fair::mq::MessagePtr &&payloadMessage, OutputSpec const &spec, ChannelRetriever &channelRetriever)
std::string formatExternalChannelConfiguration(InputChannelSpec const &)
helper method to format a configuration string for an external channel
std::function< std::string const &(OutputSpec const &, DataProcessingHeader::StartTime)> ChannelRetriever
std::string defaultOutputProxyChannelSelector(InputSpec const &input, const std::unordered_map< std::string, std::vector< fair::mq::Channel > > &channels)
Default way to select an output channel for multi-output proxy.
std::vector< InputSpec > Inputs
void appendForSending(fair::mq::Device &device, o2::header::Stack &&headerStack, size_t timeSliceID, fair::mq::MessagePtr &&payloadMessage, OutputSpec const &spec, fair::mq::Parts &messageCache, ChannelRetriever &channelRetriever)
InjectorFunction incrementalConverter(OutputSpec const &spec, o2::header::SerializationMethod method, uint64_t startTime, uint64_t step)
constexpr o2::header::SerializationMethod gSerializationMethodROOT
Definition DataHeader.h:328
bool blindForward
blindly forward on one channel
bool paranoid
do all kinds of consistency checks
bool throwOnUnmatchedInputs
throw runtime error if an input message is not matched by filter rules
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
std::vector< ChannelData > channels