Project
Loading...
Searching...
No Matches
ChunkedDigitPublisher.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15
18#include "Framework/Task.h"
32#include "TPCBase/Sector.h"
33#include <TFile.h>
34#include <TTree.h>
35#include <TBranch.h>
36#include <stdexcept>
37#include <string>
38#include <vector>
39#include <utility>
40#include <numeric>
41#include <TROOT.h>
42#ifdef NDEBUG
43#undef NDEBUG
44#endif
45#include <cassert>
46#ifdef WITH_OPENMP
47#include <omp.h>
48#endif
49#include <TStopwatch.h>
51
53
55
56using namespace o2::framework;
57using namespace o2::header;
58
59void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
60{
62}
63
64// we need to add workflow options before including Framework/runDataProcessing
65void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
66{
67 // for the TPC it is useful to take at most half of the available (logical) cores due to memory requirements
68 int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
69 std::string laneshelp("Number of tpc processing lanes. A lane is a pipeline of algorithms.");
70 workflowOptions.push_back(
71 ConfigParamSpec{"tpc-lanes", VariantType::Int, defaultlanes, {laneshelp}});
72
73 std::string sectorshelp("List of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15");
74 std::string sectorDefault = "0-" + std::to_string(o2::tpc::Sector::MAXSECTOR - 1);
75 workflowOptions.push_back(
76 ConfigParamSpec{"tpc-sectors", VariantType::String, sectorDefault.c_str(), {sectorshelp}});
77
78 // option to write merged data to file
79 workflowOptions.push_back(ConfigParamSpec{"writer-mode", o2::framework::VariantType::Bool, false, {"enable ROOT file output"}});
80
81 // option to disable MC truth
82 workflowOptions.push_back(ConfigParamSpec{"disable-mc", o2::framework::VariantType::Bool, false, {"disable mc-truth"}});
83 workflowOptions.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings ..."}});
85}
86
87// ------------------------------------------------------------------
88
91
93
94namespace o2
95{
96namespace tpc
97{
98
99template <typename T, typename R>
100void copyHelper(T const& origin, R& target)
101{
102 // Using critical section here as this is writing to shared mem
103 // and not sure if boost shared mem allocator is thread-safe.
104 // It was crashing without this.
105#pragma omp critical
106 std::copy(origin.begin(), origin.end(), std::back_inserter(target));
107}
108template <>
110{
111 target.mergeAtBack(origin);
112}
113
114// a trait to map TPC data types to a DPL channel name
115template <typename T>
117template <>
118struct OutputChannelName<std::vector<o2::tpc::Digit>> {
119 static constexpr char value[] = "DIGITS";
120};
121template <>
122struct OutputChannelName<std::vector<o2::tpc::CommonMode>> {
123 static constexpr char value[] = "COMMONMODE";
124};
125template <>
126struct OutputChannelName<std::vector<DigiGroupRef>> {
127 static constexpr char value[] = "DIGTRIGGERS";
128};
129
130template <typename T>
131auto makePublishBuffer(framework::ProcessingContext& pc, int sector, uint64_t activeSectors)
132{
133 LOG(info) << "PUBLISHING SECTOR " << sector << " FOR CHANNEL " << OutputChannelName<T>::value;
134
135 o2::tpc::TPCSectorHeader header{sector};
136 header.activeSectors = activeSectors;
137 return &pc.outputs().make<T>(Output{"TPC", OutputChannelName<T>::value, static_cast<SubSpecificationType>(sector), header});
138}
139
140template <>
141auto makePublishBuffer<MCTruthContainer>(framework::ProcessingContext& pc, int sector, uint64_t activeSectors)
142{
143 return new MCTruthContainer();
144}
145
146template <typename T>
147void publishBuffer(framework::ProcessingContext& pc, int sector, uint64_t activeSectors, T* accum)
148{
149 // nothing by default
150}
151
152template <>
153void publishBuffer<MCTruthContainer>(framework::ProcessingContext& pc, int sector, uint64_t activeSectors, MCTruthContainer* accum)
154{
155
156 LOG(info) << "PUBLISHING MC LABELS " << accum->getNElements();
157 o2::tpc::TPCSectorHeader header{sector};
158 header.activeSectors = activeSectors;
159 using LabelType = std::decay_t<decltype(pc.outputs().make<o2::dataformats::ConstMCTruthContainer<o2::MCCompLabel>>(Output{"", "", 0}))>;
160 LabelType* sharedlabels;
161#pragma omp critical
163 Output{"TPC", "DIGITSMCTR", static_cast<SubSpecificationType>(sector), header});
164
165 accum->flatten_to(*sharedlabels);
166 delete accum;
167}
168
169template <typename T>
170void mergeHelper(const char* brprefix, std::vector<int> const& tpcsectors, uint64_t activeSectors,
171 TFile& originfile, framework::ProcessingContext& pc)
172{
173 auto keyslist = originfile.GetListOfKeys();
174 for (int i = 0; i < keyslist->GetEntries(); ++i) {
175 auto key = keyslist->At(i);
176 int sector = atoi(key->GetName());
177 if (std::find(tpcsectors.begin(), tpcsectors.end(), sector) == tpcsectors.end()) {
178 // do nothing if sector not wanted
179 continue;
180 }
181
182 auto oldtree = (TTree*)originfile.Get(key->GetName());
183 assert(oldtree);
184 std::stringstream digitbrname;
185 digitbrname << brprefix << key->GetName();
186 auto br = oldtree->GetBranch(digitbrname.str().c_str());
187 if (!br) {
188 continue;
189 }
190 T* chunk = nullptr;
191 br->SetAddress(&chunk);
192
193 using AccumType = std::decay_t<decltype(makePublishBuffer<T>(pc, sector, activeSectors))>;
194 AccumType accum;
195#pragma omp critical
196 accum = makePublishBuffer<T>(pc, sector, activeSectors);
197
198 for (auto e = 0; e < br->GetEntries(); ++e) {
199 br->GetEntry(e);
200 copyHelper(*chunk, *accum);
201 delete chunk;
202 chunk = nullptr;
203 }
204 br->ResetAddress();
205 br->DropBaskets("all");
206 delete oldtree;
207
208 // some data (labels are published slightly differently)
209 publishBuffer(pc, sector, activeSectors, accum);
210 }
211}
212
213template <>
214void mergeHelper<std::vector<DigiGroupRef>>(const char* brprefix, std::vector<int> const& tpcsectors, uint64_t activeSectors,
215 TFile& originfile, framework::ProcessingContext& pc)
216{
217 // specialization for TPC Trigger
218 auto keyslist = originfile.GetListOfKeys();
219 for (int i = 0; i < keyslist->GetEntries(); ++i) {
220 auto key = keyslist->At(i);
221 int sector = atoi(key->GetName());
222 if (std::find(tpcsectors.begin(), tpcsectors.end(), sector) == tpcsectors.end()) {
223 // do nothing if sector not wanted
224 continue;
225 }
226
227 using AccumType = std::decay_t<decltype(makePublishBuffer<std::vector<DigiGroupRef>>(pc, sector, activeSectors))>;
228 AccumType accum;
229#pragma omp critical
230 accum = makePublishBuffer<std::vector<DigiGroupRef>>(pc, sector, activeSectors);
231 // no actual data sent. Continuous mode.
232
233 publishBuffer(pc, sector, activeSectors, accum);
234 }
235}
236
237void publishMergedTimeframes(std::vector<int> const& lanes, std::vector<int> const& tpcsectors, bool domctruth, framework::ProcessingContext& pc)
238{
239 uint64_t activeSectors = 0;
240 for (auto s : tpcsectors) {
241 activeSectors |= (uint64_t)0x1 << s;
242 }
243
244 ROOT::EnableThreadSafety();
245 // we determine the exact input list of files
246 auto digitfilelist = o2::utils::listFiles("tpc_driftime_digits_lane.*.root$");
247#ifdef WITH_OPENMP
248 omp_set_num_threads(std::min(lanes.size(), digitfilelist.size()));
249 LOG(info) << "Running digit publisher with OpenMP enabled";
250#pragma omp parallel for schedule(dynamic)
251#endif
252 for (size_t fi = 0; fi < digitfilelist.size(); ++fi) {
253 auto& filename = digitfilelist[fi];
254 LOG(debug) << "MERGING CHUNKED DIGITS FROM FILE " << filename;
255 auto originfile = new TFile(filename.c_str(), "OPEN");
256 assert(originfile);
257
258 // data definitions
259 using DigitsType = std::vector<o2::tpc::Digit>;
261 mergeHelper<DigitsType>("TPCDigit_", tpcsectors, activeSectors, *originfile, pc);
262 if (domctruth) {
263 mergeHelper<LabelType>("TPCDigitMCTruth_", tpcsectors, activeSectors, *originfile, pc);
264 }
265
266 // we also merge common modes and publish a (fake) trigger entry
267 using CommonModeType = std::vector<o2::tpc::CommonMode>;
268 mergeHelper<CommonModeType>("TPCCommonMode_", tpcsectors, activeSectors, *originfile, pc);
269
270 using TriggerType = std::vector<DigiGroupRef>;
271 mergeHelper<TriggerType>("TPCCommonMode_", tpcsectors, activeSectors, *originfile, pc);
272
273 originfile->Close();
274 delete originfile;
275 }
276}
277
278class Task
279{
280 public:
281 Task(std::vector<int> laneConfig, std::vector<int> tpcsectors, bool mctruth) : mLanes(laneConfig), mTPCSectors(tpcsectors), mDoMCTruth(mctruth)
282 {
283 }
284
286 {
287 LOG(info) << "Preparing digits (from digit chunks) for reconstruction";
288
289 TStopwatch w;
290 w.Start();
291 publishMergedTimeframes(mLanes, mTPCSectors, mDoMCTruth, pc);
292
293 pc.services().get<ControlService>().endOfStream();
294 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
295
296 LOG(info) << "DIGIT PUBLISHING TOOK " << w.RealTime();
297 return;
298 }
299
301 {
302 }
303
304 private:
305 bool mDoMCTruth = true;
306 std::vector<int> mLanes;
307 std::vector<int> mTPCSectors;
308};
309
313DataProcessorSpec getSpec(std::vector<int> const& laneConfiguration, std::vector<int> const& tpcsectors, bool mctruth, bool publish = true)
314{
315 // data definitions
316 using DigitsOutputType = std::vector<o2::tpc::Digit>;
317 using CommonModeOutputType = std::vector<o2::tpc::CommonMode>;
318
319 std::vector<OutputSpec> outputs; // define channel by triple of (origin, type id of data to be sent on this channel, subspecification)
320 if (publish) {
321 // effectively the input expects one sector per subspecification
322 for (int s = 0; s < 36; ++s) {
323 OutputLabel binding{std::to_string(s)};
324 outputs.emplace_back("TPC", "DIGITS", static_cast<SubSpecificationType>(s), Lifetime::Timeframe);
325 if (mctruth) {
326 outputs.emplace_back("TPC", "DIGITSMCTR", static_cast<SubSpecificationType>(s), Lifetime::Timeframe);
327 }
328 // common mode
329 outputs.emplace_back("TPC", "COMMONMODE", static_cast<SubSpecificationType>(s), Lifetime::Timeframe);
330 // trigger records
331 outputs.emplace_back("TPC", "DIGTRIGGERS", static_cast<SubSpecificationType>(s), Lifetime::Timeframe);
332 }
333 }
334
335 return DataProcessorSpec{
336 "TPCDigitMerger", {}, outputs, AlgorithmSpec{o2::framework::adaptFromTask<Task>(laneConfiguration, tpcsectors, mctruth)}, Options{}};
337}
338
339} // end namespace tpc
340} // end namespace o2
341
343{
344 WorkflowSpec specs;
345 o2::conf::ConfigurableParam::updateFromString(configcontext.options().get<std::string>("configKeyValues"));
346
347 auto numlanes = configcontext.options().get<int>("tpc-lanes");
348 bool mctruth = !configcontext.options().get<bool>("disable-mc");
349 bool writeout = configcontext.options().get<bool>("writer-mode");
350 auto tpcsectors = o2::RangeTokenizer::tokenize<int>(configcontext.options().get<std::string>("tpc-sectors"));
351
352 std::vector<int> lanes(numlanes);
353 std::iota(lanes.begin(), lanes.end(), 0);
354 specs.emplace_back(o2::tpc::getSpec(lanes, tpcsectors, mctruth));
355
356 if (writeout) {
357 // for now writeout to a ROOT file only works if all sectors
358 // are included
359 if (tpcsectors.size() != 36) {
360 LOG(error) << "You currently need to include all TPC sectors in the ROOT writer-mode";
361 } else {
362 std::vector<int> writerlanes(tpcsectors.size());
363 std::iota(writerlanes.begin(), writerlanes.end(), 0);
364 specs.emplace_back(o2::tpc::getTPCDigitRootWriterSpec(writerlanes, mctruth));
365 }
366 }
367
368 // configure dpl timer to inject correct firstTForbit: start from the 1st orbit of TF containing 1st sampled orbit
369 o2::raw::HBFUtilsInitializer hbfIni(configcontext, specs);
370 return specs;
371}
Definition of the Detector class.
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
This function hooks up the the workflow specifications into the DPL driver.
void customize(std::vector< o2::framework::CallbacksPolicy > &policies)
o2::dataformats::MCTruthContainer< o2::MCCompLabel > MCTruthContainer
Definition of the common mode container class.
A const (ready only) version of MCTruthContainer.
Definition of the TPC Digit.
o2::framework::DataAllocator::SubSpecificationType SubSpecificationType
int32_t i
bool publish(std::string const &filename, std::string const &path, std::string CCDBpath)
Definition GRPTool.cxx:198
Definition of a container to keep Monte Carlo truth external to simulation objects.
Class to refer to the 1st entry and N elements of some group in the continuous container.
Helper function to tokenize sequences and ranges of integral numbers.
std::ostringstream debug
StringRef key
static void updateFromString(std::string const &)
A read-only version of MCTruthContainer allowing for storage optimisation.
size_t flatten_to(ContainerType &container) const
ConfigParamRegistry & options() const
o2::header::DataHeader::SubSpecificationType SubSpecificationType
decltype(auto) make(const Output &spec, Args... args)
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
static constexpr int MAXSECTOR
Definition Sector.h:44
void init(framework::InitContext &ctx)
Task(std::vector< int > laneConfig, std::vector< int > tpcsectors, bool mctruth)
void run(framework::ProcessingContext &pc)
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLubyte GLubyte GLubyte GLubyte w
Definition glcorearb.h:852
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< ConfigParamSpec > Options
header::DataHeader::SubSpecificationType SubSpecificationType
O2 data header classes and API, v0.1.
Definition DetID.h:49
DataProcessorSpec getSpec(std::vector< int > const &laneConfiguration, std::vector< int > const &tpcsectors, bool mctruth, bool publish=true)
void mergeHelper(const char *brprefix, std::vector< int > const &tpcsectors, uint64_t activeSectors, TFile &originfile, framework::ProcessingContext &pc)
auto makePublishBuffer(framework::ProcessingContext &pc, int sector, uint64_t activeSectors)
void publishBuffer(framework::ProcessingContext &pc, int sector, uint64_t activeSectors, T *accum)
o2::framework::DataProcessorSpec getTPCDigitRootWriterSpec(std::vector< int > const &laneConfiguration, bool mctruth)
void publishBuffer< MCTruthContainer >(framework::ProcessingContext &pc, int sector, uint64_t activeSectors, MCTruthContainer *accum)
void publishMergedTimeframes(std::vector< int > const &lanes, std::vector< int > const &tpcsectors, bool domctruth, framework::ProcessingContext &pc)
auto makePublishBuffer< MCTruthContainer >(framework::ProcessingContext &pc, int sector, uint64_t activeSectors)
void copyHelper< MCTruthContainer >(MCTruthContainer const &origin, MCTruthContainer &target)
void copyHelper(T const &origin, R &target)
std::vector< std::string > listFiles(std::string const &dir, std::string const &searchpattern)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::string filename()
static void addNewTimeSliceCallback(std::vector< o2::framework::CallbacksPolicy > &policies)
static void addConfigOption(std::vector< o2::framework::ConfigParamSpec > &opts, const std::string &defOpt=std::string(o2::base::NameConf::DIGITIZATIONCONFIGFILE))
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"