Project
Loading...
Searching...
No Matches
tpc-occupancy-filter.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <memory>
13#include <vector>
14#include <string>
15#include <unordered_map>
16#include <cstdint>
17
28
32#include "TPCBase/Sector.h"
36
37using namespace o2::framework;
38using namespace o2::tpc;
39
40// Global variable used to transport data to the completion policy
41static std::vector<InputSpec> gPolicyData;
42static uint64_t gTpcSectorMask = 0xFFFFFFFFF;
43
44// customize the completion policy
45void customize(std::vector<o2::framework::CompletionPolicy>& policies)
46{
48 // policies.push_back(CompletionPolicyHelpers::defineByName("tpc-krypton-raw-filter.*", CompletionPolicy::CompletionOp::Consume));
49 // policies.push_back(CompletionPolicyHelpers::defineByName("file-writer", CompletionPolicy::CompletionOp::Consume));
50
51 // we customize the pipeline processors to consume data as it comes
52 //
53 // the custom completion policy for the tracker
55}
56
57// we need to add workflow options before including Framework/runDataProcessing
58void customize(std::vector<ConfigParamSpec>& workflowOptions)
59{
60 std::string sectorDefault = "0-" + std::to_string(o2::tpc::Sector::MAXSECTOR - 1);
61 int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
62
63 std::vector<ConfigParamSpec> options{
64 {"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings (e.g.: 'TPCCalibPedestal.FirstTimeBin=10;...')"}},
65 {"configFile", VariantType::String, "", {"configuration file for configurable parameters"}},
66 {"outputFile", VariantType::String, "./occupancy-filtered-digits.root", {"output file name for the filtered krypton file"}},
67 {"lanes", VariantType::Int, defaultlanes, {"Number of parallel processing lanes."}},
68 {"sectors", VariantType::String, sectorDefault.c_str(), {"List of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15"}},
69 {"writer-type", VariantType::String, "local", {"Writer type (local, EPN, none)"}},
70 {"ccdb-path", VariantType::String, "http://ccdb-test.cern.ch:8080", {"Path to CCDB"}},
71 };
72
73 std::swap(workflowOptions, options);
74}
75
77
78template <typename T>
80
/// Kind of writer appended to the workflow, chosen through the "writer-type" option.
enum class WriterType {
  Local, ///< selected by "local"
  EPN,   ///< selected by "EPN"
  None,  ///< selected by "none"
};
86
87const std::unordered_map<std::string, WriterType> WriterMap{
88 {"local", WriterType::Local},
89 {"EPN", WriterType::EPN},
90 {"none", WriterType::None},
91};
92
94{
95
96 using namespace o2::tpc;
97
98 // set up configuration
99 o2::conf::ConfigurableParam::updateFromFile(config.options().get<std::string>("configFile"));
100 o2::conf::ConfigurableParam::updateFromString(config.options().get<std::string>("configKeyValues"));
101 o2::conf::ConfigurableParam::writeINI("o2tpccalibration_configuration.ini");
102
103 const std::string outputFile = config.options().get<std::string>("outputFile");
104
105 const auto tpcSectors = o2::RangeTokenizer::tokenize<int>(config.options().get<std::string>("sectors"));
106 const auto nSectors = (int)tpcSectors.size();
107 const auto nLanes = std::min(config.options().get<int>("lanes"), nSectors);
108
109 WriterType writerType;
110 try {
111 writerType = WriterMap.at(config.options().get<std::string>("writer-type"));
112 } catch (std::out_of_range&) {
113 throw std::invalid_argument(std::string("invalid writer-type type: ") + config.options().get<std::string>("writer-type"));
114 }
115
116 WorkflowSpec workflow;
117
118 if (nLanes <= 0) {
119 return workflow;
120 }
121
122 std::vector<int> laneConfiguration = tpcSectors; // Currently just a copy of the tpcSectors, why?
123
124 gTpcSectorMask = 0;
125 for (auto s : tpcSectors) {
126 gTpcSectorMask |= (1ul << s);
127 }
128 gPolicyData.emplace_back(o2::framework::InputSpec{"data", o2::framework::ConcreteDataTypeMatcher{"TPC", "FILTERDIG"}});
129
130 WorkflowSpec parallelProcessors;
131 parallelProcessors.emplace_back(getOccupancyFilterSpec());
132
133 parallelProcessors = parallelPipeline(
134 parallelProcessors, nLanes,
135 [&laneConfiguration]() { return laneConfiguration.size(); },
136 [&laneConfiguration](size_t index) { return laneConfiguration[index]; });
137 workflow.insert(workflow.end(), parallelProcessors.begin(), parallelProcessors.end());
138
139 if (writerType == WriterType::Local) {
141 //
142 // generation of processor specs for various types of outputs
143 // based on generic RootTreeWriter and MakeRootTreeWriterSpec generator
144 //
145 // -------------------------------------------------------------------------------------------
146 // the callbacks for the RootTreeWriter
147 //
148 // The generic writer needs a way to associate incoming data with the individual branches for
149 // the TPC sectors. The sector number is transmitted as part of the sector header, the callback
150 // finds the corresponding index in the vector of configured sectors
151 auto getIndex = [tpcSectors](o2::framework::DataRef const& ref) {
152 auto const* tpcSectorHeader = o2::framework::DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(ref);
153 if (!tpcSectorHeader) {
154 throw std::runtime_error("TPC sector header missing in header stack");
155 }
156 if (tpcSectorHeader->sector() < 0) {
157 // special data sets, don't write
158 return ~(size_t)0;
159 }
160 size_t index = 0;
161 for (auto const& sector : tpcSectors) {
162 if (sector == tpcSectorHeader->sector()) {
163 return index;
164 }
165 ++index;
166 }
167 throw std::runtime_error("sector " + std::to_string(tpcSectorHeader->sector()) + " not configured for writing");
168 };
169 auto getName = [tpcSectors](std::string base, size_t index) {
170 return base + "_" + std::to_string(tpcSectors.at(index));
171 };
172
173 auto makeWriterSpec = [tpcSectors, laneConfiguration, getIndex, getName](const char* processName,
174 const char* defaultFileName,
175 const char* defaultTreeName,
176 auto&& databranch,
177 bool singleBranch = false) {
178 if (tpcSectors.size() == 0) {
179 throw std::invalid_argument(std::string("writer process configuration needs list of TPC sectors"));
180 }
181
182 auto amendInput = [tpcSectors, laneConfiguration](InputSpec& input, size_t index) {
183 input.binding += std::to_string(laneConfiguration[index]);
184 DataSpecUtils::updateMatchingSubspec(input, laneConfiguration[index]);
185 };
186 auto amendBranchDef = [laneConfiguration, amendInput, tpcSectors, getIndex, getName, singleBranch](auto&& def, bool enableMC = true) {
187 if (!singleBranch) {
188 def.keys = mergeInputs(def.keys, laneConfiguration.size(), amendInput);
189 // the branch is disabled if set to 0
190 def.nofBranches = enableMC ? tpcSectors.size() : 0;
191 def.getIndex = getIndex;
192 def.getName = getName;
193 } else {
194 // instead of the separate sector branches only one is going to be written
195 def.nofBranches = enableMC ? 1 : 0;
196 }
197 return std::move(def);
198 };
199
200 return std::move(MakeRootTreeWriterSpec(processName, defaultFileName, defaultTreeName,
201 std::move(amendBranchDef(databranch)))());
202 };
203
205 //
206 // a writer process for digits
207 //
208 // selected by output type 'difits'
209 using OutputType = std::vector<o2::tpc::Digit>;
210 workflow.push_back(makeWriterSpec("tpc-occupancy-writer",
211 outputFile.data(),
212 "o2sim",
213 BranchDefinition<OutputType>{InputSpec{"data", "TPC", "FILTERDIG", 0},
214 "TPCDigit",
215 "digit-branch-name"}));
216 } else if (writerType == WriterType::EPN) {
217 workflow.push_back(getFileWriterSpec<Digit>("data:TPC/FILTERDIG", BranchType::Digits));
218 }
219
220 return workflow;
221}
std::string getName(const TDataMember *dm, int index, int size)
Definition of the TPC Digit.
Writer for calibration data.
Configurable generator for RootTreeWriter processor spec.
Processor spec for filtering krypton raw data.
Helper function to tokenize sequences and ranges of integral numbers.
DPL completion policy helper for TPC sector data.
static void writeINI(std::string const &filename, std::string const &keyOnly="")
static void updateFromFile(std::string const &, std::string const &paramsList="", bool unchangedOnly=false)
static void updateFromString(std::string const &)
ConfigParamRegistry & options() const
Generate a processor spec for the RootTreeWriter utility.
static constexpr int MAXSECTOR
Definition Sector.h:44
GLuint index
Definition glcorearb.h:781
GLint ref
Definition glcorearb.h:291
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
WorkflowSpec parallelPipeline(const WorkflowSpec &specs, size_t nPipelines, std::function< size_t()> getNumberOfSubspecs, std::function< size_t(size_t)> getSubSpec)
Inputs mergeInputs(InputSpec original, size_t maxIndex, std::function< void(InputSpec &, size_t)> amendCallback)
std::vector< DataProcessorSpec > WorkflowSpec
Global TPC definitions and constants.
Definition SimTraits.h:167
o2::framework::DataProcessorSpec getOccupancyFilterSpec()
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
Marks an empty item in the context.
std::vector< InputSpec > gPolicyData
unsigned long gTpcSectorMask
const std::unordered_map< std::string, WriterType > WriterMap
WorkflowSpec defineDataProcessing(ConfigContext const &config)
This function hooks up the workflow specifications into the DPL driver.
void customize(std::vector< o2::framework::CompletionPolicy > &policies)