Project
Loading...
Searching...
No Matches
RecoWorkflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
19#include "Framework/Logger.h"
29#include "TPCWorkflow/ZSSpec.h"
48#include <string>
49#include <stdexcept>
50#include <fstream>
51#include <sstream>
52#include <iomanip>
53#include <unordered_map>
54#include <stdexcept>
55#include <algorithm> // std::find
56#include <tuple> // make_tuple
57#include <array>
58#include <gsl/span>
59
60using namespace o2::dataformats;
61
62namespace o2
63{
64namespace tpc
65{
66namespace reco_workflow
67{
68
69static std::shared_ptr<o2::gpu::GPURecoWorkflowSpec> gTask;
70
71using namespace framework;
72
73template <typename T>
75
76const std::unordered_map<std::string, InputType> InputMap{
77 {"pass-through", InputType::PassThrough},
78 {"digitizer", InputType::Digitizer},
79 {"digits", InputType::Digits},
80 {"clustershardware", InputType::ClustersHardware},
81 {"clusters", InputType::Clusters},
82 {"zsraw", InputType::ZSRaw},
83 {"compressed-clusters", InputType::CompClusters},
84 {"compressed-clusters-ctf", InputType::CompClustersCTF},
85 {"compressed-clusters-flat", InputType::CompClustersFlat}};
86
87const std::unordered_map<std::string, OutputType> OutputMap{
88 {"digits", OutputType::Digits},
89 {"clustershardware", OutputType::ClustersHardware},
90 {"clusters", OutputType::Clusters},
91 {"tracks", OutputType::Tracks},
92 {"compressed-clusters", OutputType::CompClusters},
93 {"encoded-clusters", OutputType::EncodedClusters},
94 {"disable-writer", OutputType::DisableWriter},
95 {"send-clusters-per-sector", OutputType::SendClustersPerSector},
96 {"zsraw", OutputType::ZSRaw},
97 {"qa", OutputType::QA},
98 {"no-shared-cluster-map", OutputType::NoSharedClusterMap},
99 {"tpc-triggers", OutputType::TPCTriggers}};
100
101framework::WorkflowSpec getWorkflow(CompletionPolicyData* policyData, std::vector<int> const& tpcSectors, unsigned long tpcSectorMask, std::vector<int> const& laneConfiguration,
102 const o2::tpc::CorrectionMapsLoaderGloOpts& sclOpts, bool propagateMC, unsigned nLanes, std::string const& cfgInput, std::string const& cfgOutput, bool disableRootInput,
103 int caClusterer, int zsOnTheFly, bool askDISTSTF, bool selIR, bool filteredInp, int deadMapSources, bool useMCTimeGain)
104{
105 InputType inputType;
106 try {
107 inputType = InputMap.at(cfgInput);
108 } catch (std::out_of_range&) {
109 throw std::invalid_argument(std::string("invalid input type: ") + cfgInput);
110 }
111 std::vector<OutputType> outputTypes;
112 try {
113 outputTypes = RangeTokenizer::tokenize<OutputType>(cfgOutput, [](std::string const& token) { return OutputMap.at(token); });
114 } catch (std::out_of_range&) {
115 throw std::invalid_argument(std::string("invalid output type: ") + cfgOutput);
116 }
117 auto isEnabled = [&outputTypes](OutputType type) {
118 return std::find(outputTypes.begin(), outputTypes.end(), type) != outputTypes.end();
119 };
120
121 if (filteredInp && !(inputType == InputType::PassThrough && isEnabled(OutputType::Tracks) && isEnabled(OutputType::Clusters) && isEnabled(OutputType::SendClustersPerSector))) {
122 throw std::invalid_argument("filtered-input option must be provided only with pass-through input and clusters,tracks,send-clusters-per-sector output");
123 }
124
125 bool decompressTPC = inputType == InputType::CompClustersCTF || inputType == InputType::CompClusters;
126 // Disable not applicable settings depending on TPC input, no need to disable manually
127 if (decompressTPC && (isEnabled(OutputType::Clusters) || isEnabled(OutputType::Tracks))) {
128 caClusterer = false;
129 zsOnTheFly = false;
130 propagateMC = false;
131 }
132 if (inputType == InputType::ZSRaw || inputType == InputType::CompClustersFlat) {
133 caClusterer = true;
134 zsOnTheFly = false;
135 propagateMC = false;
136 }
137 if (inputType == InputType::PassThrough || inputType == InputType::ClustersHardware || inputType == InputType::Clusters) {
138 caClusterer = false;
139 }
140 if (!caClusterer) {
141 zsOnTheFly = false;
142 }
143
144 if (inputType == InputType::ClustersHardware && isEnabled(OutputType::Digits)) {
145 throw std::invalid_argument("input/output type mismatch, can not produce 'digits' from 'clustershardware'");
146 }
147 if (inputType == InputType::Clusters && (isEnabled(OutputType::Digits) || isEnabled(OutputType::ClustersHardware))) {
148 throw std::invalid_argument("input/output type mismatch, can not produce 'digits', nor 'clustershardware' from 'clusters'");
149 }
150 if (inputType == InputType::ZSRaw && isEnabled(OutputType::ClustersHardware)) {
151 throw std::invalid_argument("input/output type mismatch, can not produce 'clustershardware' from 'zsraw'");
152 }
153 if (caClusterer && (inputType == InputType::Clusters || inputType == InputType::ClustersHardware)) {
154 throw std::invalid_argument("ca-clusterer requires digits as input");
155 }
156 if (caClusterer && (isEnabled(OutputType::ClustersHardware))) {
157 throw std::invalid_argument("ca-clusterer cannot produce clustershardware output");
158 }
159
160 WorkflowSpec specs;
161
162 bool produceTracks = isEnabled(OutputType::Tracks);
163
164 // We provide a special publishing method for labels which have been stored in a split format and need
165 // to be transformed into a contiguous shareable container before publishing. For other branches/types this returns
166 // false and the generic RootTreeWriter publishing proceeds
167 static Reader::SpecialPublishHook hook{[](std::string_view name, ProcessingContext& context, o2::framework::Output const& output, char* data) -> bool {
168 if (TString(name.data()).Contains("TPCDigitMCTruth") || TString(name.data()).Contains("TPCClusterHwMCTruth") || TString(name.data()).Contains("TPCClusterNativeMCTruth")) {
169 auto storedlabels = reinterpret_cast<o2::dataformats::IOMCTruthContainerView const*>(data);
171 storedlabels->copyandflatten(flatlabels);
172 // LOG(info) << "PUBLISHING CONST LABELS " << flatlabels.getNElements();
173 context.outputs().snapshot(output, flatlabels);
174 return true;
175 }
176 return false;
177 }};
178 if (!disableRootInput || inputType == InputType::PassThrough) {
179 // The OutputSpec of the PublisherSpec is configured depending on the input
180 // type. Note that the configuration of the dispatch trigger in the main file
181 // needs to be done in accordance. This means, if a new input option is added
182 // also the dispatch trigger needs to be updated.
183 if (inputType == InputType::Digits) {
184 using Type = std::vector<o2::tpc::Digit>;
185
186 specs.emplace_back(o2::tpc::getPublisherSpec<Type>(PublisherConf{
187 "tpc-digit-reader",
188 "tpcdigits.root",
189 "o2sim",
190 {"digitbranch", "TPCDigit", "Digit branch"},
191 {"mcbranch", "TPCDigitMCTruth", "MC label branch"},
192 OutputSpec{"TPC", "DIGITS"},
193 OutputSpec{"TPC", "DIGITSMCTR"},
194 tpcSectors,
195 laneConfiguration,
196 &hook},
197 propagateMC));
198 if (sclOpts.needTPCScalersWorkflow()) { // for standalone tpc-reco workflow
199 specs.emplace_back(o2::tpc::getTPCScalerSpec(sclOpts.lumiType == 2, sclOpts.enableMShapeCorrection));
200 }
201 if (produceTracks && sclOpts.requestCTPLumi) { // need CTP digits (lumi) reader
202 specs.emplace_back(o2::ctp::getDigitsReaderSpec(false));
203 }
204 } else if (inputType == InputType::ClustersHardware) {
205 specs.emplace_back(o2::tpc::getPublisherSpec(PublisherConf{
206 "tpc-clusterhardware-reader",
207 "tpc-clusterhardware.root",
208 "tpcclustershardware",
209 {"databranch", "TPCClusterHw", "Branch with TPC ClustersHardware"},
210 {"mcbranch", "TPCClusterHwMCTruth", "MC label branch"},
211 OutputSpec{"TPC", "CLUSTERHW"},
212 OutputSpec{"TPC", "CLUSTERHWMCLBL"},
213 tpcSectors,
214 laneConfiguration,
215 &hook},
216 propagateMC));
217 } else if (inputType == InputType::Clusters) {
218 specs.emplace_back(o2::tpc::getClusterReaderSpec(propagateMC, &tpcSectors, &laneConfiguration));
219 if (!getenv("DPL_DISABLE_TPC_TRIGGER_READER") || atoi(getenv("DPL_DISABLE_TPC_TRIGGER_READER")) != 1) {
220 specs.emplace_back(o2::tpc::getTPCTriggerReaderSpec());
221 }
222 if (sclOpts.needTPCScalersWorkflow()) { // for standalone tpc-reco workflow
223 specs.emplace_back(o2::tpc::getTPCScalerSpec(sclOpts.lumiType == 2, sclOpts.enableMShapeCorrection));
224 }
225 if (sclOpts.requestCTPLumi) { // need CTP digits (lumi) reader
226 specs.emplace_back(o2::ctp::getDigitsReaderSpec(false));
227 }
228 } else if (inputType == InputType::CompClusters) {
229 // TODO: need to check if we want to store the MC labels alongside with compressed clusters
230 // for the moment reading of labels is disabled (last parameter is false)
231 // TODO: make a different publisher spec for only one output spec, for now using the
232 // PublisherSpec with only sector 0, '_0' is thus appended to the branch name
233 specs.emplace_back(o2::tpc::getPublisherSpec(PublisherConf{
234 "tpc-compressed-cluster-reader",
235 "tpc-compclusters.root",
236 "tpcrec",
237 {"clusterbranch", "TPCCompClusters", "Branch with TPC compressed clusters"},
238 {"", "", ""}, // No MC labels
239 OutputSpec{"TPC", "COMPCLUSTERS"},
240 OutputSpec{"", ""}, // No MC labels
241 std::vector<int>(1, 0),
242 std::vector<int>(1, 0),
243 &hook},
244 false));
245 }
246 }
247
248 // output matrix
249 // Note: the ClusterHardware format is probably a deprecated legacy format and also the
250 // ClusterDecoderRawSpec
251 bool produceCompClusters = isEnabled(OutputType::CompClusters);
252 bool runGPUReco = (produceTracks || produceCompClusters || (isEnabled(OutputType::Clusters) && caClusterer) || inputType == InputType::CompClustersCTF) && inputType != InputType::CompClustersFlat;
253 bool runHWDecoder = !caClusterer && (runGPUReco || isEnabled(OutputType::Clusters));
254 bool runClusterer = !caClusterer && (runHWDecoder || isEnabled(OutputType::ClustersHardware));
255 bool zsDecoder = inputType == InputType::ZSRaw;
256 bool runClusterEncoder = isEnabled(OutputType::EncodedClusters);
257
258 // input matrix
259 runClusterer &= inputType == InputType::Digitizer || inputType == InputType::Digits;
260 runHWDecoder &= runClusterer || inputType == InputType::ClustersHardware;
261 runGPUReco &= caClusterer || runHWDecoder || inputType == InputType::Clusters || decompressTPC;
262
263 bool outRaw = inputType == InputType::Digits && isEnabled(OutputType::ZSRaw) && !isEnabled(OutputType::DisableWriter);
264 // bool runZSDecode = inputType == InputType::ZSRaw;
265 bool zsToDigit = inputType == InputType::ZSRaw && isEnabled(OutputType::Digits);
266
267 if (inputType == InputType::PassThrough) {
268 runGPUReco = runHWDecoder = runClusterer = runClusterEncoder = zsToDigit = false;
269 }
270
271 WorkflowSpec parallelProcessors;
273 //
274 // clusterer process(es)
275 //
276 //
277 if (runClusterer) {
278 parallelProcessors.push_back(o2::tpc::getClustererSpec(propagateMC));
279 }
280
282 //
283 // cluster decoder process(es)
284 //
285 //
286 if (runHWDecoder) {
287 parallelProcessors.push_back(o2::tpc::getClusterDecoderRawSpec(propagateMC));
288 }
289
291 //
292 // set up parallel TPC lanes
293 //
294 // the parallelPipeline helper distributes the subspec ids from the lane configuration
295 // among the pipelines. All inputs and outputs of processors of one pipeline will be
296 // cloned by the number of subspecs served by this pipeline and amended with the subspecs
297 parallelProcessors = parallelPipeline(
298 parallelProcessors, nLanes,
299 [&laneConfiguration]() { return laneConfiguration.size(); },
300 [&laneConfiguration](size_t index) { return laneConfiguration[index]; });
301 specs.insert(specs.end(), parallelProcessors.begin(), parallelProcessors.end());
302
304 //
305 // generation of processor specs for various types of outputs
306 // based on generic RootTreeWriter and MakeRootTreeWriterSpec generator
307 //
308 // -------------------------------------------------------------------------------------------
309 // the callbacks for the RootTreeWriter
310 //
311 // The generic writer needs a way to associate incoming data with the individual branches for
312 // the TPC sectors. The sector number is transmitted as part of the sector header, the callback
313 // finds the corresponding index in the vector of configured sectors
314 auto getIndex = [tpcSectors](o2::framework::DataRef const& ref) {
315 auto const* tpcSectorHeader = o2::framework::DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(ref);
316 if (!tpcSectorHeader) {
317 throw std::runtime_error("TPC sector header missing in header stack");
318 }
319 if (tpcSectorHeader->sector() < 0) {
320 // special data sets, don't write
321 return ~(size_t)0;
322 }
323 size_t index = 0;
324 for (auto const& sector : tpcSectors) {
325 if (sector == tpcSectorHeader->sector()) {
326 return index;
327 }
328 ++index;
329 }
330 throw std::runtime_error("sector " + std::to_string(tpcSectorHeader->sector()) + " not configured for writing");
331 };
332 auto getName = [tpcSectors](std::string base, size_t index) {
333 return base + "_" + std::to_string(tpcSectors.at(index));
334 };
335
336 // -------------------------------------------------------------------------------------------
337 // helper to create writer specs for different types of output
338 auto fillLabels = [](TBranch& branch, std::vector<char> const& labelbuffer, DataRef const& /*ref*/) {
341 auto ptr = &outputcontainer;
343 outputcontainer.adopt(labelbuffer);
344 br->Fill();
345 br->ResetAddress();
346 };
347
348 auto makeWriterSpec = [tpcSectors, laneConfiguration, propagateMC, getIndex, getName](const char* processName,
349 const char* defaultFileName,
350 const char* defaultTreeName,
351 auto&& databranch,
352 auto&& mcbranch,
353 bool singleBranch = false) {
354 if (tpcSectors.size() == 0) {
355 throw std::invalid_argument(std::string("writer process configuration needs list of TPC sectors"));
356 }
357
358 auto amendInput = [tpcSectors, laneConfiguration](InputSpec& input, size_t index) {
359 input.binding += std::to_string(laneConfiguration[index]);
360 DataSpecUtils::updateMatchingSubspec(input, laneConfiguration[index]);
361 };
362 auto amendBranchDef = [laneConfiguration, amendInput, tpcSectors, getIndex, getName, singleBranch](auto&& def, bool enableMC = true) {
363 if (!singleBranch) {
364 def.keys = mergeInputs(def.keys, laneConfiguration.size(), amendInput);
365 // the branch is disabled if set to 0
366 def.nofBranches = enableMC ? tpcSectors.size() : 0;
367 def.getIndex = getIndex;
368 def.getName = getName;
369 } else {
370 // instead of the separate sector branches only one is going to be written
371 def.nofBranches = enableMC ? 1 : 0;
372 }
373 return std::move(def);
374 };
375
376 return std::move(MakeRootTreeWriterSpec(processName, defaultFileName, defaultTreeName,
377 std::move(amendBranchDef(databranch)),
378 std::move(amendBranchDef(mcbranch, propagateMC)))());
379 };
380
382 //
383 // a writer process for digits
384 //
385 // selected by output type 'difits'
386 if (isEnabled(OutputType::Digits) && !isEnabled(OutputType::DisableWriter)) {
387 using DigitOutputType = std::vector<o2::tpc::Digit>;
388 specs.push_back(makeWriterSpec("tpc-digits-writer",
389 inputType == InputType::ZSRaw ? "tpc-zs-digits.root" : inputType == InputType::Digits ? "tpc-filtered-digits.root"
390 : "tpcdigits.root",
391 "o2sim",
392 BranchDefinition<DigitOutputType>{InputSpec{"data", "TPC", "DIGITS", 0},
393 "TPCDigit",
394 "digit-branch-name"},
395 BranchDefinition<MCLabelContainer>{InputSpec{"mc", "TPC", "DIGITSMCTR", 0},
396 "TPCDigitMCTruth",
397 "digitmc-branch-name"}));
398 }
399
401 //
402 // a writer process for hardware clusters
403 //
404 // selected by output type 'clustershardware'
405 if (isEnabled(OutputType::ClustersHardware) && !isEnabled(OutputType::DisableWriter)) {
406 specs.push_back(makeWriterSpec("tpc-clusterhardware-writer",
407 inputType == InputType::ClustersHardware ? "tpc-filtered-clustershardware.root" : "tpc-clustershardware.root",
408 "tpcclustershardware",
409 BranchDefinition<const char*>{InputSpec{"data", "TPC", "CLUSTERHW", 0},
410 "TPCClusterHw",
411 "databranch"},
412 BranchDefinition<MCLabelContainer>{InputSpec{"mc", "TPC", "CLUSTERHWMCLBL", 0},
413 "TPCClusterHwMCTruth",
414 "mcbranch"}));
415 }
416
418 //
419 // a writer process for TPC native clusters
420 //
421 // selected by output type 'clusters'
422 if (isEnabled(OutputType::Clusters) && !isEnabled(OutputType::DisableWriter)) {
423 // if the caClusterer is enabled, only one data set with the full TPC is produced, and the writer
424 // is configured to write one single branch
425 specs.push_back(makeWriterSpec(filteredInp ? "tpc-native-cluster-writer_filtered" : "tpc-native-cluster-writer",
426 (inputType == InputType::Clusters || filteredInp) ? "tpc-filtered-native-clusters.root" : "tpc-native-clusters.root",
427 "tpcrec",
428 BranchDefinition<const char*>{InputSpec{"data", ConcreteDataTypeMatcher{"TPC", filteredInp ? o2::header::DataDescription("CLUSTERNATIVEF") : o2::header::DataDescription("CLUSTERNATIVE")}},
429 "TPCClusterNative",
430 "databranch"},
432 "TPCClusterNativeMCTruth",
433 "mcbranch", fillLabels},
434 (caClusterer || decompressTPC || inputType == InputType::PassThrough) && !isEnabled(OutputType::SendClustersPerSector)));
435 }
436
437 if ((isEnabled(OutputType::TPCTriggers) || caClusterer) && !isEnabled(OutputType::DisableWriter)) {
438 specs.push_back(o2::tpc::getTPCTriggerWriterSpec());
439 }
440
441 if (zsOnTheFly) {
442 specs.emplace_back(o2::tpc::getZSEncoderSpec(tpcSectors, outRaw, tpcSectorMask));
443 }
444
445 if (zsToDigit) {
446 specs.emplace_back(o2::tpc::getZStoDigitsSpec(tpcSectors));
447 }
448
450 //
451 // tracker process
452 //
453 // selected by output type 'tracks'
454 if (runGPUReco) {
456 cfg.runTPCTracking = true;
457 cfg.lumiScaleType = sclOpts.lumiType;
458 cfg.lumiScaleMode = sclOpts.lumiMode;
460 cfg.enableCTPLumi = sclOpts.requestCTPLumi;
461 cfg.decompressTPC = decompressTPC;
462 cfg.decompressTPCFromROOT = decompressTPC && inputType == InputType::CompClusters;
463 cfg.caClusterer = caClusterer;
464 cfg.zsDecoder = zsDecoder;
465 cfg.zsOnTheFly = zsOnTheFly;
466 cfg.outputTracks = produceTracks;
467 cfg.outputCompClusters = produceCompClusters;
468 cfg.outputCompClustersFlat = runClusterEncoder;
469 cfg.outputCAClusters = isEnabled(OutputType::Clusters) && (caClusterer || decompressTPC);
470 cfg.outputQA = isEnabled(OutputType::QA);
471 cfg.outputSharedClusterMap = (isEnabled(OutputType::Clusters) || inputType == InputType::Clusters) && isEnabled(OutputType::Tracks) && !isEnabled(OutputType::NoSharedClusterMap);
472 cfg.processMC = propagateMC;
474 cfg.askDISTSTF = askDISTSTF;
476 cfg.tpcDeadMapSources = deadMapSources;
477 cfg.tpcUseMCTimeGain = useMCTimeGain;
478
479 Inputs ggInputs;
480 auto ggRequest = std::make_shared<o2::base::GRPGeomRequest>(false, true, false, true, true, o2::base::GRPGeomRequest::Aligned, ggInputs, true);
481
482 auto task = std::make_shared<o2::gpu::GPURecoWorkflowSpec>(policyData, cfg, tpcSectors, tpcSectorMask, ggRequest);
483 gTask = task;
484 Inputs taskInputs = task->inputs();
485 Options taskOptions = task->options();
486 std::move(ggInputs.begin(), ggInputs.end(), std::back_inserter(taskInputs));
487
488 specs.emplace_back(DataProcessorSpec{
489 "tpc-tracker",
490 taskInputs,
491 task->outputs(),
492 AlgorithmSpec{adoptTask<o2::gpu::GPURecoWorkflowSpec>(task)},
493 taskOptions});
494 }
495
497 //
498 // tracker process
499 //
500 // selected by output type 'encoded-clusters'
501 if (runClusterEncoder) {
502 specs.emplace_back(o2::tpc::getEntropyEncoderSpec(!runGPUReco && inputType != InputType::CompClustersFlat, selIR));
503 }
504
506 //
507 // a writer process for tracks
508 //
509 // selected by output type 'tracks'
510 if (produceTracks && !isEnabled(OutputType::DisableWriter)) {
511 // defining the track writer process using the generic RootTreeWriter and generator tool
512 //
513 // defaults
514 const char* processName = filteredInp ? "tpc-track-writer_filtered" : "tpc-track-writer";
515 const char* defaultFileName = filteredInp ? "tpctracks_filtered.root" : "tpctracks.root";
516 const char* defaultTreeName = "tpcrec";
517
518 // branch definitions for RootTreeWriter spec
519 using TrackOutputType = std::vector<o2::tpc::TrackTPC>;
520
521 using ClusRefsOutputType = std::vector<o2::tpc::TPCClRefElem>;
522 // a spectator callback which will be invoked by the tree writer with the extracted object
523 // we are using it for printing a log message
524 auto logger = BranchDefinition<TrackOutputType>::Spectator([](TrackOutputType const& tracks) {
525 LOG(info) << "writing " << tracks.size() << " track(s)";
526 });
527 auto tracksdef = BranchDefinition<TrackOutputType>{InputSpec{"inputTracks", "TPC", filteredInp ? o2::header::DataDescription("TRACKSF") : o2::header::DataDescription("TRACKS"), 0}, //
528 "TPCTracks", "track-branch-name", //
529 1, //
530 logger}; //
531 auto clrefdef = BranchDefinition<ClusRefsOutputType>{InputSpec{"inputClusRef", "TPC", filteredInp ? o2::header::DataDescription("CLUSREFSF") : o2::header::DataDescription("CLUSREFS"), 0}, //
532 "ClusRefs", "trackclusref-branch-name"}; //
533 auto mcdef = BranchDefinition<std::vector<o2::MCCompLabel>>{InputSpec{"mcinput", "TPC", filteredInp ? o2::header::DataDescription("TRACKSMCLBLF") : o2::header::DataDescription("TRACKSMCLBL"), 0}, //
534 "TPCTracksMCTruth", //
535 (propagateMC ? 1 : 0), //
536 "trackmc-branch-name"}; //
537
538 // depending on the MC propagation flag, branch definition for MC labels is disabled
539 specs.push_back(MakeRootTreeWriterSpec(processName, defaultFileName, defaultTreeName,
540 std::move(tracksdef), std::move(clrefdef),
541 std::move(mcdef))());
542 }
543
545 //
546 // a writer process for compressed clusters container
547 //
548 // selected by output type 'compressed-clusters'
549 if (produceCompClusters && !isEnabled(OutputType::DisableWriter)) {
550 // defining the track writer process using the generic RootTreeWriter and generator tool
551 //
552 // defaults
553 const char* processName = "tpc-compcluster-writer";
554 const char* defaultFileName = "tpc-compclusters.root";
555 const char* defaultTreeName = "tpcrec";
556
557 // branch definitions for RootTreeWriter spec
558 using CCluSerializedType = ROOTSerialized<CompressedClustersROOT>;
559 auto ccldef = BranchDefinition<CCluSerializedType>{InputSpec{"inputCompCl", "TPC", "COMPCLUSTERS"}, //
560 "TPCCompClusters_0", "compcluster-branch-name"}; //
561
562 specs.push_back(MakeRootTreeWriterSpec(processName, defaultFileName, defaultTreeName, //
563 std::move(ccldef))()); //
564 }
565
566 return std::move(specs);
567}
568
570{
571 if (gTask) {
572 gTask->deinitialize();
573 }
574}
575
576} // end namespace reco_workflow
577} // end namespace tpc
578} // end namespace o2
Processor spec for decoder of TPC raw cluster data.
Meta data for a group describing it by sector number and global padrow.
Container to store compressed TPC cluster data.
std::string getName(const TDataMember *dm, int index, int size)
A const (ready only) version of MCTruthContainer.
Helper class to access load maps from CCDB.
Definition of the TPC Digit.
Helper class for memory management of TPC Data Formats, external from the actual data type classes to...
Helper for geometry and GRP related CCDB requests.
A special IO container - splitting a given vector to enable ROOT IO.
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
Configurable generator for RootTreeWriter processor spec.
Helper function to tokenize sequences and ranges of integral numbers.
spec definition for a TPC clusterer process
ProcessorSpec for the TPC cluster entropy encoding.
Workflow definition for the TPC reconstruction.
TBranch * ptr
Definitions of TPC Zero Suppression Data Headers.
A read-only version of MCTruthContainer allowing for storage optimisation.
void adopt(gsl::span< const char > const input)
"adopt" (without taking ownership) from an existing buffer
Generate a processor spec for the RootTreeWriter utility.
static TBranch * remapBranch(TBranch &branchRef, T **newdata)
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLboolean * data
Definition glcorearb.h:298
GLint ref
Definition glcorearb.h:291
framework::DataProcessorSpec getDigitsReaderSpec(bool propagateMC=true, const std::string &defFile="ctpdigits.root")
Definition of a container to keep/associate and arbitrary number of labels associated to an index wit...
WorkflowSpec parallelPipeline(const WorkflowSpec &specs, size_t nPipelines, std::function< size_t()> getNumberOfSubspecs, std::function< size_t(size_t)> getSubSpec)
Inputs mergeInputs(InputSpec original, size_t maxIndex, std::function< void(InputSpec &, size_t)> amendCallback)
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
Descriptor< gSizeDataDescriptionString > DataDescription
Definition DataHeader.h:551
std::vector< framework::InputSpec > CompletionPolicyData
InputType
define input and output types of the workflow
const std::unordered_map< std::string, InputType > InputMap
framework::WorkflowSpec getWorkflow(CompletionPolicyData *policyData, std::vector< int > const &tpcSectors, unsigned long tpcSectorMask, std::vector< int > const &laneConfiguration, const o2::tpc::CorrectionMapsLoaderGloOpts &sclOpts, bool propagateMC=true, unsigned nLanes=1, std::string const &cfgInput="digitizer", std::string const &cfgOutput="tracks", bool disableRootInput=false, int caClusterer=0, int zsOnTheFly=0, bool askDISTSTF=true, bool selIR=false, bool filteredInp=false, int deadMapSources=-1, bool useMCTimeGain=false)
create the workflow for TPC reconstruction
const std::unordered_map< std::string, OutputType > OutputMap
framework::DataProcessorSpec getEntropyEncoderSpec(bool inputFromFile, bool selIR=false)
create a processor spec
framework::DataProcessorSpec getClusterReaderSpec(bool useMC, const std::vector< int > *tpcSectors=nullptr, const std::vector< int > *laneConfiguration=nullptr)
o2::framework::DataProcessorSpec getTPCScalerSpec(bool enableIDCs, bool enableMShape)
framework::DataProcessorSpec getZSEncoderSpec(std::vector< int > const &tpcSectors, bool outRaw, unsigned long tpcSectorMask)
create a processor spec
Definition ZSSpec.cxx:61
o2::framework::DataProcessorSpec getTPCTriggerWriterSpec()
framework::DataProcessorSpec getZStoDigitsSpec(std::vector< int > const &tpcSectors)
Definition ZSSpec.cxx:211
framework::DataProcessorSpec getClustererSpec(bool sendMC)
framework::DataProcessorSpec getPublisherSpec(PublisherConf const &config, bool propagateMC=true)
framework::DataProcessorSpec getTPCTriggerReaderSpec()
framework::DataProcessorSpec getClusterDecoderRawSpec(bool sendMC=false)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
int lumiType
what estimator to used for corrections scaling: 0: no scaling, 1: CTP, 2: IDC
int lumiMode
what corrections method to use: 0: classical scaling, 1: Using of the derivative map,...
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"