Project
Loading...
Searching...
No Matches
tpc-krypton-clusterer.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <vector>
13#include <string>
14#include <unordered_map>
15#include <thread>
16
27
30#include "TPCBase/Sector.h"
34
35using namespace o2::framework;
36using namespace o2::tpc;
37
38// Global variable used to transport data to the completion policy
39std::vector<InputSpec> gPolicyData;
40unsigned long gTpcSectorMask = 0xFFFFFFFFF;
41
42// we need to add workflow options before including Framework/runDataProcessing
43void customize(std::vector<ConfigParamSpec>& workflowOptions)
44{
45 std::string sectorDefault = "0-" + std::to_string(o2::tpc::Sector::MAXSECTOR - 1);
46 int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
47
48 std::vector<ConfigParamSpec> options{
49 {"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings (e.g.: 'TPCCalibPedestal.FirstTimeBin=10;...')"}},
50 {"configFile", VariantType::String, "", {"configuration file for configurable parameters"}},
51 {"outputFile", VariantType::String, "./tpcBoxClusters.root", {"output file name for the box cluster root file"}},
52 {"lanes", VariantType::Int, defaultlanes, {"Number of parallel processing lanes."}},
53 {"sectors", VariantType::String, sectorDefault.c_str(), {"List of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15"}},
54 {"writer-type", VariantType::String, "local", {"Writer type (local, EPN, none)"}},
55 };
56
57 std::swap(workflowOptions, options);
58}
59
61
62template <typename T>
64
65enum class WriterType {
66 Local,
67 EPN,
68 None,
69};
70
71const std::unordered_map<std::string, WriterType> WriterMap{
72 {"local", WriterType::Local},
73 {"EPN", WriterType::EPN},
74 {"none", WriterType::None},
75};
76
78{
79
80 using namespace o2::tpc;
81
82 // set up configuration
83 o2::conf::ConfigurableParam::updateFromFile(config.options().get<std::string>("configFile"));
84 o2::conf::ConfigurableParam::updateFromString(config.options().get<std::string>("configKeyValues"));
85 o2::conf::ConfigurableParam::writeINI("o2tpccalibration_configuration.ini");
86
87 const std::string outputFile = config.options().get<std::string>("outputFile");
88
89 const auto tpcSectors = o2::RangeTokenizer::tokenize<int>(config.options().get<std::string>("sectors"));
90 const auto nSectors = (int)tpcSectors.size();
91 const auto nLanes = std::min(config.options().get<int>("lanes"), nSectors);
92
93 WriterType writerType;
94 try {
95 writerType = WriterMap.at(config.options().get<std::string>("writer-type"));
96 } catch (std::out_of_range&) {
97 throw std::invalid_argument(std::string("invalid writer-type type: ") + config.options().get<std::string>("writer-type"));
98 }
99
100 WorkflowSpec workflow;
101
102 if (nLanes <= 0) {
103 return workflow;
104 }
105
106 std::vector<int> laneConfiguration = tpcSectors; // Currently just a copy of the tpcSectors, why?
107
108 gTpcSectorMask = 0;
109 for (auto s : tpcSectors) {
110 gTpcSectorMask |= (1ul << s);
111 }
112 gPolicyData.emplace_back(o2::framework::InputSpec{"data", o2::framework::ConcreteDataTypeMatcher{"TPC", "KRCLUSTERS"}});
113
114 WorkflowSpec parallelProcessors;
115 parallelProcessors.emplace_back(getKryptonClustererSpec());
116
117 parallelProcessors = parallelPipeline(
118 parallelProcessors, nLanes,
119 [&laneConfiguration]() { return laneConfiguration.size(); },
120 [&laneConfiguration](size_t index) { return laneConfiguration[index]; });
121 workflow.insert(workflow.end(), parallelProcessors.begin(), parallelProcessors.end());
122
123 if (writerType == WriterType::Local) {
125 //
126 // generation of processor specs for various types of outputs
127 // based on generic RootTreeWriter and MakeRootTreeWriterSpec generator
128 //
129 // -------------------------------------------------------------------------------------------
130 // the callbacks for the RootTreeWriter
131 //
132 // The generic writer needs a way to associate incoming data with the individual branches for
133 // the TPC sectors. The sector number is transmitted as part of the sector header, the callback
134 // finds the corresponding index in the vector of configured sectors
135 auto getIndex = [tpcSectors](o2::framework::DataRef const& ref) {
136 auto const* tpcSectorHeader = o2::framework::DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(ref);
137 if (!tpcSectorHeader) {
138 throw std::runtime_error("TPC sector header missing in header stack");
139 }
140 if (tpcSectorHeader->sector() < 0) {
141 // special data sets, don't write
142 return ~(size_t)0;
143 }
144 size_t index = 0;
145 for (auto const& sector : tpcSectors) {
146 if (sector == tpcSectorHeader->sector()) {
147 return index;
148 }
149 ++index;
150 }
151 throw std::runtime_error("sector " + std::to_string(tpcSectorHeader->sector()) + " not configured for writing");
152 };
153 auto getName = [tpcSectors](std::string base, size_t index) {
154 return base + "_" + std::to_string(tpcSectors.at(index));
155 };
156
157 auto makeWriterSpec = [tpcSectors, laneConfiguration, getIndex, getName](const char* processName,
158 const char* defaultFileName,
159 const char* defaultTreeName,
160 auto&& databranch,
161 bool singleBranch = false) {
162 if (tpcSectors.size() == 0) {
163 throw std::invalid_argument(std::string("writer process configuration needs list of TPC sectors"));
164 }
165
166 auto amendInput = [tpcSectors, laneConfiguration](InputSpec& input, size_t index) {
167 input.binding += std::to_string(laneConfiguration[index]);
168 DataSpecUtils::updateMatchingSubspec(input, laneConfiguration[index]);
169 };
170 auto amendBranchDef = [laneConfiguration, amendInput, tpcSectors, getIndex, getName, singleBranch](auto&& def, bool enableMC = true) {
171 if (!singleBranch) {
172 def.keys = mergeInputs(def.keys, laneConfiguration.size(), amendInput);
173 // the branch is disabled if set to 0
174 def.nofBranches = enableMC ? tpcSectors.size() : 0;
175 def.getIndex = getIndex;
176 def.getName = getName;
177 } else {
178 // instead of the separate sector branches only one is going to be written
179 def.nofBranches = enableMC ? 1 : 0;
180 }
181 return std::move(def);
182 };
183
184 return std::move(MakeRootTreeWriterSpec(processName, defaultFileName, defaultTreeName,
185 std::move(amendBranchDef(databranch)))());
186 };
187
189 //
190 // a writer process for digits
191 //
192 // selected by output type 'difits'
193 using KrClusterOutputType = std::vector<o2::tpc::KrCluster>;
194 workflow.push_back(makeWriterSpec("tpc-krcluster-writer",
195 outputFile.data(),
196 "Clusters",
197 BranchDefinition<KrClusterOutputType>{InputSpec{"data", "TPC", "KRCLUSTERS", 0},
198 "TPCBoxCluster",
199 "boxcluster-branch-name"}));
200 } else if (writerType == WriterType::EPN) {
201 workflow.push_back(getFileWriterSpec<KrCluster>("data:TPC/KRCLUSTERS", BranchType::Krypton));
202 }
203
204 return workflow;
205}
std::string getName(const TDataMember *dm, int index, int size)
Writer for calibration data.
Processor spec for running TPC Krypton cluster finder.
Configurable generator for RootTreeWriter processor spec.
Helper function to tokenize sequences and ranges of integral numbers.
DPL completion policy helper for TPC scetor data.
Struct for Krypton and X-ray clusters.
static void writeINI(std::string const &filename, std::string const &keyOnly="")
static void updateFromFile(std::string const &, std::string const &paramsList="", bool unchangedOnly=false)
static void updateFromString(std::string const &)
ConfigParamRegistry & options() const
Generate a processor spec for the RootTreeWriter utility.
static constexpr int MAXSECTOR
Definition Sector.h:44
GLuint index
Definition glcorearb.h:781
GLint ref
Definition glcorearb.h:291
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
WorkflowSpec parallelPipeline(const WorkflowSpec &specs, size_t nPipelines, std::function< size_t()> getNumberOfSubspecs, std::function< size_t(size_t)> getSubSpec)
Inputs mergeInputs(InputSpec original, size_t maxIndex, std::function< void(InputSpec &, size_t)> amendCallback)
std::vector< DataProcessorSpec > WorkflowSpec
Global TPC definitions and constants.
Definition SimTraits.h:167
o2::framework::DataProcessorSpec getKryptonClustererSpec()
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
Marks an empty item in the context.
const std::unordered_map< std::string, WriterType > WriterMap
std::vector< InputSpec > gPolicyData
unsigned long gTpcSectorMask
WorkflowSpec defineDataProcessing(ConfigContext const &config)
This function hooks up the the workflow specifications into the DPL driver.
void customize(std::vector< ConfigParamSpec > &workflowOptions)