24void customize(std::vector<ConfigParamSpec>& workflowOptions)
26 workflowOptions.push_back(
28 "proxy-name", VariantType::String,
"readout-proxy", {
"name of the proxy processor"}});
30 workflowOptions.push_back(
32 "dataspec", VariantType::String,
"tst:TST/A", {
"selection string for the data to be proxied"}});
34 workflowOptions.push_back(
36 "inject-missing-data", VariantType::Bool,
false, {
"inject missing data according to dataspec if not found in the input"}});
38 workflowOptions.push_back(
40 "sporadic-outputs", VariantType::Bool,
false, {
"consider all the outputs as sporadic"}});
42 workflowOptions.push_back(
44 "print-input-sizes", VariantType::Int, 0, {
"print statistics about sizes per input spec every n TFs"}});
46 workflowOptions.push_back(
48 "throwOnUnmatched", VariantType::Bool,
false, {
"throw if unmatched input data is found"}});
50 workflowOptions.push_back(
52 "timeframes-shm-limit", VariantType::String,
"0", {
"Minimum amount of SHM required in order to publish data"}});
59 auto processorName = config.
options().
get<std::string>(
"proxy-name");
60 auto outputconfig = config.
options().
get<std::string>(
"dataspec");
62 bool sporadicOutputs = config.
options().
get<
bool>(
"sporadic-outputs");
63 auto printSizes = config.
options().
get<
unsigned int>(
"print-input-sizes");
64 bool throwOnUnmatched = config.
options().
get<
bool>(
"throwOnUnmatched");
65 uint64_t minSHM = std::stoul(config.
options().
get<std::string>(
"timeframes-shm-limit"));
66 std::vector<InputSpec> matchers =
select(outputconfig.c_str());
68 for (
auto const& matcher : matchers) {
70 readoutProxyOutput.back().lifetime = sporadicOutputs ? Lifetime::Sporadic : Lifetime::Timeframe;
74 auto filterSpecs = readoutProxyOutput;
76 processorName.c_str(),
77 std::move(readoutProxyOutput),
78 "type=pair,method=connect,address=ipc:///tmp/readout-pipe-0,rateLogging=1,transport=shmem",
83 workflow.emplace_back(readoutProxy);
ConfigParamRegistry & options() const
T get(const char *key) const
Defining PrimaryVertex explicitly as messageable.
void injectMissingData(fair::mq::Device &device, fair::mq::Parts &parts, std::vector< OutputRoute > const &routes, bool doInjectMissingData, unsigned int doPrintSizes)
DataProcessorSpec specifyExternalFairMQDeviceProxy(char const *label, std::vector< OutputSpec > const &outputs, const char *defaultChannelConfig, InjectorFunction converter, uint64_t minSHM=0, bool sendTFcounter=false, bool doInjectMissingData=false, unsigned int doPrintSizes=0)
InjectorFunction dplModelAdaptor(std::vector< OutputSpec > const &specs={{header::gDataOriginAny, header::gDataDescriptionAny}}, DPLModelAdapterConfig config=DPLModelAdapterConfig{})
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > select(char const *matcher="")
std::vector< OutputSpec > Outputs
WorkflowSpec defineDataProcessing(ConfigContext const &config)
This function hooks up the the workflow specifications into the DPL driver.
void customize(std::vector< ConfigParamSpec > &workflowOptions)
A label that can be associated to a DataProcessorSpec.
std::vector< DataProcessorLabel > labels
static OutputSpec asOutputSpec(InputSpec const &spec)