28void customize(std::vector<CompletionPolicy>& policies)
34void customize(std::vector<ConfigParamSpec>& options)
36 options.push_back({
"mergers-layers", VariantType::Int, 1, {
"Number of layers in the merger topology"}});
37 options.push_back({
"mergers-merge-decision", VariantType::String,
"publication", {
"At which occasion objects are merged: 'arrival' or 'publication'"}});
38 options.push_back({
"mergers-publication-decision", VariantType::String,
"interval", {
"When merged objects are published: interval or all-updated"}});
39 options.push_back({
"mergers-publication-interval", VariantType::Double, 10.0, {
"Publication interval of merged object [s]. It takes effect with --mergers-publication-decision interval"}});
41 {
"mergers-ownership-mode", VariantType::String,
"diffs", {
"Should the topology use 'diffs' or 'full' objects"}});
42 options.push_back({
"input-channel-config", VariantType::String,
"", {
"Proxy input FMQ channel configuration"}});
52using namespace std::chrono;
56 int mergersLayers = config.
options().
get<
int>(
"mergers-layers");
58 double mergersPublicationInterval = config.
options().
get<
double>(
"mergers-publication-interval");
60 config.
options().
get<std::string>(
"mergers-ownership-mode") ==
"full" ? InputObjectsTimespan::FullHistory : InputObjectsTimespan::LastDifference;
61 std::string inputChannelConfig = config.
options().
get<std::string>(
"input-channel-config");
67 {{{
"histo"}, {
"TST",
"HISTO"}}},
68 inputChannelConfig.c_str(),
75 mergersBuilder.
setOutputSpec({{
"main"},
"TST",
"FULLHISTO", 0});
78 std::vector<std::pair<size_t, size_t>>
param = {{mergersPublicationInterval, 1}};
81 mergerConfig.
topologySize = {TopologySize::NumberOfLayers, mergersLayers};
86 auto printHisto = [](
const TH1* histo) {
88 std::string
bins =
"BINS:";
89 for (
int i = 1;
i <= histo->GetNbinsX();
i++) {
92 LOG(info) <<
"Trimming the output to 100 entries, total is: " << histo->GetNbinsX();
98 LOG(info) <<
"they asked me to print a nullptr";
106 {
"histo",
"TST",
"FULLHISTO", 0 }
112 auto ref = processingContext.inputs().get<
DataRef>(
"histo");
113 auto tobject = DataRefUtils::as<TObject>(
ref);
114 if (
auto histo =
dynamic_cast<const TH1F*
>(tobject.get())) {
116 }
else if (
auto collection =
dynamic_cast<TCollection*
>(tobject.get())) {
117 LOG(info) <<
"Received a collection, printing the first and the last histogram, total is: " <<
std::to_string(collection->GetEntries());
118 printHisto(
dynamic_cast<TH1*
>(collection->begin()()));
119 printHisto(
dynamic_cast<TH1*
>(collection->FindObject(
std::to_string(collection->GetEntries() - 1).c_str())));
120 collection->SetOwner(
true);
126 specs.push_back(printer);
Definition of O2 MergerInfrastructureBuilder, v0.1.
ConfigParamRegistry & options() const
T get(const char *key) const
static void customizeInfrastructure(std::vector< framework::CompletionPolicy > &)
Configures mergers to consume any data immediately.
Builder class for Merger topologies.
void setInfrastructureName(std::string name)
void setOutputSpec(const framework::OutputSpec &outputSpec)
framework::WorkflowSpec generateInfrastructure()
void setConfig(MergerConfig config)
void setInputSpecs(const framework::Inputs &inputs)
void customize(std::vector< CompletionPolicy > &policies)
WorkflowSpec defineDataProcessing(ConfigContext const &config)
This function hooks up the workflow specifications into the DPL driver.
Defining PrimaryVertex explicitly as messageable.
DataProcessorSpec specifyExternalFairMQDeviceProxy(char const *label, std::vector< OutputSpec > const &outputs, const char *defaultChannelConfig, InjectorFunction converter, uint64_t minSHM=0, bool sendTFcounter=false, bool doInjectMissingData=false, unsigned int doPrintSizes=0)
InjectorFunction dplModelAdaptor(std::vector< OutputSpec > const &specs={{header::gDataOriginAny, header::gDataDescriptionAny}}, DPLModelAdapterConfig config=DPLModelAdapterConfig{})
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
std::string to_string(gsl::span< T, Size > span)
std::function< ProcessCallback(InitContext &)> InitCallback
std::function< void(ProcessingContext &)> ProcessCallback
A label that can be associated to a DataProcessorSpec.
ConfigEntry< PublicationDecision, PublicationDecisionParameter > publicationDecision
ConfigEntry< TopologySize, std::variant< int, std::vector< size_t > > > topologySize
ConfigEntry< InputObjectsTimespan > inputObjectTimespan
ConfigEntry< MergedObjectTimespan, int > mergedObjectTimespan
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"