Project
Loading...
Searching...
No Matches
raw-proxy.cxx
Go to the documentation of this file.
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
11
#include "Framework/Logger.h"
#include <cstdint>
#include <string>
#include <vector>

using namespace o2::framework;
22
23// we need to add workflow options before including Framework/runDataProcessing
24void customize(std::vector<ConfigParamSpec>& workflowOptions)
25{
26 workflowOptions.push_back(
28 "proxy-name", VariantType::String, "readout-proxy", {"name of the proxy processor"}});
29
30 workflowOptions.push_back(
32 "dataspec", VariantType::String, "tst:TST/A", {"selection string for the data to be proxied"}});
33
34 workflowOptions.push_back(
36 "inject-missing-data", VariantType::Bool, false, {"inject missing data according to dataspec if not found in the input"}});
37
38 workflowOptions.push_back(
40 "sporadic-outputs", VariantType::Bool, false, {"consider all the outputs as sporadic"}});
41
42 workflowOptions.push_back(
44 "print-input-sizes", VariantType::Int, 0, {"print statistics about sizes per input spec every n TFs"}});
45
46 workflowOptions.push_back(
48 "throwOnUnmatched", VariantType::Bool, false, {"throw if unmatched input data is found"}});
49
50 workflowOptions.push_back(
52 "timeframes-shm-limit", VariantType::String, "0", {"Minimum amount of SHM required in order to publish data"}});
53}
54
56
58{
59 auto processorName = config.options().get<std::string>("proxy-name");
60 auto outputconfig = config.options().get<std::string>("dataspec");
61 bool injectMissingData = config.options().get<bool>("inject-missing-data");
62 bool sporadicOutputs = config.options().get<bool>("sporadic-outputs");
63 auto printSizes = config.options().get<unsigned int>("print-input-sizes");
64 bool throwOnUnmatched = config.options().get<bool>("throwOnUnmatched");
65 uint64_t minSHM = std::stoul(config.options().get<std::string>("timeframes-shm-limit"));
66 std::vector<InputSpec> matchers = select(outputconfig.c_str());
67 Outputs readoutProxyOutput;
68 for (auto const& matcher : matchers) {
69 readoutProxyOutput.emplace_back(DataSpecUtils::asOutputSpec(matcher));
70 readoutProxyOutput.back().lifetime = sporadicOutputs ? Lifetime::Sporadic : Lifetime::Timeframe;
71 }
72
73 // we use the same specs as filters in the dpl adaptor
74 auto filterSpecs = readoutProxyOutput;
76 processorName.c_str(),
77 std::move(readoutProxyOutput),
78 "type=pair,method=connect,address=ipc:///tmp/readout-pipe-0,rateLogging=1,transport=shmem",
79 dplModelAdaptor(filterSpecs, throwOnUnmatched), minSHM, false, injectMissingData, printSizes);
80 readoutProxy.labels.emplace_back(DataProcessorLabel{"input-proxy"});
81
82 WorkflowSpec workflow;
83 workflow.emplace_back(readoutProxy);
84 return workflow;
85}
ConfigParamRegistry & options() const
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
void injectMissingData(fair::mq::Device &device, fair::mq::Parts &parts, std::vector< OutputRoute > const &routes, bool doInjectMissingData, unsigned int doPrintSizes)
DataProcessorSpec specifyExternalFairMQDeviceProxy(char const *label, std::vector< OutputSpec > const &outputs, const char *defaultChannelConfig, InjectorFunction converter, uint64_t minSHM=0, bool sendTFcounter=false, bool doInjectMissingData=false, unsigned int doPrintSizes=0)
InjectorFunction dplModelAdaptor(std::vector< OutputSpec > const &specs={{header::gDataOriginAny, header::gDataDescriptionAny}}, DPLModelAdapterConfig config=DPLModelAdapterConfig{})
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > select(char const *matcher="")
std::vector< OutputSpec > Outputs
WorkflowSpec defineDataProcessing(ConfigContext const &config)
This function hooks up the workflow specifications into the DPL driver.
Definition raw-proxy.cxx:57
void customize(std::vector< ConfigParamSpec > &workflowOptions)
Definition raw-proxy.cxx:24
A label that can be associated to a DataProcessorSpec.
std::vector< DataProcessorLabel > labels
static OutputSpec asOutputSpec(InputSpec const &spec)