Project
Loading...
Searching...
No Matches
ChunkedDigitPublisher.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15
18#include "Framework/Task.h"
31#include "TPCBase/Sector.h"
32#include <TFile.h>
33#include <TTree.h>
34#include <TBranch.h>
35#include <stdexcept>
36#include <string>
37#include <vector>
38#include <utility>
39#include <numeric>
40#include <TROOT.h>
41#ifdef NDEBUG
42#undef NDEBUG
43#endif
44#include <cassert>
45#ifdef WITH_OPENMP
46#include <omp.h>
47#endif
48#include <TStopwatch.h>
49
51
52using namespace o2::framework;
53using namespace o2::header;
54
55void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
56{
58}
59
60// we need to add workflow options before including Framework/runDataProcessing
61void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
62{
63 // for the TPC it is useful to take at most half of the available (logical) cores due to memory requirements
64 int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
65 std::string laneshelp("Number of tpc processing lanes. A lane is a pipeline of algorithms.");
66 workflowOptions.push_back(
67 ConfigParamSpec{"tpc-lanes", VariantType::Int, defaultlanes, {laneshelp}});
68
69 std::string sectorshelp("List of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15");
70 std::string sectorDefault = "0-" + std::to_string(o2::tpc::Sector::MAXSECTOR - 1);
71 workflowOptions.push_back(
72 ConfigParamSpec{"tpc-sectors", VariantType::String, sectorDefault.c_str(), {sectorshelp}});
73
74 // option to disable MC truth
75 workflowOptions.push_back(ConfigParamSpec{"disable-mc", o2::framework::VariantType::Bool, false, {"disable mc-truth"}});
76 workflowOptions.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings ..."}});
78}
79
80// ------------------------------------------------------------------
81
84
86
87namespace o2
88{
89namespace tpc
90{
91
92template <typename T, typename R>
93void copyHelper(T const& origin, R& target)
94{
95 // Using critical section here as this is writing to shared mem
96 // and not sure if boost shared mem allocator is thread-safe.
97 // It was crashing without this.
98#pragma omp critical
99 std::copy(origin.begin(), origin.end(), std::back_inserter(target));
100}
101template <>
103{
104 target.mergeAtBack(origin);
105}
106
107template <typename T>
108auto makePublishBuffer(framework::ProcessingContext& pc, int sector, uint64_t activeSectors)
109{
110 LOG(info) << "PUBLISHING SECTOR " << sector;
111
112 o2::tpc::TPCSectorHeader header{sector};
113 header.activeSectors = activeSectors;
114 return &pc.outputs().make<T>(Output{"TPC", "DIGITS", static_cast<SubSpecificationType>(sector), header});
115}
116
117template <>
118auto makePublishBuffer<MCTruthContainer>(framework::ProcessingContext& pc, int sector, uint64_t activeSectors)
119{
120 return new MCTruthContainer();
121}
122
123template <typename T>
124void publishBuffer(framework::ProcessingContext& pc, int sector, uint64_t activeSectors, T* accum)
125{
126 // nothing by default
127}
128
129template <>
130void publishBuffer<MCTruthContainer>(framework::ProcessingContext& pc, int sector, uint64_t activeSectors, MCTruthContainer* accum)
131{
132
133 LOG(info) << "PUBLISHING MC LABELS " << accum->getNElements();
134 o2::tpc::TPCSectorHeader header{sector};
135 header.activeSectors = activeSectors;
136 using LabelType = std::decay_t<decltype(pc.outputs().make<o2::dataformats::ConstMCTruthContainer<o2::MCCompLabel>>(Output{"", "", 0}))>;
137 LabelType* sharedlabels;
138#pragma omp critical
140 Output{"TPC", "DIGITSMCTR", static_cast<SubSpecificationType>(sector), header});
141
142 accum->flatten_to(*sharedlabels);
143 delete accum;
144}
145
146template <typename T>
147void mergeHelper(const char* brprefix, std::vector<int> const& tpcsectors, uint64_t activeSectors,
148 TFile& originfile, framework::ProcessingContext& pc)
149{
150 auto keyslist = originfile.GetListOfKeys();
151 for (int i = 0; i < keyslist->GetEntries(); ++i) {
152 auto key = keyslist->At(i);
153 int sector = atoi(key->GetName());
154 if (std::find(tpcsectors.begin(), tpcsectors.end(), sector) == tpcsectors.end()) {
155 // do nothing if sector not wanted
156 continue;
157 }
158
159 auto oldtree = (TTree*)originfile.Get(key->GetName());
160 assert(oldtree);
161 std::stringstream digitbrname;
162 digitbrname << brprefix << key->GetName();
163 auto br = oldtree->GetBranch(digitbrname.str().c_str());
164 if (!br) {
165 continue;
166 }
167 T* chunk = nullptr;
168 br->SetAddress(&chunk);
169
170 using AccumType = std::decay_t<decltype(makePublishBuffer<T>(pc, sector, activeSectors))>;
171 AccumType accum;
172#pragma omp critical
173 accum = makePublishBuffer<T>(pc, sector, activeSectors);
174
175 for (auto e = 0; e < br->GetEntries(); ++e) {
176 br->GetEntry(e);
177 copyHelper(*chunk, *accum);
178 delete chunk;
179 chunk = nullptr;
180 }
181 br->ResetAddress();
182 br->DropBaskets("all");
183 delete oldtree;
184
185 // some data (labels are published slightly differently)
186 publishBuffer(pc, sector, activeSectors, accum);
187 }
188}
189
190void publishMergedTimeframes(std::vector<int> const& lanes, std::vector<int> const& tpcsectors, bool domctruth, framework::ProcessingContext& pc)
191{
192 uint64_t activeSectors = 0;
193 for (auto s : tpcsectors) {
194 activeSectors |= (uint64_t)0x1 << s;
195 }
196
197 ROOT::EnableThreadSafety();
198 // we determine the exact input list of files
199 auto digitfilelist = o2::utils::listFiles("tpc_driftime_digits_lane.*.root$");
200#ifdef WITH_OPENMP
201 omp_set_num_threads(std::min(lanes.size(), digitfilelist.size()));
202 LOG(info) << "Running digit publisher with OpenMP enabled";
203#pragma omp parallel for schedule(dynamic)
204#endif
205 for (size_t fi = 0; fi < digitfilelist.size(); ++fi) {
206 auto& filename = digitfilelist[fi];
207 LOG(debug) << "MERGING CHUNKED DIGITS FROM FILE " << filename;
208 auto originfile = new TFile(filename.c_str(), "OPEN");
209 assert(originfile);
210
211 //data definitions
212 using DigitsType = std::vector<o2::tpc::Digit>;
214 mergeHelper<DigitsType>("TPCDigit_", tpcsectors, activeSectors, *originfile, pc);
215 if (domctruth) {
216 mergeHelper<LabelType>("TPCDigitMCTruth_", tpcsectors, activeSectors, *originfile, pc);
217 }
218 originfile->Close();
219 delete originfile;
220 }
221}
222
223class Task
224{
225 public:
226 Task(std::vector<int> laneConfig, std::vector<int> tpcsectors, bool mctruth) : mLanes(laneConfig), mTPCSectors(tpcsectors), mDoMCTruth(mctruth)
227 {
228 }
229
231 {
232 LOG(info) << "Preparing digits (from digit chunks) for reconstruction";
233
234 TStopwatch w;
235 w.Start();
236 publishMergedTimeframes(mLanes, mTPCSectors, mDoMCTruth, pc);
237
238 pc.services().get<ControlService>().endOfStream();
239 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
240
241 LOG(info) << "DIGIT PUBLISHING TOOK " << w.RealTime();
242 return;
243 }
244
246 {
247 }
248
249 private:
250 bool mDoMCTruth = true;
251 std::vector<int> mLanes;
252 std::vector<int> mTPCSectors;
253};
254
258DataProcessorSpec getSpec(std::vector<int> const& laneConfiguration, std::vector<int> const& tpcsectors, bool mctruth, bool publish = true)
259{
260 //data definitions
261 using DigitsOutputType = std::vector<o2::tpc::Digit>;
262 using CommonModeOutputType = std::vector<o2::tpc::CommonMode>;
263
264 std::vector<OutputSpec> outputs; // define channel by triple of (origin, type id of data to be sent on this channel, subspecification)
265 if (publish) {
266 // effectively the input expects one sector per subspecification
267 for (int s = 0; s < 36; ++s) {
268 OutputLabel binding{std::to_string(s)};
269 outputs.emplace_back(/*binding,*/ "TPC", "DIGITS", static_cast<SubSpecificationType>(s), Lifetime::Timeframe);
270 if (mctruth) {
271 outputs.emplace_back(/*binding,*/ "TPC", "DIGITSMCTR", static_cast<SubSpecificationType>(s), Lifetime::Timeframe);
272 }
273 }
274 }
275
276 return DataProcessorSpec{
277 "TPCDigitMerger", {}, outputs, AlgorithmSpec{o2::framework::adaptFromTask<Task>(laneConfiguration, tpcsectors, mctruth)}, Options{}};
278}
279
280} // end namespace tpc
281} // end namespace o2
282
284{
285 WorkflowSpec specs;
286 o2::conf::ConfigurableParam::updateFromString(configcontext.options().get<std::string>("configKeyValues"));
287
288 auto numlanes = configcontext.options().get<int>("tpc-lanes");
289 bool mctruth = !configcontext.options().get<bool>("disable-mc");
290 auto tpcsectors = o2::RangeTokenizer::tokenize<int>(configcontext.options().get<std::string>("tpc-sectors"));
291
292 std::vector<int> lanes(numlanes);
293 std::iota(lanes.begin(), lanes.end(), 0);
294 specs.emplace_back(o2::tpc::getSpec(lanes, tpcsectors, mctruth));
295
296 // configure dpl timer to inject correct firstTForbit: start from the 1st orbit of TF containing 1st sampled orbit
297 o2::raw::HBFUtilsInitializer hbfIni(configcontext, specs);
298 return specs;
299}
Definition of the Detector class.
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
This function hooks up the the workflow specifications into the DPL driver.
void customize(std::vector< o2::framework::CallbacksPolicy > &policies)
o2::dataformats::MCTruthContainer< o2::MCCompLabel > MCTruthContainer
Definition of the common mode container class.
A const (ready only) version of MCTruthContainer.
Definition of the TPC Digit.
o2::framework::DataAllocator::SubSpecificationType SubSpecificationType
int32_t i
bool publish(std::string const &filename, std::string const &path, std::string CCDBpath)
Definition GRPTool.cxx:198
Definition of a container to keep Monte Carlo truth external to simulation objects.
Helper function to tokenize sequences and ranges of integral numbers.
std::ostringstream debug
StringRef key
static void updateFromString(std::string const &)
A read-only version of MCTruthContainer allowing for storage optimisation.
size_t flatten_to(ContainerType &container) const
ConfigParamRegistry & options() const
o2::header::DataHeader::SubSpecificationType SubSpecificationType
decltype(auto) make(const Output &spec, Args... args)
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
static constexpr int MAXSECTOR
Definition Sector.h:44
void init(framework::InitContext &ctx)
Task(std::vector< int > laneConfig, std::vector< int > tpcsectors, bool mctruth)
void run(framework::ProcessingContext &pc)
GLenum target
Definition glcorearb.h:1641
GLubyte GLubyte GLubyte GLubyte w
Definition glcorearb.h:852
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< ConfigParamSpec > Options
header::DataHeader::SubSpecificationType SubSpecificationType
O2 data header classes and API, v0.1.
Definition DetID.h:49
DataProcessorSpec getSpec(std::vector< int > const &laneConfiguration, std::vector< int > const &tpcsectors, bool mctruth, bool publish=true)
void mergeHelper(const char *brprefix, std::vector< int > const &tpcsectors, uint64_t activeSectors, TFile &originfile, framework::ProcessingContext &pc)
auto makePublishBuffer(framework::ProcessingContext &pc, int sector, uint64_t activeSectors)
void publishBuffer(framework::ProcessingContext &pc, int sector, uint64_t activeSectors, T *accum)
void publishBuffer< MCTruthContainer >(framework::ProcessingContext &pc, int sector, uint64_t activeSectors, MCTruthContainer *accum)
void publishMergedTimeframes(std::vector< int > const &lanes, std::vector< int > const &tpcsectors, bool domctruth, framework::ProcessingContext &pc)
auto makePublishBuffer< MCTruthContainer >(framework::ProcessingContext &pc, int sector, uint64_t activeSectors)
void copyHelper< MCTruthContainer >(MCTruthContainer const &origin, MCTruthContainer &target)
void copyHelper(T const &origin, R &target)
std::vector< std::string > listFiles(std::string const &dir, std::string const &searchpattern)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::string filename()
static void addNewTimeSliceCallback(std::vector< o2::framework::CallbacksPolicy > &policies)
static void addConfigOption(std::vector< o2::framework::ConfigParamSpec > &opts, const std::string &defOpt=std::string(o2::base::NameConf::DIGITIZATIONCONFIGFILE))
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"