Project
Loading...
Searching...
No Matches
gpu-reco-workflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14
27#include "TPCBase/Sector.h"
32
33#include <unordered_map>
34#include <numeric>
35
36using namespace o2::framework;
37using namespace o2::dataformats;
38using namespace o2::gpu;
// Alias for a list of InputSpec. Presumably the type of gPolicyData, whose definition is not among the visible lines — confirm.
39using CompletionPolicyData = std::vector<InputSpec>;
// 0xFFFFFFFFF has the 36 lowest bits set; presumably one bit per TPC sector — confirm against o2::tpc::Sector::MAXSECTOR.
41static constexpr uint64_t gTpcSectorMask = 0xFFFFFFFFF;
// Pointer to an ordering-check callback. It is never assigned in the visible code; only its address is handed to
// TPCSectorCompletionPolicy and to the main GPURecoWorkflowSpec, so one of them is presumably expected to fill it in — confirm.
42static std::function<bool(o2::framework::DataProcessingHeader::StartTime)>* gPolicyOrderCheck;
// Shared handle to the main reconstruction task: set when the workflow is defined and read by the
// workflow-termination hook, which calls deinitialize() on it.
43static std::shared_ptr<GPURecoWorkflowSpec> gTask;
44
// DPL customization hook for callback policies.
// NOTE(review): the single body line (listing line 47) is not present in this view, so what gets
// appended to `policies` cannot be stated from here — check the original source.
45void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
46{
48}
49
// Declares the workflow-level command-line options of the GPU reconstruction workflow.
//
// The option list is built locally and then swapped into workflowOptions; whatever the vector
// held on entry ends up in `options` and is destroyed when this function returns.
//
// The token lists in the "input-type" / "output-type" help texts are kept identical to the keys of
// InputMap / OutputMap: those maps are looked up with .at(token), so any token not listed there is
// rejected with "invalid input / output type".
// NOTE(review): listing lines 66-67 (between the initializer list and the swap) are not present in
// this view and are left untouched.
50void customize(std::vector<ConfigParamSpec>& workflowOptions)
51{
52
53 std::vector<ConfigParamSpec> options{
54 {"input-type", VariantType::String, "digits", {"digits, clusters, zsraw, zsonthefly, compressed-clusters-root, compressed-clusters-ctf, trd-tracklets, its-clusters, its-mean-vertex"}},
55 {"output-type", VariantType::String, "tracks", {"clusters, tracks, compressed-clusters-ctf, qa, no-shared-cluster-map, send-clusters-per-sector, trd-tracks, error-qa, tpc-triggers, its-tracks"}},
56 {"corrmap-lumi-mode", VariantType::Int, 0, {"scaling mode: (default) 0 = static + scale * full; 1 = full + scale * derivative"}},
57 {"disable-root-input", VariantType::Bool, true, {"disable root-files input reader"}},
58 {"disable-mc", VariantType::Bool, false, {"disable sending of MC information"}},
59 {"ignore-dist-stf", VariantType::Bool, false, {"do not subscribe to FLP/DISTSUBTIMEFRAME/0 message (no lost TF recovery)"}},
60 {"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings (e.g.: 'TPCHwClusterer.peakChargeThreshold=4;...')"}},
61 {"configFile", VariantType::String, "", {"configuration file for configurable parameters"}},
62 {"enableDoublePipeline", VariantType::Bool, false, {"enable GPU double pipeline mode"}},
63 {"tpc-deadMap-sources", VariantType::Int, -1, {"Sources to consider for TPC dead channel map creation; -1=all, 0=deactivated"}},
64 {"tpc-mc-time-gain", VariantType::Bool, false, {"use time gain calibration for MC (true) or for data (false)"}},
65 };
68 std::swap(workflowOptions, options);
69}
70
71// customize dispatch policy, dispatch immediately what is ready
// Appends one DispatchPolicy named "prompt-for-gpu-reco". Its matcher lambda ignores the spec it
// receives and always returns true; the dispatch operation is DispatchOp::WhenReady.
72void customize(std::vector<DispatchPolicy>& policies)
73{
74 using DispatchOp = DispatchPolicy::DispatchOp;
75 policies.push_back({"prompt-for-gpu-reco", [](auto const& spec) { return true; }, DispatchOp::WhenReady});
76}
77
// Appends a TPC sector completion policy for the reconstruction processor(s).
// A TPCSectorCompletionPolicy helper is constructed with the name pattern "gpu-reconstruction.*",
// Config::RequireAll and the addresses of gPolicyData, gTpcSectorMask and gPolicyOrderCheck; the
// trailing () invokes the helper and the result is what gets pushed into `policies`.
// NOTE(review): the pattern looks like a regex that would also cover "gpu-reconstruction-prepare" — confirm
// against TPCSectorCompletionPolicy.
78void customize(std::vector<CompletionPolicy>& policies)
79{
80 policies.push_back(o2::tpc::TPCSectorCompletionPolicy("gpu-reconstruction.*", o2::tpc::TPCSectorCompletionPolicy::Config::RequireAll, &gPolicyData, &gTpcSectorMask, &gPolicyOrderCheck)());
81}
82
// Body of the workflow-termination customization hook.
// NOTE(review): the signature line (listing line 83) is not present in this view; the body assigns to a
// variable named `hook`, presumably an o2::framework::OnWorkflowTerminationHook& parameter — confirm.
84{
// Install a callback that shuts the main reconstruction task down, if one was created.
// The idstring argument is not used.
85 hook = [](const char* idstring) {
// gTask is only set once the workflow has been defined, so guard against a null handle.
86 if (gTask) {
87 gTask->deinitialize();
88 }
89 };
90}
91
92#include "Framework/runDataProcessing.h" // the main driver
93
94using namespace o2::framework;
95
113
// Maps each accepted token of the "input-type" option to its ioType value.
// The map is queried with .at(token) when the option is tokenized, so a token that is not a key
// here raises std::out_of_range, which the caller converts to std::invalid_argument.
114static const std::unordered_map<std::string, ioType> InputMap{
115 {"digits", ioType::Digits},
116 {"clusters", ioType::Clusters},
117 {"zsraw", ioType::ZSRaw},
118 {"zsonthefly", ioType::ZSRawOTF},
119 {"compressed-clusters-root", ioType::CompClustROOT},
120 {"compressed-clusters-ctf", ioType::CompClustCTF},
121 {"trd-tracklets", ioType::TRDTracklets},
122 {"its-clusters", ioType::ITSClusters},
123 {"its-mean-vertex", ioType::MeanVertex},
124};
125
// Maps each accepted token of the "output-type" option to its ioType value.
// Queried with .at(token) when the option is tokenized; unknown tokens raise std::out_of_range.
// Note that there is no key producing ioType::CompClustROOT here, although that value is tested
// against the output types when cfg.outputCompClusters is set.
126static const std::unordered_map<std::string, ioType> OutputMap{
127 {"clusters", ioType::Clusters},
128 {"tracks", ioType::Tracks},
129 {"compressed-clusters-ctf", ioType::CompClustCTF},
130 {"qa", ioType::QA},
131 {"error-qa", ioType::ErrorQA},
132 {"no-shared-cluster-map", ioType::NoSharedMap},
133 {"send-clusters-per-sector", ioType::SendClustersPerSector},
134 {"trd-tracks", ioType::TRDTracks},
135 {"its-tracks", ioType::ITSTracks},
136 {"tpc-triggers", ioType::TPCTriggers}};
137
// Body of the function that assembles and returns the workflow (`specs`).
// NOTE(review): the signature line (listing line 138) is not present in this view; the body reads a
// configuration context named `cfgc`. Listing lines 146, 163 and 224-225 are also missing, so the
// declarations of `sclOpt`, `cfg`, `srcCl` and `srcTrk` cannot be seen from here.
139{
140 WorkflowSpec specs;
// Sector list 0, 1, ..., MAXSECTOR-1.
141 std::vector<int32_t> tpcSectors(o2::tpc::Sector::MAXSECTOR);
142 std::iota(tpcSectors.begin(), tpcSectors.end(), 0);
143
// NOTE(review): inputType is not referenced again in the visible lines; "input-type" is re-read below when tokenizing.
144 auto inputType = cfgc.options().get<std::string>("input-type");
145 bool doMC = !cfgc.options().get<bool>("disable-mc");
// Apply configurable parameters: first from the file, then from the key=value string, then dump the result to an INI file.
147 o2::conf::ConfigurableParam::updateFromFile(cfgc.options().get<std::string>("configFile"));
148 o2::conf::ConfigurableParam::updateFromString(cfgc.options().get<std::string>("configKeyValues"));
149 o2::conf::ConfigurableParam::writeINI("o2gpurecoworkflow_configuration.ini");
150
// Translate the output-type / input-type option strings into ioType lists.
// The lambdas use map.at(), so an unknown token throws std::out_of_range, reported below as std::invalid_argument.
151 std::vector<ioType> outputTypes, inputTypes;
152 try {
153 outputTypes = o2::RangeTokenizer::tokenize<ioType>(cfgc.options().get<std::string>("output-type"), [](std::string const& token) { return OutputMap.at(token); });
154 inputTypes = o2::RangeTokenizer::tokenize<ioType>(cfgc.options().get<std::string>("input-type"), [](std::string const& token) { return InputMap.at(token); });
155 } catch (std::out_of_range&) {
156 throw std::invalid_argument("invalid input / output type");
157 }
158
// True when `type` occurs in `list`.
159 auto isEnabled = [](auto& list, ioType type) {
160 return std::find(list.begin(), list.end(), type) != list.end();
161 };
162
// Fill the task configuration from the parsed input/output types and the remaining options.
164 cfg.runTPCTracking = true;
165 cfg.lumiScaleType = sclOpt.lumiType;
166 cfg.lumiScaleMode = sclOpt.lumiMode;
167 cfg.enableMShape = sclOpt.enableMShapeCorrection;
168 cfg.enableCTPLumi = sclOpt.requestCTPLumi;
169 cfg.decompressTPC = isEnabled(inputTypes, ioType::CompClustCTF);
170 cfg.decompressTPCFromROOT = isEnabled(inputTypes, ioType::CompClustROOT);
171 cfg.zsDecoder = isEnabled(inputTypes, ioType::ZSRaw);
172 cfg.zsOnTheFly = isEnabled(inputTypes, ioType::ZSRawOTF);
// The clusterer is enabled for any of the raw/digit inputs (zsraw, zsonthefly, digits).
173 cfg.caClusterer = cfg.zsDecoder || cfg.zsOnTheFly || isEnabled(inputTypes, ioType::Digits);
174 cfg.outputTracks = isEnabled(outputTypes, ioType::Tracks);
// NOTE(review): OutputMap has no token that yields CompClustROOT, so this looks to be always false — confirm intent.
175 cfg.outputCompClusters = isEnabled(outputTypes, ioType::CompClustROOT);
176 cfg.outputCompClustersFlat = isEnabled(outputTypes, ioType::CompClustCTF);
177 cfg.outputCAClusters = isEnabled(outputTypes, ioType::Clusters);
178 cfg.outputQA = isEnabled(outputTypes, ioType::QA);
179 cfg.outputErrorQA = isEnabled(outputTypes, ioType::ErrorQA);
// Shared-cluster map: requires clusters to be available (produced or given as input) and tracks to be output,
// unless explicitly switched off with the no-shared-cluster-map output token.
180 cfg.outputSharedClusterMap = (cfg.outputCAClusters || cfg.caClusterer || isEnabled(inputTypes, ioType::Clusters)) && cfg.outputTracks && !isEnabled(outputTypes, ioType::NoSharedMap);
181 cfg.processMC = doMC;
182 cfg.sendClustersPerSector = isEnabled(outputTypes, ioType::SendClustersPerSector);
183 cfg.askDISTSTF = !cfgc.options().get<bool>("ignore-dist-stf");
184 cfg.readTRDtracklets = isEnabled(inputTypes, ioType::TRDTracklets);
185 cfg.runTRDTracking = isEnabled(outputTypes, ioType::TRDTracks);
// Trigger handling is on when requested as output or whenever the clusterer runs.
186 cfg.tpcTriggerHandling = isEnabled(outputTypes, ioType::TPCTriggers) || cfg.caClusterer;
187 cfg.enableDoublePipeline = cfgc.options().get<bool>("enableDoublePipeline");
188 cfg.tpcDeadMapSources = cfgc.options().get<int32_t>("tpc-deadMap-sources");
189 cfg.tpcUseMCTimeGain = cfgc.options().get<bool>("tpc-mc-time-gain");
190 cfg.runITSTracking = isEnabled(outputTypes, ioType::ITSTracks);
191 cfg.itsOverrBeamEst = isEnabled(inputTypes, ioType::MeanVertex);
192
// GRP/geometry request for the main task; the request constructor receives ggInputs, whose contents are
// merged into the task inputs below. See the GRPGeomRequest constructor for the meaning of the positional flags.
193 Inputs ggInputs;
194 auto ggRequest = std::make_shared<o2::base::GRPGeomRequest>(false, true, false, true, true, o2::base::GRPGeomRequest::Aligned, ggInputs, true);
195
// Main reconstruction task; its inputs are the task's own inputs plus the GRP/geometry inputs.
196 auto task = std::make_shared<GPURecoWorkflowSpec>(&gPolicyData, cfg, tpcSectors, gTpcSectorMask, ggRequest, &gPolicyOrderCheck);
197 Inputs taskInputs = task->inputs();
198 std::move(ggInputs.begin(), ggInputs.end(), std::back_inserter(taskInputs));
// Publish the task so the workflow-termination hook can deinitialize it.
199 gTask = task;
200
201 specs.emplace_back(DataProcessorSpec{
202 "gpu-reconstruction",
203 taskInputs,
204 task->outputs(),
205 AlgorithmSpec{adoptTask<GPURecoWorkflowSpec>(task)},
206 task->options()});
207
// Double-pipeline mode adds a second processor, "gpu-reconstruction-prepare", built from the same cfg
// with enableDoublePipeline set to 2 and without the order-check pointer.
// NOTE(review): cfg is modified after `task` was constructed; whether `task` observes the change depends on
// how GPURecoWorkflowSpec stores cfg — confirm.
208 if (cfg.enableDoublePipeline) {
209 cfg.enableDoublePipeline = 2;
210 Inputs ggInputsPrepare;
211 auto ggRequestPrepare = std::make_shared<o2::base::GRPGeomRequest>(false, true, false, false, false, o2::base::GRPGeomRequest::None, ggInputsPrepare, true);
212 auto taskPrepare = std::make_shared<GPURecoWorkflowSpec>(&gPolicyData, cfg, tpcSectors, gTpcSectorMask, ggRequestPrepare);
213 Inputs taskInputsPrepare = taskPrepare->inputs();
214 std::move(ggInputsPrepare.begin(), ggInputsPrepare.end(), std::back_inserter(taskInputsPrepare));
215 specs.emplace_back(DataProcessorSpec{
216 "gpu-reconstruction-prepare",
217 taskInputsPrepare,
218 taskPrepare->outputs(),
219 AlgorithmSpec{adoptTask<GPURecoWorkflowSpec>(taskPrepare)},
220 taskPrepare->options()});
221 }
222
// NOTE(review): the input-spec helper is gated on "ignore-dist-stf"; the "disable-root-input" option is declared
// but not read in the visible lines (it may be consumed inside InputHelper) — confirm this gate is intended.
223 if (!cfgc.options().get<bool>("ignore-dist-stf")) {
226 o2::globaltracking::InputHelper::addInputSpecs(cfgc, specs, srcCl, srcTrk, srcTrk, doMC);
227 }
228
229 // configure dpl timer to inject correct firstTForbit: start from the 1st orbit of TF containing 1st sampled orbit
230 o2::raw::HBFUtilsInitializer hbfIni(cfgc, specs);
231
232 return specs;
233}
Helper class to access load maps from CCDB.
Helper for geometry and GRP related CCDB requests.
Global index for barrel track: provides provenance (detectors combination), index in respective array...
std::vector< InputSpec > CompletionPolicyData
Helper function to tokenize sequences and ranges of integral numbers.
DPL completion policy helper for TPC sector data.
static void writeINI(std::string const &filename, std::string const &keyOnly="")
static void updateFromFile(std::string const &, std::string const &paramsList="", bool unchangedOnly=false)
static void updateFromString(std::string const &)
static mask_t getSourcesMask(const std::string_view srcList)
ConfigParamRegistry & options() const
static int addInputSpecs(const o2::framework::ConfigContext &configcontext, o2::framework::WorkflowSpec &specs, GID::mask_t maskClusters, GID::mask_t maskMatches, GID::mask_t maskTracks, bool useMC=true, GID::mask_t maskClustersMC=GID::getSourcesMask(GID::ALL), GID::mask_t maskTracksMC=GID::getSourcesMask(GID::ALL), bool subSpecStrict=false)
static void addGlobalOptions(std::vector< o2::framework::ConfigParamSpec > &options)
static CorrectionMapsLoaderGloOpts parseGlobalOptions(const o2::framework::ConfigParamRegistry &opts)
static constexpr int MAXSECTOR
Definition Sector.h:44
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
WorkflowSpec defineDataProcessing(ConfigContext const &cfgc)
This function hooks up the workflow specifications into the DPL driver.
@ TRDTracklets
@ TPCTriggers
@ CompClustROOT
@ ITSClusters
@ SendClustersPerSector
@ NoSharedMap
@ CompClustCTF
@ MeanVertex
void customize(std::vector< o2::framework::CallbacksPolicy > &policies)
const std::unordered_map< std::string, OutputType > OutputMap
const std::unordered_map< std::string, InputType > InputMap
Definition of a container to keep/associate an arbitrary number of labels associated to an index wit...
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::function< void(const char *)> OnWorkflowTerminationHook
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
Definition list.h:40
static void addNewTimeSliceCallback(std::vector< o2::framework::CallbacksPolicy > &policies)
static void addConfigOption(std::vector< o2::framework::ConfigParamSpec > &opts, const std::string &defOpt=std::string(o2::base::NameConf::DIGITIZATIONCONFIGFILE))
std::vector< InputSpec > gPolicyData
unsigned long gTpcSectorMask