Project
Loading...
Searching...
No Matches
tpc-krypton-raw-filter.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <memory>
13#include <vector>
14#include <string>
15#include <unordered_map>
16
27
31#include "TPCBase/Sector.h"
35#include "TPCBase/CalDet.h"
36
37using namespace o2::framework;
38using namespace o2::tpc;
39
40// Global variable used to transport data to the completion policy
41std::vector<InputSpec> gPolicyData;
42unsigned long gTpcSectorMask = 0xFFFFFFFFF;
43
44// we need to add workflow options before including Framework/runDataProcessing
45void customize(std::vector<ConfigParamSpec>& workflowOptions)
46{
47 std::string sectorDefault = "0-" + std::to_string(o2::tpc::Sector::MAXSECTOR - 1);
48 int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
49
50 std::vector<ConfigParamSpec> options{
51 {"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings (e.g.: 'TPCCalibPedestal.FirstTimeBin=10;...')"}},
52 {"configFile", VariantType::String, "", {"configuration file for configurable parameters"}},
53 {"outputFile", VariantType::String, "./filtered-krypton-raw.root", {"output file name for the filtered krypton file"}},
54 {"lanes", VariantType::Int, defaultlanes, {"Number of parallel processing lanes."}},
55 {"sectors", VariantType::String, sectorDefault.c_str(), {"List of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15"}},
56 {"writer-type", VariantType::String, "local", {"Writer type (local, EPN, none)"}},
57 {"ccdb-path", VariantType::String, "http://ccdb-test.cern.ch:8080", {"Path to CCDB"}},
58 };
59
60 std::swap(workflowOptions, options);
61}
62
64
65template <typename T>
67
68enum class WriterType {
69 Local,
70 EPN,
71 None,
72};
73
74const std::unordered_map<std::string, WriterType> WriterMap{
75 {"local", WriterType::Local},
76 {"EPN", WriterType::EPN},
77 {"none", WriterType::None},
78};
79
81{
82
83 using namespace o2::tpc;
84
85 // set up configuration
86 o2::conf::ConfigurableParam::updateFromFile(config.options().get<std::string>("configFile"));
87 o2::conf::ConfigurableParam::updateFromString(config.options().get<std::string>("configKeyValues"));
88 o2::conf::ConfigurableParam::writeINI("o2tpccalibration_configuration.ini");
89
90 const std::string outputFile = config.options().get<std::string>("outputFile");
91
92 const auto tpcSectors = o2::RangeTokenizer::tokenize<int>(config.options().get<std::string>("sectors"));
93 const auto nSectors = (int)tpcSectors.size();
94 const auto nLanes = std::min(config.options().get<int>("lanes"), nSectors);
95
96 WriterType writerType;
97 try {
98 writerType = WriterMap.at(config.options().get<std::string>("writer-type"));
99 } catch (std::out_of_range&) {
100 throw std::invalid_argument(std::string("invalid writer-type type: ") + config.options().get<std::string>("writer-type"));
101 }
102
103 WorkflowSpec workflow;
104
105 if (nLanes <= 0) {
106 return workflow;
107 }
108
109 std::vector<int> laneConfiguration = tpcSectors; // Currently just a copy of the tpcSectors, why?
110
111 gTpcSectorMask = 0;
112 for (auto s : tpcSectors) {
113 gTpcSectorMask |= (1ul << s);
114 }
115 gPolicyData.emplace_back(o2::framework::InputSpec{"data", o2::framework::ConcreteDataTypeMatcher{"TPC", "FILTERDIG"}});
116
117 CalPad* noise = nullptr;
118 const std::string ccdbPath = config.options().get<std::string>("ccdb-path");
120 cdb.setURL(ccdbPath);
121 if (cdb.isHostReachable()) {
122 noise = cdb.get<CalPad>("TPC/Calib/Noise");
123 if (noise) {
124 LOGP(info, "Loaded noise from {}", ccdbPath);
125 } else {
126 LOGP(error, "Could not load noise from {}", ccdbPath);
127 }
128 } else {
129 LOGP(error, "Could not load noise from {}, host is not reachable", ccdbPath);
130 }
131
132 WorkflowSpec parallelProcessors;
133 parallelProcessors.emplace_back(getKryptonRawFilterSpec(noise));
134
135 parallelProcessors = parallelPipeline(
136 parallelProcessors, nLanes,
137 [&laneConfiguration]() { return laneConfiguration.size(); },
138 [&laneConfiguration](size_t index) { return laneConfiguration[index]; });
139 workflow.insert(workflow.end(), parallelProcessors.begin(), parallelProcessors.end());
140
141 if (writerType == WriterType::Local) {
143 //
144 // generation of processor specs for various types of outputs
145 // based on generic RootTreeWriter and MakeRootTreeWriterSpec generator
146 //
147 // -------------------------------------------------------------------------------------------
148 // the callbacks for the RootTreeWriter
149 //
150 // The generic writer needs a way to associate incoming data with the individual branches for
151 // the TPC sectors. The sector number is transmitted as part of the sector header, the callback
152 // finds the corresponding index in the vector of configured sectors
153 auto getIndex = [tpcSectors](o2::framework::DataRef const& ref) {
154 auto const* tpcSectorHeader = o2::framework::DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(ref);
155 if (!tpcSectorHeader) {
156 throw std::runtime_error("TPC sector header missing in header stack");
157 }
158 if (tpcSectorHeader->sector() < 0) {
159 // special data sets, don't write
160 return ~(size_t)0;
161 }
162 size_t index = 0;
163 for (auto const& sector : tpcSectors) {
164 if (sector == tpcSectorHeader->sector()) {
165 return index;
166 }
167 ++index;
168 }
169 throw std::runtime_error("sector " + std::to_string(tpcSectorHeader->sector()) + " not configured for writing");
170 };
171 auto getName = [tpcSectors](std::string base, size_t index) {
172 return base + "_" + std::to_string(tpcSectors.at(index));
173 };
174
175 auto makeWriterSpec = [tpcSectors, laneConfiguration, getIndex, getName](const char* processName,
176 const char* defaultFileName,
177 const char* defaultTreeName,
178 auto&& databranch,
179 bool singleBranch = false) {
180 if (tpcSectors.size() == 0) {
181 throw std::invalid_argument(std::string("writer process configuration needs list of TPC sectors"));
182 }
183
184 auto amendInput = [tpcSectors, laneConfiguration](InputSpec& input, size_t index) {
185 input.binding += std::to_string(laneConfiguration[index]);
186 DataSpecUtils::updateMatchingSubspec(input, laneConfiguration[index]);
187 };
188 auto amendBranchDef = [laneConfiguration, amendInput, tpcSectors, getIndex, getName, singleBranch](auto&& def, bool enableMC = true) {
189 if (!singleBranch) {
190 def.keys = mergeInputs(def.keys, laneConfiguration.size(), amendInput);
191 // the branch is disabled if set to 0
192 def.nofBranches = enableMC ? tpcSectors.size() : 0;
193 def.getIndex = getIndex;
194 def.getName = getName;
195 } else {
196 // instead of the separate sector branches only one is going to be written
197 def.nofBranches = enableMC ? 1 : 0;
198 }
199 return std::move(def);
200 };
201
202 return std::move(MakeRootTreeWriterSpec(processName, defaultFileName, defaultTreeName,
203 std::move(amendBranchDef(databranch)))());
204 };
205
207 //
208 // a writer process for digits
209 //
210 // selected by output type 'difits'
211 using OutputType = std::vector<o2::tpc::Digit>;
212 workflow.push_back(makeWriterSpec("tpc-raw-krypton-writer",
213 outputFile.data(),
214 "o2sim",
215 BranchDefinition<OutputType>{InputSpec{"data", "TPC", "FILTERDIG", 0},
216 "TPCDigit",
217 "digit-branch-name"}));
218 } else if (writerType == WriterType::EPN) {
219 workflow.push_back(getFileWriterSpec<Digit>("data:TPC/FILTERDIG", BranchType::Digits));
220 }
221
222 return workflow;
223}
std::string getName(const TDataMember *dm, int index, int size)
Definition of the TPC Digit.
Writer for calibration data.
Processor spec for filtering krypton raw data.
std::string ccdbPath(const std::string badChannelType)
Configurable generator for RootTreeWriter processor spec.
Helper function to tokenize sequences and ranges of integral numbers.
DPL completion policy helper for TPC scetor data.
static BasicCCDBManager & instance()
static void writeINI(std::string const &filename, std::string const &keyOnly="")
static void updateFromFile(std::string const &, std::string const &paramsList="", bool unchangedOnly=false)
static void updateFromString(std::string const &)
ConfigParamRegistry & options() const
Generate a processor spec for the RootTreeWriter utility.
static constexpr int MAXSECTOR
Definition Sector.h:44
GLuint index
Definition glcorearb.h:781
GLint ref
Definition glcorearb.h:291
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
WorkflowSpec parallelPipeline(const WorkflowSpec &specs, size_t nPipelines, std::function< size_t()> getNumberOfSubspecs, std::function< size_t(size_t)> getSubSpec)
Inputs mergeInputs(InputSpec original, size_t maxIndex, std::function< void(InputSpec &, size_t)> amendCallback)
std::vector< DataProcessorSpec > WorkflowSpec
Global TPC definitions and constants.
Definition SimTraits.h:167
o2::framework::DataProcessorSpec getKryptonRawFilterSpec(CalPad *noise=nullptr)
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
Marks an empty item in the context.
const std::unordered_map< std::string, WriterType > WriterMap
std::vector< InputSpec > gPolicyData
unsigned long gTpcSectorMask
WorkflowSpec defineDataProcessing(ConfigContext const &config)
This function hooks up the the workflow specifications into the DPL driver.
void customize(std::vector< ConfigParamSpec > &workflowOptions)