Project
Loading...
Searching...
No Matches
multinodeBenchmarkMergers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
17
19
21
22#include <memory>
23#include <random>
24
25using namespace o2::framework;
26using namespace o2::mergers;
27
28void customize(std::vector<CompletionPolicy>& policies)
29{
31}
32
33// we need to add workflow options before including Framework/runDataProcessing
34void customize(std::vector<ConfigParamSpec>& options)
35{
36 options.push_back({"mergers-layers", VariantType::Int, 1, {"Number of layers in the merger topology"}});
37 options.push_back({"mergers-merge-decision", VariantType::String, "publication", {"At which occasion objects are merged: 'arrival' or 'publication'"}});
38 options.push_back({"mergers-publication-decision", VariantType::String, "interval", {"When merged objects are published: interval or all-updated"}});
39 options.push_back({"mergers-publication-interval", VariantType::Double, 10.0, {"Publication interval of merged object [s]. It takes effect with --mergers-publication-decision interval"}});
40 options.push_back(
41 {"mergers-ownership-mode", VariantType::String, "diffs", {"Should the topology use 'diffs' or 'full' objects"}});
42 options.push_back({"input-channel-config", VariantType::String, "", {"Proxy input FMQ channel configuration"}});
43}
44
47#include "Framework/Logger.h"
48#include <TH1.h>
49
51
52using namespace std::chrono;
53
55{
56 int mergersLayers = config.options().get<int>("mergers-layers");
57 PublicationDecision mergersPublicationDecision = PublicationDecision::EachNSeconds;
58 double mergersPublicationInterval = config.options().get<double>("mergers-publication-interval");
59 InputObjectsTimespan mergersOwnershipMode =
60 config.options().get<std::string>("mergers-ownership-mode") == "full" ? InputObjectsTimespan::FullHistory : InputObjectsTimespan::LastDifference;
61 std::string inputChannelConfig = config.options().get<std::string>("input-channel-config");
62
63 WorkflowSpec specs;
64
65 specs.emplace_back(std::move(specifyExternalFairMQDeviceProxy(
66 "histo",
67 {{{"histo"}, {"TST", "HISTO"}}},
68 inputChannelConfig.c_str(),
69 dplModelAdaptor())));
70 specs.back().labels.emplace_back(DataProcessorLabel{"input-proxy"});
71
72 MergerInfrastructureBuilder mergersBuilder;
73 mergersBuilder.setInfrastructureName("histos");
74 mergersBuilder.setInputSpecs({{"histo", {"TST", "HISTO"}}});
75 mergersBuilder.setOutputSpec({{"main"}, "TST", "FULLHISTO", 0});
76 MergerConfig mergerConfig;
77 mergerConfig.inputObjectTimespan = {mergersOwnershipMode};
78 std::vector<std::pair<size_t, size_t>> param = {{mergersPublicationInterval, 1}};
79 mergerConfig.publicationDecision = {mergersPublicationDecision, param};
80 mergerConfig.mergedObjectTimespan = {MergedObjectTimespan::FullHistory};
81 mergerConfig.topologySize = {TopologySize::NumberOfLayers, mergersLayers};
82 mergersBuilder.setConfig(mergerConfig);
83
84 mergersBuilder.generateInfrastructure(specs);
85
86 auto printHisto = [](const TH1* histo) {
87 if (histo) {
88 std::string bins = "BINS:";
89 for (int i = 1; i <= histo->GetNbinsX(); i++) {
90 bins += " " + std::to_string((int)histo->GetBinContent(i));
91 if (i >= 100) {
92 LOG(info) << "Trimming the output to 100 entries, total is: " << histo->GetNbinsX();
93 break;
94 }
95 }
96 LOG(info) << bins;
97 } else {
98 LOG(info) << "they asked me to print a nullptr";
99 }
100 };
101
102 // clang-format off
103 DataProcessorSpec printer{
104 "printer-bins",
105 Inputs{
106 { "histo", "TST", "FULLHISTO", 0 }
107 },
108 Outputs{},
111 return (AlgorithmSpec::ProcessCallback) [&](ProcessingContext& processingContext) mutable {
112 auto ref = processingContext.inputs().get<DataRef>("histo");
113 auto tobject = DataRefUtils::as<TObject>(ref);
114 if (auto histo = dynamic_cast<const TH1F*>(tobject.get())) {
115 printHisto(histo);
116 } else if (auto collection = dynamic_cast<TCollection*>(tobject.get())) {
117 LOG(info) << "Received a collection, printing the first and the last histogram, total is: " << std::to_string(collection->GetEntries());
118 printHisto(dynamic_cast<TH1*>(collection->begin()()));
119 printHisto(dynamic_cast<TH1*>(collection->FindObject(std::to_string(collection->GetEntries() - 1).c_str())));
120 collection->SetOwner(true);
121 }
122 };
123 }
124 }
125 };
126 specs.push_back(printer);
127
128
129 return specs;
130}
131// clang-format on
const auto bins
Definition PID.cxx:49
int32_t i
Definition of O2 MergerInfrastructureBuilder, v0.1.
ConfigParamRegistry & options() const
static void customizeInfrastructure(std::vector< framework::CompletionPolicy > &)
Configures mergers to consume any data immediately.
void setOutputSpec(const framework::OutputSpec &outputSpec)
void setInputSpecs(const framework::Inputs &inputs)
GLenum GLfloat param
Definition glcorearb.h:271
void customize(std::vector< CompletionPolicy > &policies)
WorkflowSpec defineDataProcessing(ConfigContext const &config)
This function hooks up the the workflow specifications into the DPL driver.
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
DataProcessorSpec specifyExternalFairMQDeviceProxy(char const *label, std::vector< OutputSpec > const &outputs, const char *defaultChannelConfig, InjectorFunction converter, uint64_t minSHM=0, bool sendTFcounter=false, bool doInjectMissingData=false, unsigned int doPrintSizes=0)
InjectorFunction dplModelAdaptor(std::vector< OutputSpec > const &specs={{header::gDataOriginAny, header::gDataDescriptionAny}}, DPLModelAdapterConfig config=DPLModelAdapterConfig{})
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::function< ProcessCallback(InitContext &)> InitCallback
std::function< void(ProcessingContext &)> ProcessCallback
A label that can be associated to a DataProcessorSpec.
ConfigEntry< PublicationDecision, PublicationDecisionParameter > publicationDecision
ConfigEntry< TopologySize, std::variant< int, std::vector< size_t > > > topologySize
ConfigEntry< InputObjectsTimespan > inputObjectTimespan
ConfigEntry< MergedObjectTimespan, int > mergedObjectTimespan
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"