GPUWorkflowSpec.cxx
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "Headers/DataHeader.h"
#include "Framework/WorkflowSpec.h" // o2::framework::mergeInputs
#include "Framework/Logger.h"
#include "TPCFastTransform.h"
#include "TPCBase/RDHUtils.h"
#include "GPUO2InterfaceQA.h"
#include "GPUO2Interface.h"
#include "GPUO2InterfaceUtils.h"
#include "CalibdEdxContainer.h"
#include "GPUNewCalibValues.h"
#include "TPCPadGainCalib.h"
#include "TPCZSLinkMapping.h"
#include "TPCBase/Sector.h"
#include "TPCBase/Utils.h"
#include "Algorithm/Parser.h"
#include "TRDBase/Geometry.h"
#include "GPUWorkflowInternal.h"
// #include "Framework/ThreadPool.h"

#include <TStopwatch.h>
#include <TObjArray.h>
#include <TH1F.h>
#include <TH2F.h>
#include <TH1D.h>
#include <TGraphAsymmErrors.h>

#include <filesystem>
#include <memory>
#include <vector>
#include <iomanip>
#include <stdexcept>
#include <regex>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <chrono>
#include <unordered_set>

using namespace o2::framework;
using namespace o2::header;
using namespace o2::gpu;
using namespace o2::base;
using namespace o2::dataformats;

namespace o2::gpu
{

GPURecoWorkflowSpec::GPURecoWorkflowSpec(GPURecoWorkflowSpec::CompletionPolicyData* policyData, Config const& specconfig, std::vector<int32_t> const& tpcsectors, uint64_t tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<bool(o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) : o2::framework::Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
{
  if (mSpecConfig.outputCAClusters && !mSpecConfig.caClusterer && !mSpecConfig.decompressTPC) {
    throw std::runtime_error("inconsistent configuration: cluster output is only possible if CA clusterer is activated");
  }

  mConfig.reset(new GPUO2InterfaceConfiguration);
  mConfParam.reset(new GPUSettingsO2);
  mTFSettings.reset(new GPUSettingsTF);
  mTimer.reset(new TStopwatch);
  mPipeline.reset(new GPURecoWorkflowSpec_PipelineInternals);

  if (mSpecConfig.enableDoublePipeline == 1 && gPolicyOrder) {
    *gPolicyOrder = &mPolicyOrder;
  }
}
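
// From its uses throughout this file, mSpecConfig.enableDoublePipeline appears to encode three modes:
// 0 = standalone processing, 1 = the GPU processing side of the two-process pipeline (consumes
// PIPELINEPREPARE), 2 = the input preparation side (produces PIPELINEPREPARE and skips local
// reconstruction). This summary is inferred from the code below, not an authoritative definition.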

GPURecoWorkflowSpec::~GPURecoWorkflowSpec() = default;

void GPURecoWorkflowSpec::init(InitContext& ic)
{
  GPUO2InterfaceConfiguration& config = *mConfig.get();
  GPUSettingsProcessingNNclusterizer& mNNClusterizerSettings = mConfig->configProcessing.nn;

  if (mNNClusterizerSettings.nnLoadFromCCDB) {
    LOG(info) << "Loading neural networks from CCDB";
    o2::tpc::NeuralNetworkClusterizer nnClusterizerFetcher;
    nnClusterizerFetcher.initCcdbApi(mNNClusterizerSettings.nnCCDBURL);
    std::map<std::string, std::string> ccdbSettings = {
      {"nnCCDBURL", mNNClusterizerSettings.nnCCDBURL},
      {"nnCCDBPath", mNNClusterizerSettings.nnCCDBPath},
      {"inputDType", mNNClusterizerSettings.nnInferenceInputDType},
      {"outputDType", mNNClusterizerSettings.nnInferenceOutputDType},
      {"outputFolder", mNNClusterizerSettings.nnLocalFolder},
      {"nnCCDBWithMomentum", std::to_string(mNNClusterizerSettings.nnCCDBWithMomentum)},
      {"nnCCDBBeamType", mNNClusterizerSettings.nnCCDBBeamType},
      {"nnCCDBInteractionRate", std::to_string(mNNClusterizerSettings.nnCCDBInteractionRate)}};

    std::string nnFetchFolder = mNNClusterizerSettings.nnLocalFolder;
    std::vector<std::string> evalMode = o2::utils::Str::tokenize(mNNClusterizerSettings.nnEvalMode, ':');
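    // nnEvalMode is a ':'-separated pair such as "c1:r2": the first token selects the classification
    // network (c1 or c2); the regression_c1 network is always fetched, and a second token of "r2"
    // additionally fetches regression_c2 (token vocabulary inferred from the branches below).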

    if (evalMode[0] == "c1") {
      ccdbSettings["nnCCDBLayerType"] = mNNClusterizerSettings.nnCCDBClassificationLayerType;
      ccdbSettings["nnCCDBEvalType"] = "classification_c1";
      ccdbSettings["outputFile"] = "net_classification_c1.onnx";
      nnClusterizerFetcher.loadIndividualFromCCDB(ccdbSettings);
    } else if (evalMode[0] == "c2") {
      ccdbSettings["nnCCDBLayerType"] = mNNClusterizerSettings.nnCCDBClassificationLayerType;
      ccdbSettings["nnCCDBEvalType"] = "classification_c2";
      ccdbSettings["outputFile"] = "net_classification_c2.onnx";
      nnClusterizerFetcher.loadIndividualFromCCDB(ccdbSettings);
    }

    ccdbSettings["nnCCDBLayerType"] = mNNClusterizerSettings.nnCCDBRegressionLayerType;
    ccdbSettings["nnCCDBEvalType"] = "regression_c1";
    ccdbSettings["outputFile"] = "net_regression_c1.onnx";
    nnClusterizerFetcher.loadIndividualFromCCDB(ccdbSettings);
    if (evalMode[1] == "r2") {
      ccdbSettings["nnCCDBLayerType"] = mNNClusterizerSettings.nnCCDBRegressionLayerType;
      ccdbSettings["nnCCDBEvalType"] = "regression_c2";
      ccdbSettings["outputFile"] = "net_regression_c2.onnx";
      nnClusterizerFetcher.loadIndividualFromCCDB(ccdbSettings);
    }
    LOG(info) << "Neural network loading done!";
  }

  // Create configuration object and fill settings
  mConfig->configGRP.solenoidBzNominalGPU = 0;
  mTFSettings->hasSimStartOrbit = 1;
  auto& hbfu = o2::raw::HBFUtils::Instance();
  mTFSettings->simStartOrbit = hbfu.getFirstIRofTF(o2::InteractionRecord(0, hbfu.orbitFirstSampled)).orbit;

  *mConfParam = mConfig->ReadConfigurableParam();
  if (mConfParam->display) {
    mDisplayFrontend.reset(GPUDisplayFrontendInterface::getFrontend(mConfig->configDisplay.displayFrontend.c_str()));
    mConfig->configProcessing.eventDisplay = mDisplayFrontend.get();
    if (mConfig->configProcessing.eventDisplay != nullptr) {
      LOG(info) << "Event display enabled";
    } else {
      throw std::runtime_error("GPU Event Display frontend could not be created!");
    }
  }
  if (mSpecConfig.enableDoublePipeline) {
    mConfig->configProcessing.doublePipeline = 1;
  }

  mAutoSolenoidBz = mConfParam->solenoidBzNominalGPU == -1e6f;
  mAutoContinuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin < 0;
  if (mAutoContinuousMaxTimeBin) {
    mConfig->configGRP.grpContinuousMaxTimeBin = GPUO2InterfaceUtils::getTpcMaxTimeBinFromNHbf(mConfParam->overrideNHbfPerTF ? mConfParam->overrideNHbfPerTF : 256);
  }
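  // Both "auto" flags rely on sentinel values: a nominal solenoid field of -1e6 and a negative
  // grpContinuousMaxTimeBin mean "not set yet"; the real values are filled in later from the GRP
  // objects in doCalibUpdates(). The 256 HBF fallback is only used when no per-TF override is given.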
  if (mConfig->configProcessing.deviceNum == -2) {
    int32_t myId = ic.services().get<const o2::framework::DeviceSpec>().inputTimesliceId;
    int32_t idMax = ic.services().get<const o2::framework::DeviceSpec>().maxInputTimeslices;
    mConfig->configProcessing.deviceNum = myId;
    LOG(info) << "GPU device number selected from pipeline id: " << myId << " / " << idMax;
  }
  if (mConfig->configProcessing.debugLevel >= 3 && mVerbosity == 0) {
    mVerbosity = 1;
  }
  mConfig->configProcessing.runMC = mSpecConfig.processMC;
  if (mSpecConfig.outputQA) {
    if (!mSpecConfig.processMC && !mConfig->configQA.clusterRejectionHistograms) {
      throw std::runtime_error("Need MC information to create QA plots");
    }
    if (!mSpecConfig.processMC) {
      mConfig->configQA.noMC = true;
    }
    mConfig->configQA.shipToQC = true;
    if (!mConfig->configProcessing.runQA) {
      mConfig->configQA.enableLocalOutput = false;
      mQATaskMask = (mSpecConfig.processMC ? 15 : 0) | (mConfig->configQA.clusterRejectionHistograms ? 32 : 0);
      mConfig->configProcessing.runQA = -mQATaskMask;
    }
  }
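  // mQATaskMask is a bit mask of QA tasks: the low four bits (15) seemingly select the MC-based QA
  // tasks and bit 5 (32) the cluster-rejection histograms; the negative runQA value apparently tells
  // the QA component to run only that subset (a reading inferred from the usage here and in run()).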
  mConfig->configReconstruction.tpc.nWaysOuter = true;
  mConfig->configInterface.outputToExternalBuffers = true;
  if (mConfParam->synchronousProcessing) {
    mConfig->configReconstruction.useMatLUT = false;
  }
  if (mConfig->configProcessing.rtc.optSpecialCode == -1) {
    mConfig->configProcessing.rtc.optSpecialCode = mConfParam->synchronousProcessing;
  }

  // Configure the "GPU workflow" i.e. which steps we run on the GPU (or CPU)
  if (mSpecConfig.outputTracks || mSpecConfig.outputCompClusters || mSpecConfig.outputCompClustersFlat) {
    mConfig->configWorkflow.steps.set(GPUDataTypes::RecoStep::TPCConversion,
    mConfig->configWorkflow.outputs.set(GPUDataTypes::InOutType::TPCMergedTracks);
    mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TPCdEdx, mConfParam->rundEdx == -1 ? !mConfParam->synchronousProcessing : mConfParam->rundEdx);
  }
  if (mSpecConfig.outputCompClusters || mSpecConfig.outputCompClustersFlat) {
    mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TPCCompression, true);
    mConfig->configWorkflow.outputs.setBits(GPUDataTypes::InOutType::TPCCompressedClusters, true);
  }
  mConfig->configWorkflow.inputs.set(GPUDataTypes::InOutType::TPCClusters);
  if (mSpecConfig.caClusterer) { // Override some settings if we have raw data as input
    mConfig->configWorkflow.inputs.set(GPUDataTypes::InOutType::TPCRaw);
    mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TPCClusterFinding, true);
    mConfig->configWorkflow.outputs.setBits(GPUDataTypes::InOutType::TPCClusters, true);
  }
  if (mSpecConfig.decompressTPC) {
    mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TPCCompression, false);
    mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TPCDecompression, true);
    mConfig->configWorkflow.inputs.set(GPUDataTypes::InOutType::TPCCompressedClusters);
    mConfig->configWorkflow.outputs.setBits(GPUDataTypes::InOutType::TPCClusters, true);
    mConfig->configWorkflow.outputs.setBits(GPUDataTypes::InOutType::TPCCompressedClusters, false);
    if (mTPCSectorMask != 0xFFFFFFFFF) {
      throw std::invalid_argument("Cannot run TPC decompression with a sector mask");
    }
  }
  if (mSpecConfig.runTRDTracking) {
    mConfig->configWorkflow.inputs.setBits(GPUDataTypes::InOutType::TRDTracklets, true);
    mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TRDTracking, true);
  }
  if (mSpecConfig.runITSTracking) {
    mConfig->configWorkflow.inputs.setBits(GPUDataTypes::InOutType::ITSClusters, true);
    mConfig->configWorkflow.outputs.setBits(GPUDataTypes::InOutType::ITSTracks, true);
    mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::ITSTracking, true);
  }
  if (mSpecConfig.outputSharedClusterMap) {
    mConfig->configProcessing.outputSharedClusterMap = true;
  }
  if (!mSpecConfig.outputTracks) {
    mConfig->configProcessing.createO2Output = 0; // Disable O2 TPC track format output if no track output requested
  }
  mConfig->configProcessing.param.tpcTriggerHandling = mSpecConfig.tpcTriggerHandling;

  if (mConfParam->transformationFile.size() || mConfParam->transformationSCFile.size()) {
    LOG(fatal) << "Deprecated configurable param options GPU_global.transformationFile or transformationSCFile used\n"
               << "Instead, link the corresponding file as <somedir>/TPC/Calib/CorrectionMap/snapshot.root and use it via\n"
               << "the --condition-remap file://<somedir>=TPC/Calib/CorrectionMap option";
  }
  /* if (config.configProcessing.doublePipeline && ic.services().get<ThreadPool>().poolSize != 2) {
    throw std::runtime_error("double pipeline requires exactly 2 threads");
  } */
  if (config.configProcessing.doublePipeline && (mSpecConfig.readTRDtracklets || mSpecConfig.runITSTracking || !(mSpecConfig.zsOnTheFly || mSpecConfig.zsDecoder))) {
    LOG(fatal) << "GPU two-threaded pipeline works only with TPC-only processing, and with ZS input";
  }

  if (mSpecConfig.enableDoublePipeline != 2) {
    mGPUReco = std::make_unique<GPUO2Interface>();

    // initialize TPC calib objects
    initFunctionTPCCalib(ic);

    mConfig->configCalib.fastTransform = mCalibObjects.mFastTransformHelper->getCorrMap();
    mConfig->configCalib.fastTransformRef = mCalibObjects.mFastTransformHelper->getCorrMapRef();
    mConfig->configCalib.fastTransformMShape = mCalibObjects.mFastTransformHelper->getCorrMapMShape();
    mConfig->configCalib.fastTransformHelper = mCalibObjects.mFastTransformHelper.get();
    if (mConfig->configCalib.fastTransform == nullptr) {
      throw std::invalid_argument("GPU workflow: initialization of the TPC transformation failed");
    }

    if (mConfParam->matLUTFile.size()) {
      LOGP(info, "Loading matlut file {}", mConfParam->matLUTFile.c_str());
      mConfig->configCalib.matLUT = o2::base::MatLayerCylSet::loadFromFile(mConfParam->matLUTFile.c_str());
      if (mConfig->configCalib.matLUT == nullptr) {
        LOGF(fatal, "Error loading matlut file");
      }
    } else {
      mConfig->configProcessing.lateO2MatLutProvisioningSize = 50 * 1024 * 1024;
    }
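    // When no matLUT file is given, a 50 MiB provisioning buffer is reserved so the material LUT can
    // still be supplied late (it is fetched via GRPGeomHelper in doCalibUpdates() once the CCDB
    // objects arrive); the exact sizing rationale is not documented here.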

    if (mSpecConfig.readTRDtracklets) {
      mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>();
      mConfig->configCalib.trdGeometry = mTRDGeometry.get();
    }

    mConfig->configProcessing.willProvideO2PropagatorLate = true;
    mConfig->configProcessing.o2PropagatorUseGPUField = true;

    if (mConfParam->printSettings && (mConfParam->printSettings > 1 || ic.services().get<const o2::framework::DeviceSpec>().inputTimesliceId == 0)) {
      mConfig->configProcessing.printSettings = true;
      if (mConfParam->printSettings > 1) {
        mConfig->PrintParam();
      }
    }

    // Configuration is prepared, initialize the tracker.
    if (mGPUReco->Initialize(config) != 0) {
      throw std::invalid_argument("GPU Reconstruction initialization failed");
    }
    if (mSpecConfig.outputQA) {
      mQA = std::make_unique<GPUO2InterfaceQA>(mConfig.get());
    }
    if (mSpecConfig.outputErrorQA) {
      mGPUReco->setErrorCodeOutput(&mErrorQA);
    }

    // initialize ITS
    if (mSpecConfig.runITSTracking) {
      initFunctionITS(ic);
    }
  }

  if (mSpecConfig.enableDoublePipeline) {
    initPipeline(ic);
    if (mConfParam->dump >= 2) {
      LOG(fatal) << "Cannot use dump-only mode with multi-threaded pipeline";
    }
  }

  auto& callbacks = ic.services().get<CallbackService>();
  callbacks.set<CallbackService::Id::RegionInfoCallback>([this](fair::mq::RegionInfo const& info) {
    if (info.size == 0) {
      return;
    }
    if (mSpecConfig.enableDoublePipeline) {
      mRegionInfos.emplace_back(info);
    }
    if (mSpecConfig.enableDoublePipeline == 2) {
      return;
    }
    if (mConfParam->registerSelectedSegmentIds != -1 && info.managed && info.id != (uint32_t)mConfParam->registerSelectedSegmentIds) {
      return;
    }
    int32_t fd = 0;
    if (mConfParam->mutexMemReg) {
      mode_t mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
      fd = open("/tmp/o2_gpu_memlock_mutex.lock", O_RDWR | O_CREAT | O_CLOEXEC, mask);
      if (fd == -1) {
        throw std::runtime_error("Error opening memlock mutex lock file");
      }
      fchmod(fd, mask);
      if (lockf(fd, F_LOCK, 0)) {
        throw std::runtime_error("Error locking memlock mutex file");
      }
    }
    std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
    if (mConfParam->benchmarkMemoryRegistration) {
      start = std::chrono::high_resolution_clock::now();
    }
    if (mGPUReco->registerMemoryForGPU(info.ptr, info.size)) {
      throw std::runtime_error("Error registering memory for GPU");
    }
    if (mConfParam->benchmarkMemoryRegistration) {
      end = std::chrono::high_resolution_clock::now();
      std::chrono::duration<double> elapsed_seconds = end - start;
      LOG(info) << "Memory registration time (0x" << info.ptr << ", " << info.size << " bytes): " << elapsed_seconds.count() << " s";
    }
    if (mConfParam->mutexMemReg) {
      if (lockf(fd, F_ULOCK, 0)) {
        throw std::runtime_error("Error unlocking memlock mutex file");
      }
      close(fd);
    }
  });
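  // The callback above registers each new shared-memory region with the GPU backend (typically
  // pinning it for DMA transfers), which can be slow for large segments; the optional file lock
  // serializes the registration across processes on the node, presumably to avoid concurrent
  // pinning of the same segments.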

  mTimer->Stop();
  mTimer->Reset();
}

void GPURecoWorkflowSpec::stop()
{
  LOGF(info, "GPU Reconstruction total timing: Cpu: %.3e Real: %.3e s in %d slots", mTimer->CpuTime(), mTimer->RealTime(), mTimer->Counter() - 1);
  handlePipelineStop();
}

void GPURecoWorkflowSpec::endOfStream(EndOfStreamContext& ec)
{
  handlePipelineEndOfStream(ec);
}

void GPURecoWorkflowSpec::finaliseCCDB(ConcreteDataMatcher& matcher, void* obj)
{
  if (mSpecConfig.enableDoublePipeline != 2) {
    finaliseCCDBTPC(matcher, obj);
    if (mSpecConfig.runITSTracking) {
      finaliseCCDBITS(matcher, obj);
    }
  }
  if (GRPGeomHelper::instance().finaliseCCDB(matcher, obj)) {
    mGRPGeomUpdated = true;
    return;
  }
}

template <class D, class E, class F, class G, class H, class I, class J, class K>
void GPURecoWorkflowSpec::processInputs(ProcessingContext& pc, D& tpcZSmeta, E& inputZS, F& tpcZS, G& tpcZSonTheFlySizes, bool& debugTFDump, H& compClustersDummy, I& compClustersFlatDummy, J& pCompClustersFlat, K& tmpEmptyCompClusters)
{
  if (mSpecConfig.enableDoublePipeline == 1) {
    return;
  }
  constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
  constexpr static size_t NEndpoints = o2::gpu::GPUTrackingInOutZS::NENDPOINTS;

  if (mSpecConfig.zsOnTheFly || mSpecConfig.zsDecoder) {
    for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
      for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
        tpcZSmeta.Pointers[i][j].clear();
        tpcZSmeta.Sizes[i][j].clear();
      }
    }
  }
  if (mSpecConfig.zsOnTheFly) {
    tpcZSonTheFlySizes = {0};
    // tpcZSonTheFlySizes: #zs pages per endpoint:
    std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "ZSSIZES"}, Lifetime::Timeframe}};
    bool recv = false, recvsizes = false;
    for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
      if (recvsizes) {
        throw std::runtime_error("Received multiple ZSSIZES data");
      }
      tpcZSonTheFlySizes = pc.inputs().get<std::array<uint32_t, NEndpoints * NSectors>>(ref);
      recvsizes = true;
    }
    // zs pages
    std::vector<InputSpec> filter2 = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "TPCZS"}, Lifetime::Timeframe}};
    for (auto const& ref : InputRecordWalker(pc.inputs(), filter2)) {
      if (recv) {
        throw std::runtime_error("Received multiple TPCZS data");
      }
      inputZS = pc.inputs().get<gsl::span<o2::tpc::ZeroSuppressedContainer8kb>>(ref);
      recv = true;
    }
    if (!recv || !recvsizes) {
      throw std::runtime_error("TPC ZS on the fly data not received");
    }

    uint32_t offset = 0;
    for (uint32_t i = 0; i < NSectors; i++) {
      uint32_t pageSector = 0;
      for (uint32_t j = 0; j < NEndpoints; j++) {
        pageSector += tpcZSonTheFlySizes[i * NEndpoints + j];
        offset += tpcZSonTheFlySizes[i * NEndpoints + j];
      }
      if (mVerbosity >= 1) {
        LOG(info) << "GOT ZS on the fly pages FOR SECTOR " << i << " -> pages: " << pageSector;
      }
    }
  }
  if (mSpecConfig.zsDecoder) {
    std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
    auto isSameRdh = [](const char* left, const char* right) -> bool {
      return o2::raw::RDHUtils::getFEEID(left) == o2::raw::RDHUtils::getFEEID(right) && o2::raw::RDHUtils::getDetectorField(left) == o2::raw::RDHUtils::getDetectorField(right);
    };
    auto checkForZSData = [](const char* ptr, uint32_t subSpec) -> bool {
      const auto rdhLink = o2::raw::RDHUtils::getLinkID(ptr);
      const auto detField = o2::raw::RDHUtils::getDetectorField(ptr);
      const auto feeID = o2::raw::RDHUtils::getFEEID(ptr);
      const auto feeLinkID = o2::tpc::rdh_utils::getLink(feeID);
      // This check is not what it is supposed to be, but some MC SYNTHETIC data was generated with rdhLinkId set to feeLinkId, so we add some extra logic so we can still decode it
      return detField == o2::tpc::raw_data_types::ZS && ((feeLinkID == o2::tpc::rdh_utils::UserLogicLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == 0)) ||
                                                         (feeLinkID == o2::tpc::rdh_utils::ILBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::ILBZSLinkID || rdhLink == 0)) ||
                                                         (feeLinkID == o2::tpc::rdh_utils::DLBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::DLBZSLinkID || rdhLink == 0)));
    };
    auto insertPages = [&tpcZSmeta, checkForZSData](const char* ptr, size_t count, uint32_t subSpec) -> void {
      if (checkForZSData(ptr, subSpec)) {
        int32_t rawcru = o2::tpc::rdh_utils::getCRU(ptr);
        int32_t rawendpoint = o2::tpc::rdh_utils::getEndPoint(ptr);
        tpcZSmeta.Pointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(ptr);
        tpcZSmeta.Sizes[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(count);
      }
    };
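    // Index arithmetic: each TPC sector is served by 10 CRUs with 2 endpoints each, so the sector is
    // cru / 10 and the per-sector endpoint slot is (cru % 10) * 2 + endpoint, matching NENDPOINTS = 20.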
    if (DPLRawPageSequencer(pc.inputs(), filter)(isSameRdh, insertPages, checkForZSData)) {
      debugTFDump = true;
      static uint32_t nErrors = 0;
      nErrors++;
      if (nErrors == 1 || (nErrors < 100 && nErrors % 10 == 0) || nErrors % 1000 == 0 || mNTFs % 1000 == 0) {
        LOG(error) << "DPLRawPageSequencer failed to process TPC raw data - data most likely not padded correctly - Using slow page scan instead (this alarm is downscaled from now on, so far " << nErrors << " of " << mNTFs << " TFs affected)";
      }
    }

    int32_t totalCount = 0;
    for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
      for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
        tpcZSmeta.Pointers2[i][j] = tpcZSmeta.Pointers[i][j].data();
        tpcZSmeta.Sizes2[i][j] = tpcZSmeta.Sizes[i][j].data();
        tpcZS.sector[i].zsPtr[j] = tpcZSmeta.Pointers2[i][j];
        tpcZS.sector[i].nZSPtr[j] = tpcZSmeta.Sizes2[i][j];
        tpcZS.sector[i].count[j] = tpcZSmeta.Pointers[i][j].size();
        totalCount += tpcZSmeta.Pointers[i][j].size();
      }
    }
  } else if (mSpecConfig.decompressTPC) {
    if (mSpecConfig.decompressTPCFromROOT) {
      compClustersDummy = *pc.inputs().get<o2::tpc::CompressedClustersROOT*>("input");
      compClustersFlatDummy.setForward(&compClustersDummy);
      pCompClustersFlat = &compClustersFlatDummy;
    } else {
      pCompClustersFlat = pc.inputs().get<o2::tpc::CompressedClustersFlat*>("input").get();
    }
    if (pCompClustersFlat == nullptr) {
      tmpEmptyCompClusters.reset(new char[sizeof(o2::tpc::CompressedClustersFlat)]);
      memset(tmpEmptyCompClusters.get(), 0, sizeof(o2::tpc::CompressedClustersFlat));
      pCompClustersFlat = (o2::tpc::CompressedClustersFlat*)tmpEmptyCompClusters.get();
    }
  } else if (!mSpecConfig.zsOnTheFly) {
    if (mVerbosity) {
      LOGF(info, "running tracking for sector(s) 0x%09x", mTPCSectorMask);
    }
  }
}

int32_t GPURecoWorkflowSpec::runMain(o2::framework::ProcessingContext* pc, GPUTrackingInOutPointers* ptrs, GPUInterfaceOutputs* outputRegions, int32_t threadIndex, GPUInterfaceInputUpdate* inputUpdateCallback)
{
  int32_t retVal = 0;
  if (mConfParam->dump < 2) {
    retVal = mGPUReco->RunTracking(ptrs, outputRegions, threadIndex, inputUpdateCallback);

    if (retVal == 0 && mSpecConfig.runITSTracking) {
      retVal = runITSTracking(*pc);
    }
  }

  if (!mSpecConfig.enableDoublePipeline) { // TODO: Why is this needed for double-pipeline?
    mGPUReco->Clear(false, threadIndex);   // clean non-output memory used by GPU Reconstruction
  }
  return retVal;
}

void GPURecoWorkflowSpec::cleanOldCalibsTPCPtrs(calibObjectStruct& oldCalibObjects)
{
  if (mOldCalibObjects.size() > 0) {
    mOldCalibObjects.pop();
  }
  mOldCalibObjects.emplace(std::move(oldCalibObjects));
}
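
// The queue above keeps exactly one generation of superseded calibration objects alive: the previous
// set is dropped only when a newer one arrives, so pointers possibly still referenced by an in-flight
// time frame are not freed immediately (this intent is inferred from the pop/emplace pattern).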

void GPURecoWorkflowSpec::run(ProcessingContext& pc)
{
  constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
  constexpr static size_t NEndpoints = o2::gpu::GPUTrackingInOutZS::NENDPOINTS;

  auto cput = mTimer->CpuTime();
  auto realt = mTimer->RealTime();
  mTimer->Start(false);
  mNTFs++;

  std::vector<gsl::span<const char>> inputs;

  const o2::tpc::CompressedClustersFlat* pCompClustersFlat = nullptr;
  size_t compClustersFlatDummyMemory[(sizeof(o2::tpc::CompressedClustersFlat) + sizeof(size_t) - 1) / sizeof(size_t)];
  o2::tpc::CompressedClustersFlat& compClustersFlatDummy = reinterpret_cast<o2::tpc::CompressedClustersFlat&>(compClustersFlatDummyMemory);
  o2::tpc::CompressedClusters compClustersDummy;
  GPUTrackingInOutZS tpcZS;
  std::array<uint32_t, NEndpoints * NSectors> tpcZSonTheFlySizes;
  gsl::span<const o2::tpc::ZeroSuppressedContainer8kb> inputZS;
  std::unique_ptr<char[]> tmpEmptyCompClusters;

  bool getWorkflowTPCInput_clusters = false, getWorkflowTPCInput_mc = false, getWorkflowTPCInput_digits = false;
  bool debugTFDump = false;

  if (mSpecConfig.processMC) {
    getWorkflowTPCInput_mc = true;
  }
  if (!mSpecConfig.decompressTPC && !mSpecConfig.caClusterer) {
    getWorkflowTPCInput_clusters = true;
  }
  if (!mSpecConfig.decompressTPC && mSpecConfig.caClusterer && ((!mSpecConfig.zsOnTheFly || mSpecConfig.processMC) && !mSpecConfig.zsDecoder)) {
    getWorkflowTPCInput_digits = true;
  }

  // ------------------------------ Handle inputs ------------------------------

  auto lockDecodeInput = std::make_unique<std::lock_guard<std::mutex>>(mPipeline->mutexDecodeInput);

  GPUTrackingInOutPointers ptrs = {};
  if (mSpecConfig.enableDoublePipeline != 2) {
    if (mSpecConfig.runITSTracking && pc.inputs().getPos("itsTGeo") >= 0) {
      pc.inputs().get<o2::its::GeometryTGeo*>("itsTGeo");
    }
    if (GRPGeomHelper::instance().getGRPECS()->isDetReadOut(o2::detectors::DetID::TPC) && mConfParam->tpcTriggeredMode ^ !GRPGeomHelper::instance().getGRPECS()->isDetContinuousReadOut(o2::detectors::DetID::TPC)) {
      LOG(fatal) << "configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
    }
  }

  processInputs(pc, tpcZSmeta, inputZS, tpcZS, tpcZSonTheFlySizes, debugTFDump, compClustersDummy, compClustersFlatDummy, pCompClustersFlat, tmpEmptyCompClusters);                    // Process non-digit / non-cluster inputs
  const auto& inputsClustersDigits = o2::tpc::getWorkflowTPCInput(pc, mVerbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, mTPCSectorMask, getWorkflowTPCInput_digits); // Process digit and cluster inputs

  const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
  mTFSettings->tfStartOrbit = tinfo.firstTForbit;
  mTFSettings->hasTfStartOrbit = 1;
  mTFSettings->hasNHBFPerTF = 1;
  mTFSettings->nHBFPerTF = mConfParam->overrideNHbfPerTF ? mConfParam->overrideNHbfPerTF : GRPGeomHelper::instance().getGRPECS()->getNHBFPerTF();
  mTFSettings->hasRunStartOrbit = 0;
  ptrs.settingsTF = mTFSettings.get();

  if (mSpecConfig.enableDoublePipeline != 2) {
    if (mVerbosity) {
      LOG(info) << "TF firstTForbit " << mTFSettings->tfStartOrbit << " nHBF " << mTFSettings->nHBFPerTF << " runStartOrbit " << mTFSettings->runStartOrbit << " simStartOrbit " << mTFSettings->simStartOrbit;
    }
    if (mConfParam->checkFirstTfOrbit) {
      static uint32_t lastFirstTFOrbit = -1;
      static uint32_t lastTFCounter = -1;
      if (lastFirstTFOrbit != -1 && lastTFCounter != -1) {
        int32_t diffOrbit = tinfo.firstTForbit - lastFirstTFOrbit;
        int32_t diffCounter = tinfo.tfCounter - lastTFCounter;
        if (diffOrbit != diffCounter * mTFSettings->nHBFPerTF) {
          LOG(error) << "Time frame has mismatching firstTfOrbit - Last orbit/counter: " << lastFirstTFOrbit << " " << lastTFCounter << " - Current: " << tinfo.firstTForbit << " " << tinfo.tfCounter;
        }
      }
      lastFirstTFOrbit = tinfo.firstTForbit;
      lastTFCounter = tinfo.tfCounter;
    }
  }

  o2::globaltracking::RecoContainer inputTracksTRD;
  decltype(o2::trd::getRecoInputContainer(pc, &ptrs, &inputTracksTRD)) trdInputContainer;
  if (mSpecConfig.readTRDtracklets) {
    o2::globaltracking::DataRequest dataRequestTRD;
    inputTracksTRD.collectData(pc, dataRequestTRD);
    trdInputContainer = std::move(o2::trd::getRecoInputContainer(pc, &ptrs, &inputTracksTRD));
  }

  void* ptrEp[NSectors * NEndpoints] = {};
  bool doInputDigits = false, doInputDigitsMC = false;
  if (mSpecConfig.decompressTPC) {
    ptrs.tpcCompressedClusters = pCompClustersFlat;
  } else if (mSpecConfig.zsOnTheFly) {
    const uint64_t* buffer = reinterpret_cast<const uint64_t*>(&inputZS[0]);
    o2::gpu::GPUReconstructionConvert::RunZSEncoderCreateMeta(buffer, tpcZSonTheFlySizes.data(), *&ptrEp, &tpcZS);
    ptrs.tpcZS = &tpcZS;
    doInputDigits = doInputDigitsMC = mSpecConfig.processMC;
  } else if (mSpecConfig.zsDecoder) {
    ptrs.tpcZS = &tpcZS;
    if (mSpecConfig.processMC) {
      throw std::runtime_error("Cannot process MC information, none available");
    }
  } else if (mSpecConfig.caClusterer) {
    doInputDigits = true;
    doInputDigitsMC = mSpecConfig.processMC;
  } else {
    ptrs.clustersNative = &inputsClustersDigits->clusterIndex;
  }

  if (mTPCSectorMask != 0xFFFFFFFFF) {
    // Clean out the unused sectors, such that if they were present by chance, they are not processed, and if the values are uninitialized, we should not crash
    for (uint32_t i = 0; i < NSectors; i++) {
      if (!(mTPCSectorMask & (1ul << i))) {
        if (ptrs.tpcZS) {
          for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
            tpcZS.sector[i].zsPtr[j] = nullptr;
            tpcZS.sector[i].nZSPtr[j] = nullptr;
            tpcZS.sector[i].count[j] = 0;
          }
        }
      }
    }
  }

  GPUTrackingInOutDigits tpcDigitsMap;
  GPUTPCDigitsMCInput tpcDigitsMapMC;
  if (doInputDigits) {
    ptrs.tpcPackedDigits = &tpcDigitsMap;
    if (doInputDigitsMC) {
      tpcDigitsMap.tpcDigitsMC = &tpcDigitsMapMC;
    }
    for (uint32_t i = 0; i < NSectors; i++) {
      tpcDigitsMap.tpcDigits[i] = inputsClustersDigits->inputDigits[i].data();
      tpcDigitsMap.nTPCDigits[i] = inputsClustersDigits->inputDigits[i].size();
      if (doInputDigitsMC) {
        tpcDigitsMapMC.v[i] = inputsClustersDigits->inputDigitsMCPtrs[i];
      }
    }
  }

  o2::tpc::TPCSectorHeader clusterOutputSectorHeader{0};
  if (mClusterOutputIds.size() > 0) {
    clusterOutputSectorHeader.sectorBits = mTPCSectorMask;
    // subspecs [0, NSectors - 1] are used to identify sector data, we use NSectors to indicate the full TPC
    clusterOutputSectorHeader.activeSectors = mTPCSectorMask;
  }

  // ------------------------------ Prepare stage for double-pipeline before normal output preparation ------------------------------

  std::unique_ptr<GPURecoWorkflow_QueueObject> pipelineContext;
  if (mSpecConfig.enableDoublePipeline) {
    if (handlePipeline(pc, ptrs, tpcZSmeta, tpcZS, pipelineContext)) {
      return;
    }
  }

  // ------------------------------ Prepare outputs ------------------------------

  GPUInterfaceOutputs outputRegions;
  using outputDataType = char;
  using outputBufferUninitializedVector = std::decay_t<decltype(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(Output{"", "", 0}))>;
  using outputBufferType = std::pair<std::optional<std::reference_wrapper<outputBufferUninitializedVector>>, outputDataType*>;
  std::vector<outputBufferType> outputBuffers(GPUInterfaceOutputs::count(), {std::nullopt, nullptr});
  std::unordered_set<std::string> outputsCreated;

  auto setOutputAllocator = [this, &outputBuffers, &outputRegions, &pc, &outputsCreated](const char* name, bool condition, GPUOutputControl& region, auto&& outputSpec, size_t offset = 0) {
    if (condition) {
      auto& buffer = outputBuffers[outputRegions.getIndex(region)];
      if (mConfParam->allocateOutputOnTheFly) {
        region.allocator = [this, name, &buffer, &pc, outputSpec = std::move(outputSpec), offset, &outputsCreated](size_t size) -> void* {
          size += offset;
          if (mVerbosity) {
            LOG(info) << "ALLOCATING " << size << " bytes for " << name << ": " << std::get<DataOrigin>(outputSpec).template as<std::string>() << "/" << std::get<DataDescription>(outputSpec).template as<std::string>() << "/" << std::get<2>(outputSpec);
          }
          std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
          if (mVerbosity) {
            start = std::chrono::high_resolution_clock::now();
          }
          buffer.first.emplace(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(std::make_from_tuple<Output>(outputSpec), size));
          outputsCreated.insert(name);
          if (mVerbosity) {
            end = std::chrono::high_resolution_clock::now();
            std::chrono::duration<double> elapsed_seconds = end - start;
            LOG(info) << "Allocation time for " << name << " (" << size << " bytes)"
                      << ": " << elapsed_seconds.count() << "s";
          }
          return (buffer.second = buffer.first->get().data()) + offset;
        };
      } else {
        buffer.first.emplace(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(std::make_from_tuple<Output>(outputSpec), mConfParam->outputBufferSize));
        region.ptrBase = (buffer.second = buffer.first->get().data()) + offset;
        region.size = buffer.first->get().size() - offset;
        outputsCreated.insert(name);
      }
    }
  };
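  // Two allocation strategies: with allocateOutputOnTheFly the DPL message is created lazily at the
  // size the GPU code requests; otherwise a fixed outputBufferSize message is created up front and
  // later shrunk to the actually used size via the downSize helpers below.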

  auto downSizeBuffer = [](outputBufferType& buffer, size_t size) {
    if (!buffer.first) {
      return;
    }
    if (buffer.first->get().size() < size) {
      throw std::runtime_error("Invalid buffer size requested");
    }
    buffer.first->get().resize(size);
    if (size && buffer.first->get().data() != buffer.second) {
      throw std::runtime_error("Inconsistent buffer address after downsize");
    }
  };

  /*auto downSizeBufferByName = [&outputBuffers, &outputRegions, &downSizeBuffer](GPUOutputControl& region, size_t size) {
    auto& buffer = outputBuffers[outputRegions.getIndex(region)];
    downSizeBuffer(buffer, size);
  };*/

  auto downSizeBufferToSpan = [&outputBuffers, &outputRegions, &downSizeBuffer](GPUOutputControl& region, auto span) {
    auto& buffer = outputBuffers[outputRegions.getIndex(region)];
    if (!buffer.first) {
      return;
    }
    if (span.size() && buffer.second != (char*)span.data()) {
      throw std::runtime_error("Buffer does not match span");
    }
    downSizeBuffer(buffer, span.size() * sizeof(*span.data()));
  };

  setOutputAllocator("COMPCLUSTERSFLAT", mSpecConfig.outputCompClustersFlat, outputRegions.compressedClusters, std::make_tuple(gDataOriginTPC, (DataDescription) "COMPCLUSTERSFLAT", 0));
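  // The CLUSTERNATIVE message is prefixed with a ClusterCountIndex header, hence the extra
  // sizeof(o2::tpc::ClusterCountIndex) offset passed below; the header itself is only filled after
  // reconstruction, once the per-row cluster counts are known.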
  setOutputAllocator("CLUSTERNATIVE", mClusterOutputIds.size() > 0, outputRegions.clustersNative, std::make_tuple(gDataOriginTPC, mSpecConfig.sendClustersPerSector ? (DataDescription) "CLUSTERNATIVETMP" : (DataDescription) "CLUSTERNATIVE", NSectors, clusterOutputSectorHeader), sizeof(o2::tpc::ClusterCountIndex));
  setOutputAllocator("CLSHAREDMAP", mSpecConfig.outputSharedClusterMap, outputRegions.sharedClusterMap, std::make_tuple(gDataOriginTPC, (DataDescription) "CLSHAREDMAP", 0));
  setOutputAllocator("TPCOCCUPANCYMAP", mSpecConfig.outputSharedClusterMap, outputRegions.tpcOccupancyMap, std::make_tuple(gDataOriginTPC, (DataDescription) "TPCOCCUPANCYMAP", 0));
  setOutputAllocator("TRACKS", mSpecConfig.outputTracks, outputRegions.tpcTracksO2, std::make_tuple(gDataOriginTPC, (DataDescription) "TRACKS", 0));
  setOutputAllocator("CLUSREFS", mSpecConfig.outputTracks, outputRegions.tpcTracksO2ClusRefs, std::make_tuple(gDataOriginTPC, (DataDescription) "CLUSREFS", 0));
  setOutputAllocator("TRACKSMCLBL", mSpecConfig.outputTracks && mSpecConfig.processMC, outputRegions.tpcTracksO2Labels, std::make_tuple(gDataOriginTPC, (DataDescription) "TRACKSMCLBL", 0));
  setOutputAllocator("TRIGGERWORDS", mSpecConfig.caClusterer && mConfig->configProcessing.param.tpcTriggerHandling, outputRegions.tpcTriggerWords, std::make_tuple(gDataOriginTPC, (DataDescription) "TRIGGERWORDS", 0));
  o2::tpc::ClusterNativeAccess::ConstMCLabelContainerViewWithBuffer clustersMCBuffer;
  if (mSpecConfig.processMC && mSpecConfig.caClusterer) {
    outputRegions.clusterLabels.allocator = [&clustersMCBuffer](size_t size) -> void* { return &clustersMCBuffer; };
  }

  // ------------------------------ Actual processing ------------------------------

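  // Exactly one source of TPC input must be in use per time frame: ZS pages, packed digits, native
  // clusters, or compressed clusters; the check below counts the configured pointers and rejects
  // anything other than one.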
  if ((int32_t)(ptrs.tpcZS != nullptr) + (int32_t)(ptrs.tpcPackedDigits != nullptr && (ptrs.tpcZS == nullptr || ptrs.tpcPackedDigits->tpcDigitsMC == nullptr)) + (int32_t)(ptrs.clustersNative != nullptr) + (int32_t)(ptrs.tpcCompressedClusters != nullptr) != 1) {
    throw std::runtime_error("Invalid input for gpu tracking");
  }

  const auto& holdData = o2::tpc::TPCTrackingDigitsPreCheck::runPrecheck(&ptrs, mConfig.get());

  calibObjectStruct oldCalibObjects;
  doCalibUpdates(pc, oldCalibObjects);

  lockDecodeInput.reset();

  if (mConfParam->dump) {
    if (mNTFs == 1) {
      mGPUReco->DumpSettings();
    }
    mGPUReco->DumpEvent(mNTFs - 1, &ptrs);
  }
  std::unique_ptr<GPUTrackingInOutPointers> ptrsDump;
  if (mConfParam->dumpBadTFMode == 2) {
    ptrsDump.reset(new GPUTrackingInOutPointers);
    memcpy((void*)ptrsDump.get(), (const void*)&ptrs, sizeof(ptrs));
  }

  int32_t retVal = 0;
  if (mSpecConfig.enableDoublePipeline) {
    if (!pipelineContext->jobSubmitted) {
      enqueuePipelinedJob(&ptrs, &outputRegions, pipelineContext.get(), true);
    } else {
      finalizeInputPipelinedJob(&ptrs, &outputRegions, pipelineContext.get());
    }
    std::unique_lock lk(pipelineContext->jobFinishedMutex);
    pipelineContext->jobFinishedNotify.wait(lk, [context = pipelineContext.get()]() { return context->jobFinished; });
    retVal = pipelineContext->jobReturnValue;
  } else {
    // uint32_t threadIndex = pc.services().get<ThreadPool>().threadIndex;
    uint32_t threadIndex = mNextThreadIndex;
    if (mConfig->configProcessing.doublePipeline) {
      mNextThreadIndex = (mNextThreadIndex + 1) % 2;
    }

    retVal = runMain(&pc, &ptrs, &outputRegions, threadIndex);
  }
  if (retVal != 0) {
    debugTFDump = true;
  }
  cleanOldCalibsTPCPtrs(oldCalibObjects);

  o2::utils::DebugStreamer::instance()->flush(); // flushing debug output to file

  if (debugTFDump && mNDebugDumps < mConfParam->dumpBadTFs) {
    mNDebugDumps++;
    if (mConfParam->dumpBadTFMode <= 1) {
      std::string filename = std::string("tpc_dump_") + std::to_string(pc.services().get<const o2::framework::DeviceSpec>().inputTimesliceId) + "_" + std::to_string(mNDebugDumps) + ".dump";
      FILE* fp = fopen(filename.c_str(), "w+b");
      std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
      for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
        auto data = pc.inputs().get<gsl::span<char>>(ref);
        if (mConfParam->dumpBadTFMode == 1) {
          uint64_t size = data.size();
          fwrite(&size, 1, sizeof(size), fp);
        }
        fwrite(data.data(), 1, data.size(), fp);
      }
      fclose(fp);
    } else if (mConfParam->dumpBadTFMode == 2) {
      mGPUReco->DumpEvent(mNDebugDumps - 1, ptrsDump.get());
    }
  }

  if (mConfParam->dump == 2) {
    return;
  }

  // ------------------------------ Various postprocessing steps ------------------------------

  bool createEmptyOutput = false;
  if (retVal != 0) {
    if (retVal == 3 && mConfig->configProcessing.ignoreNonFatalGPUErrors) {
      if (mConfig->configProcessing.throttleAlarms) {
        LOG(warning) << "GPU Reconstruction aborted with non fatal error code, ignoring";
      } else {
        LOG(alarm) << "GPU Reconstruction aborted with non fatal error code, ignoring";
      }
      createEmptyOutput = !mConfParam->partialOutputForNonFatalErrors;
    } else {
      throw std::runtime_error("GPU Reconstruction error: error code " + std::to_string(retVal));
    }
  }

  std::unique_ptr<o2::tpc::ClusterNativeAccess> tmpEmptyClNative;
  if (createEmptyOutput) {
    memset(&ptrs, 0, sizeof(ptrs));
    for (uint32_t i = 0; i < outputRegions.count(); i++) {
      if (outputBuffers[i].first) {
        size_t toSize = 0;
        if (i == outputRegions.getIndex(outputRegions.compressedClusters)) {
          toSize = sizeof(*ptrs.tpcCompressedClusters);
        } else if (i == outputRegions.getIndex(outputRegions.clustersNative)) {
          toSize = sizeof(o2::tpc::ClusterCountIndex);
        }
        outputBuffers[i].first->get().resize(toSize);
        outputBuffers[i].second = outputBuffers[i].first->get().data();
        if (toSize) {
          memset(outputBuffers[i].second, 0, toSize);
        }
      }
    }
    tmpEmptyClNative = std::make_unique<o2::tpc::ClusterNativeAccess>();
    memset(tmpEmptyClNative.get(), 0, sizeof(*tmpEmptyClNative));
    ptrs.clustersNative = tmpEmptyClNative.get();
    if (mSpecConfig.processMC) {
      MCLabelContainer cont;
      cont.flatten_to(clustersMCBuffer.first);
      clustersMCBuffer.second = clustersMCBuffer.first;
      tmpEmptyClNative->clustersMCTruth = &clustersMCBuffer.second;
    }
  } else {
    gsl::span<const o2::tpc::TrackTPC> spanOutputTracks = {ptrs.outputTracksTPCO2, ptrs.nOutputTracksTPCO2};
    gsl::span<const uint32_t> spanOutputClusRefs = {ptrs.outputClusRefsTPCO2, ptrs.nOutputClusRefsTPCO2};
    gsl::span<const o2::MCCompLabel> spanOutputTracksMCTruth = {ptrs.outputTracksTPCO2MC, ptrs.outputTracksTPCO2MC ? ptrs.nOutputTracksTPCO2 : 0};
    if (!mConfParam->allocateOutputOnTheFly) {
      for (uint32_t i = 0; i < outputRegions.count(); i++) {
        if (outputRegions.asArray()[i].ptrBase) {
          if (outputRegions.asArray()[i].size == 1) {
            throw std::runtime_error("Preallocated buffer size exceeded");
          }
          outputRegions.asArray()[i].checkCurrent();
          downSizeBuffer(outputBuffers[i], (char*)outputRegions.asArray()[i].ptrCurrent - (char*)outputBuffers[i].second);
        }
      }
    }
    downSizeBufferToSpan(outputRegions.tpcTracksO2, spanOutputTracks);
    downSizeBufferToSpan(outputRegions.tpcTracksO2ClusRefs, spanOutputClusRefs);
    downSizeBufferToSpan(outputRegions.tpcTracksO2Labels, spanOutputTracksMCTruth);

    // if requested, tune TPC tracks
    if (ptrs.nOutputTracksTPCO2) {
      doTrackTuneTPC(ptrs, outputBuffers[outputRegions.getIndex(outputRegions.tpcTracksO2)].first->get().data());
    }

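    // Sanity check: for the single-message cluster output, the linear cluster array must start right
    // after the ClusterCountIndex header that was reserved via the allocator offset above.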
    if (mClusterOutputIds.size() > 0 && (void*)ptrs.clustersNative->clustersLinear != (void*)(outputBuffers[outputRegions.getIndex(outputRegions.clustersNative)].second + sizeof(o2::tpc::ClusterCountIndex))) {
      throw std::runtime_error("cluster native output ptrs out of sync"); // sanity check
    }
  }

  if (mConfig->configWorkflow.outputs.isSet(GPUDataTypes::InOutType::TPCMergedTracks)) {
    LOG(info) << "found " << ptrs.nOutputTracksTPCO2 << " track(s)";
  }

  if (mSpecConfig.outputCompClusters) {
  }

  if (mClusterOutputIds.size() > 0) {
    o2::tpc::ClusterNativeAccess const& accessIndex = *ptrs.clustersNative;
    if (mSpecConfig.sendClustersPerSector) {
      // Clusters are shipped by sector, we are copying into per-sector buffers (anyway only for ROOT output)
      for (uint32_t i = 0; i < NSectors; i++) {
        if (mTPCSectorMask & (1ul << i)) {
          DataHeader::SubSpecificationType subspec = i;
          clusterOutputSectorHeader.sectorBits = (1ul << i);
          char* buffer = pc.outputs().make<char>({gDataOriginTPC, "CLUSTERNATIVE", subspec, {clusterOutputSectorHeader}}, accessIndex.nClustersSector[i] * sizeof(*accessIndex.clustersLinear) + sizeof(o2::tpc::ClusterCountIndex)).data();
          o2::tpc::ClusterCountIndex* outIndex = reinterpret_cast<o2::tpc::ClusterCountIndex*>(buffer);
          memset(outIndex, 0, sizeof(*outIndex));
          for (int32_t j = 0; j < o2::tpc::constants::MAXGLOBALPADROW; j++) {
            outIndex->nClusters[i][j] = accessIndex.nClusters[i][j];
          }
          memcpy(buffer + sizeof(*outIndex), accessIndex.clusters[i][0], accessIndex.nClustersSector[i] * sizeof(*accessIndex.clustersLinear));
          if (mSpecConfig.processMC && accessIndex.clustersMCTruth) {
            MCLabelContainer cont;
            for (uint32_t j = 0; j < accessIndex.nClustersSector[i]; j++) {
              const auto& labels = accessIndex.clustersMCTruth->getLabels(accessIndex.clusterOffset[i][0] + j);
              for (const auto& label : labels) {
                cont.addElement(j, label);
              }
            }
            ConstMCLabelContainer contflat;
            cont.flatten_to(contflat);
            pc.outputs().snapshot({gDataOriginTPC, "CLNATIVEMCLBL", subspec, {clusterOutputSectorHeader}}, contflat);
          }
        }
      }
    } else {
      // Clusters are shipped as single message, fill ClusterCountIndex
      DataHeader::SubSpecificationType subspec = NSectors;
      o2::tpc::ClusterCountIndex* outIndex = reinterpret_cast<o2::tpc::ClusterCountIndex*>(outputBuffers[outputRegions.getIndex(outputRegions.clustersNative)].second);
      static_assert(sizeof(o2::tpc::ClusterCountIndex) == sizeof(accessIndex.nClusters));
      memcpy(outIndex, &accessIndex.nClusters[0][0], sizeof(o2::tpc::ClusterCountIndex));
      if (mSpecConfig.processMC && mSpecConfig.caClusterer && accessIndex.clustersMCTruth) {
        pc.outputs().snapshot({gDataOriginTPC, "CLNATIVEMCLBL", subspec, {clusterOutputSectorHeader}}, clustersMCBuffer.first);
      }
    }
  }
  if (mSpecConfig.outputQA) {
    TObjArray out;
    bool sendQAOutput = !createEmptyOutput && outputRegions.qa.newQAHistsCreated;
    auto getoutput = [sendQAOutput](auto ptr) { return sendQAOutput && ptr ? *ptr : std::decay_t<decltype(*ptr)>(); };
    std::vector<TH1F> copy1 = getoutput(outputRegions.qa.hist1); // Internally, this will also be used as output, so we need a non-const copy
    std::vector<TH2F> copy2 = getoutput(outputRegions.qa.hist2);
    std::vector<TH1D> copy3 = getoutput(outputRegions.qa.hist3);
    std::vector<TGraphAsymmErrors> copy4 = getoutput(outputRegions.qa.hist4);
    if (sendQAOutput) {
      mQA->postprocessExternal(copy1, copy2, copy3, copy4, out, mQATaskMask ? mQATaskMask : -1);
    }
    pc.outputs().snapshot({gDataOriginTPC, "TRACKINGQA", 0}, out);
    if (sendQAOutput) {
      mQA->cleanup();
    }
  }
  if (mSpecConfig.outputErrorQA) {
    pc.outputs().snapshot({gDataOriginGPU, "ERRORQA", 0}, mErrorQA);
    mErrorQA.clear(); // FIXME: This is a race condition once we run multi-threaded!
  }
  if (mSpecConfig.outputSharedClusterMap && !outputsCreated.contains("TPCOCCUPANCYMAP")) {
  }
  if (mSpecConfig.tpcTriggerHandling && !outputsCreated.contains("TRIGGERWORDS")) {
  }
  mTimer->Stop();
  LOG(info) << "GPU Reconstruction time for this TF " << mTimer->CpuTime() - cput << " s (cpu), " << mTimer->RealTime() - realt << " s (wall)";
}

void GPURecoWorkflowSpec::doCalibUpdates(o2::framework::ProcessingContext& pc, calibObjectStruct& oldCalibObjects)
{
  GPUCalibObjectsConst newCalibObjects;
  GPUNewCalibValues newCalibValues;
  // check for updates of TPC calibration objects
  bool needCalibUpdate = false;
  if (mGRPGeomUpdated) {
    mGRPGeomUpdated = false;
    needCalibUpdate = true;

    if (mSpecConfig.runITSTracking && !mITSGeometryCreated) {
      mITSGeometryCreated = true;
    }

    if (mAutoSolenoidBz) {
      newCalibValues.newSolenoidField = true;
      newCalibValues.solenoidField = mConfig->configGRP.solenoidBzNominalGPU = GPUO2InterfaceUtils::getNominalGPUBz(*GRPGeomHelper::instance().getGRPMagField());
      // Propagator::Instance()->setBz(newCalibValues.solenoidField); // Take value from o2::Propagator::UpdateField from GRPGeomHelper
      LOG(info) << "Updating solenoid field " << newCalibValues.solenoidField;
    }
    if (mAutoContinuousMaxTimeBin) {
      newCalibValues.newContinuousMaxTimeBin = true;
      newCalibValues.continuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin = GPUO2InterfaceUtils::getTpcMaxTimeBinFromNHbf(mTFSettings->nHBFPerTF);
      LOG(info) << "Updating max time bin " << newCalibValues.continuousMaxTimeBin << " (" << mTFSettings->nHBFPerTF << " orbits)";
    }

    if (!mPropagatorInstanceCreated) {
      newCalibObjects.o2Propagator = mConfig->configCalib.o2Propagator = Propagator::Instance();
      if (mConfig->configProcessing.o2PropagatorUseGPUField) {
        mGPUReco->UseGPUPolynomialFieldInPropagator(Propagator::Instance());
      }
      mPropagatorInstanceCreated = true;
    }

    if (!mMatLUTCreated) {
      if (mConfParam->matLUTFile.size() == 0) {
        newCalibObjects.matLUT = GRPGeomHelper::instance().getMatLUT();
        LOG(info) << "Loaded material budget lookup table";
      }
      mMatLUTCreated = true;
    }
    if (mSpecConfig.readTRDtracklets && !mTRDGeometryCreated) {
      auto gm = o2::trd::Geometry::instance();
      gm->createPadPlaneArray();
      gm->createClusterMatrixArray();
      mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>(*gm);
      newCalibObjects.trdGeometry = mConfig->configCalib.trdGeometry = mTRDGeometry.get();
      LOG(info) << "Loaded TRD geometry";
      mTRDGeometryCreated = true;
    }
  }
  needCalibUpdate = fetchCalibsCCDBTPC(pc, newCalibObjects, oldCalibObjects) || needCalibUpdate;
  if (mSpecConfig.runITSTracking) {
    needCalibUpdate = fetchCalibsCCDBITS(pc) || needCalibUpdate;
  }
  if (mTPCCutAtTimeBin != mConfig->configGRP.tpcCutTimeBin) {
    newCalibValues.newTPCTimeBinCut = true;
    newCalibValues.tpcTimeBinCut = mConfig->configGRP.tpcCutTimeBin = mTPCCutAtTimeBin;
    needCalibUpdate = true;
  }
  if (needCalibUpdate) {
    LOG(info) << "Updating GPUReconstruction calibration objects";
    mGPUReco->UpdateCalibration(newCalibObjects, newCalibValues);
  }
}

Options GPURecoWorkflowSpec::options()
{
  Options opts;
  if (mSpecConfig.enableDoublePipeline) {
    bool send = mSpecConfig.enableDoublePipeline == 2;
    char* o2jobid = getenv("O2JOBID");
    char* numaid = getenv("NUMAID");
    int32_t chanid = o2jobid ? atoi(o2jobid) : (numaid ? atoi(numaid) : 0);
    std::string chan = std::string("name=gpu-prepare-channel,type=") + (send ? "push" : "pull") + ",method=" + (send ? "connect" : "bind") + ",address=ipc://@gpu-prepare-channel-" + std::to_string(chanid) + "-{timeslice0},transport=shmem,rateLogging=0";
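    // The prepare process (mode 2) connects and pushes, while the processing process (mode 1) binds
    // and pulls, so the pair shares one out-of-band shmem channel, e.g.
    // "name=gpu-prepare-channel,type=pull,method=bind,address=ipc://@gpu-prepare-channel-0-{timeslice0},transport=shmem,rateLogging=0".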
    opts.emplace_back(o2::framework::ConfigParamSpec{"channel-config", o2::framework::VariantType::String, chan, {"Out-of-band channel config"}});
  }
  if (mSpecConfig.enableDoublePipeline == 2) {
    return opts;
  }
  if (mSpecConfig.outputTracks) {
  }
  return opts;
}

Inputs GPURecoWorkflowSpec::inputs()
{
  Inputs inputs;
  if (mSpecConfig.zsDecoder) {
    // All ZS raw data is published with subspec 0 by the o2-raw-file-reader-workflow, while DataDistribution
    // creates the subspec from CRU and endpoint id, so we create one single input route subscribing to all TPC/RAWDATA
    inputs.emplace_back(InputSpec{"zsraw", ConcreteDataTypeMatcher{"TPC", "RAWDATA"}, Lifetime::Timeframe});
    if (mSpecConfig.askDISTSTF) {
      inputs.emplace_back("stdDist", "FLP", "DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
    }
  }
  if (mSpecConfig.enableDoublePipeline == 2) {
    if (!mSpecConfig.zsDecoder) {
      LOG(fatal) << "Double pipeline mode can only work with zsraw input";
    }
    return inputs;
  } else if (mSpecConfig.enableDoublePipeline == 1) {
    inputs.emplace_back("pipelineprepare", gDataOriginGPU, "PIPELINEPREPARE", 0, Lifetime::Timeframe);
  }
  if (mSpecConfig.enableDoublePipeline != 2 && (mSpecConfig.outputTracks || mSpecConfig.caClusterer)) {
    // calibration objects for TPC clusterization
    inputs.emplace_back("tpcgain", gDataOriginTPC, "PADGAINFULL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalPadGainFull)));
    inputs.emplace_back("tpcaltrosync", gDataOriginTPC, "ALTROSYNCSIGNAL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::AltroSyncSignal)));
  }
  if (mSpecConfig.enableDoublePipeline != 2 && mSpecConfig.outputTracks) {
    // calibration objects for TPC tracking
    const auto mapSources = mSpecConfig.tpcDeadMapSources;
    if (mapSources != 0) {
      tpc::SourcesDeadMap sources((mapSources > -1) ? static_cast<tpc::SourcesDeadMap>(mapSources) : tpc::SourcesDeadMap::All);
      inputs.emplace_back("tpcidcpadflags", gDataOriginTPC, "IDCPADFLAGS", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalIDCPadStatusMapA), {}, 1)); // time-dependent
    }
      inputs.emplace_back("tpcruninfo", gDataOriginTPC, "TPCRUNINFO", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::ConfigRunInfo)));
    }
  }

  inputs.emplace_back("tpcgainresidual", gDataOriginTPC, "PADGAINRESIDUAL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalPadGainResidual), {}, 1)); // time-dependent
  if (mSpecConfig.tpcUseMCTimeGain) {
    inputs.emplace_back("tpctimegain", gDataOriginTPC, "TIMEGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTimeGainMC), {}, 1)); // time-dependent
  } else {
    inputs.emplace_back("tpctimegain", gDataOriginTPC, "TIMEGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTimeGain), {}, 1)); // time-dependent
  }
  inputs.emplace_back("tpctopologygain", gDataOriginTPC, "TOPOLOGYGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTopologyGain)));
  inputs.emplace_back("tpcthreshold", gDataOriginTPC, "PADTHRESHOLD", 0, Lifetime::Condition, ccdbParamSpec("TPC/Config/FEEPad"));
    Options optsDummy;
    o2::tpc::CorrectionMapsLoaderGloOpts gloOpts{mSpecConfig.lumiScaleType, mSpecConfig.lumiScaleMode, mSpecConfig.enableMShape, mSpecConfig.enableCTPLumi};
    mCalibObjects.mFastTransformHelper->requestCCDBInputs(inputs, optsDummy, gloOpts); // option filled here is lost
  }
  if (mSpecConfig.decompressTPC) {
    inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, mSpecConfig.decompressTPCFromROOT ? o2::header::DataDescription("COMPCLUSTERS") : o2::header::DataDescription("COMPCLUSTERSFLAT")}, Lifetime::Timeframe});
  } else if (mSpecConfig.caClusterer) {
    // We accept digits and MC labels also if we run on ZS Raw data, since they are needed for MC label propagation
    if ((!mSpecConfig.zsOnTheFly || mSpecConfig.processMC) && !mSpecConfig.zsDecoder) {
      inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITS"}, Lifetime::Timeframe});
      mPolicyData->emplace_back(o2::framework::InputSpec{"digits", o2::framework::ConcreteDataTypeMatcher{"TPC", "DIGITS"}});
    }
  } else if (mSpecConfig.runTPCTracking) {
    inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERNATIVE"}, Lifetime::Timeframe});
    mPolicyData->emplace_back(o2::framework::InputSpec{"clusters", o2::framework::ConcreteDataTypeMatcher{"TPC", "CLUSTERNATIVE"}});
  }
  if (mSpecConfig.processMC) {
    if (mSpecConfig.caClusterer) {
      if (!mSpecConfig.zsDecoder) {
        inputs.emplace_back(InputSpec{"mclblin", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITSMCTR"}, Lifetime::Timeframe});
        mPolicyData->emplace_back(o2::framework::InputSpec{"digitsmc", o2::framework::ConcreteDataTypeMatcher{"TPC", "DIGITSMCTR"}});
      }
    } else if (mSpecConfig.runTPCTracking) {
      inputs.emplace_back(InputSpec{"mclblin", ConcreteDataTypeMatcher{gDataOriginTPC, "CLNATIVEMCLBL"}, Lifetime::Timeframe});
      mPolicyData->emplace_back(o2::framework::InputSpec{"clustersmc", o2::framework::ConcreteDataTypeMatcher{"TPC", "CLNATIVEMCLBL"}});
    }
  }

  if (mSpecConfig.zsOnTheFly) {
    inputs.emplace_back(InputSpec{"zsinput", ConcreteDataTypeMatcher{"TPC", "TPCZS"}, Lifetime::Timeframe});
    inputs.emplace_back(InputSpec{"zsinputsizes", ConcreteDataTypeMatcher{"TPC", "ZSSIZES"}, Lifetime::Timeframe});
  }
  if (mSpecConfig.readTRDtracklets) {
    inputs.emplace_back("trdctracklets", o2::header::gDataOriginTRD, "CTRACKLETS", 0, Lifetime::Timeframe);
    inputs.emplace_back("trdtracklets", o2::header::gDataOriginTRD, "TRACKLETS", 0, Lifetime::Timeframe);
    inputs.emplace_back("trdtriggerrec", o2::header::gDataOriginTRD, "TRKTRGRD", 0, Lifetime::Timeframe);
    inputs.emplace_back("trdtrigrecmask", o2::header::gDataOriginTRD, "TRIGRECMASK", 0, Lifetime::Timeframe);
  }

  if (mSpecConfig.runITSTracking) {
    inputs.emplace_back("compClusters", "ITS", "COMPCLUSTERS", 0, Lifetime::Timeframe);
    inputs.emplace_back("patterns", "ITS", "PATTERNS", 0, Lifetime::Timeframe);
    inputs.emplace_back("ROframes", "ITS", "CLUSTERSROF", 0, Lifetime::Timeframe);
    if (mSpecConfig.itsTriggerType == 1) {
      inputs.emplace_back("phystrig", "ITS", "PHYSTRIG", 0, Lifetime::Timeframe);
    } else if (mSpecConfig.itsTriggerType == 2) {
      inputs.emplace_back("phystrig", "TRD", "TRKTRGRD", 0, Lifetime::Timeframe);
    }
    if (mSpecConfig.enableDoublePipeline != 2) {
      if (mSpecConfig.isITS3) {
        inputs.emplace_back("cldict", "IT3", "CLUSDICT", 0, Lifetime::Condition, ccdbParamSpec("IT3/Calib/ClusterDictionary"));
        inputs.emplace_back("alppar", "ITS", "ALPIDEPARAM", 0, Lifetime::Condition, ccdbParamSpec("ITS/Config/AlpideParam"));
      } else {
        inputs.emplace_back("itscldict", "ITS", "CLUSDICT", 0, Lifetime::Condition, ccdbParamSpec("ITS/Calib/ClusterDictionary"));
        inputs.emplace_back("itsalppar", "ITS", "ALPIDEPARAM", 0, Lifetime::Condition, ccdbParamSpec("ITS/Config/AlpideParam"));
      }
      if (mSpecConfig.itsOverrBeamEst) {
        inputs.emplace_back("meanvtx", "GLO", "MEANVERTEX", 0, Lifetime::Condition, ccdbParamSpec("GLO/Calib/MeanVertex", {}, 1));
      }
    }
    if (mSpecConfig.processMC) {
      inputs.emplace_back("itsmclabels", "ITS", "CLUSTERSMCTR", 0, Lifetime::Timeframe);
      inputs.emplace_back("ITSMC2ROframes", "ITS", "CLUSTERSMC2ROF", 0, Lifetime::Timeframe);
    }
  }

  return inputs;
};

o2::framework::Outputs GPURecoWorkflowSpec::outputs()
{
  constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
  std::vector<OutputSpec> outputSpecs;
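  // In the prepare stage of the double pipeline this process only publishes a pipeline-preparation message and produces no reconstruction output itself.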
  if (mSpecConfig.enableDoublePipeline == 2) {
    outputSpecs.emplace_back(gDataOriginGPU, "PIPELINEPREPARE", 0, Lifetime::Timeframe);
    return outputSpecs;
  }
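  // TPC track output; MC track labels are only published when both MC processing and track output are enabled.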
  if (mSpecConfig.outputTracks) {
    outputSpecs.emplace_back(gDataOriginTPC, "TRACKS", 0, Lifetime::Timeframe);
    outputSpecs.emplace_back(gDataOriginTPC, "CLUSREFS", 0, Lifetime::Timeframe);
  }
  if (mSpecConfig.processMC && mSpecConfig.outputTracks) {
    outputSpecs.emplace_back(gDataOriginTPC, "TRACKSMCLBL", 0, Lifetime::Timeframe);
  }
  if (mSpecConfig.outputCompClusters) {
    outputSpecs.emplace_back(gDataOriginTPC, "COMPCLUSTERS", 0, Lifetime::Timeframe);
  }
  if (mSpecConfig.outputCompClustersFlat) {
    outputSpecs.emplace_back(gDataOriginTPC, "COMPCLUSTERSFLAT", 0, Lifetime::Timeframe);
  }
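  // Native cluster output: either one message per sector (SubSpecification = sector number) or a single message covering all sectors (SubSpecification = NSectors).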
  if (mSpecConfig.outputCAClusters) {
    for (auto const& sector : mTPCSectors) {
      mClusterOutputIds.emplace_back(sector);
    }
    if (mSpecConfig.sendClustersPerSector) {
      outputSpecs.emplace_back(gDataOriginTPC, "CLUSTERNATIVETMP", NSectors, Lifetime::Timeframe); // Dummy buffer the TPC tracker writes the initial linear clusters to
      for (const auto sector : mTPCSectors) {
        outputSpecs.emplace_back(gDataOriginTPC, "CLUSTERNATIVE", sector, Lifetime::Timeframe);
      }
    } else {
      outputSpecs.emplace_back(gDataOriginTPC, "CLUSTERNATIVE", NSectors, Lifetime::Timeframe);
    }
    if (mSpecConfig.processMC) {
      if (mSpecConfig.sendClustersPerSector) {
        for (const auto sector : mTPCSectors) {
          outputSpecs.emplace_back(gDataOriginTPC, "CLNATIVEMCLBL", sector, Lifetime::Timeframe);
        }
      } else {
        outputSpecs.emplace_back(gDataOriginTPC, "CLNATIVEMCLBL", NSectors, Lifetime::Timeframe);
      }
    }
  }
  if (mSpecConfig.outputSharedClusterMap) {
    outputSpecs.emplace_back(gDataOriginTPC, "CLSHAREDMAP", 0, Lifetime::Timeframe);
    outputSpecs.emplace_back(gDataOriginTPC, "TPCOCCUPANCYMAP", 0, Lifetime::Timeframe);
  }
  if (mSpecConfig.tpcTriggerHandling) {
    outputSpecs.emplace_back(gDataOriginTPC, "TRIGGERWORDS", 0, Lifetime::Timeframe);
  }
  if (mSpecConfig.outputQA) {
    outputSpecs.emplace_back(gDataOriginTPC, "TRACKINGQA", 0, Lifetime::Timeframe);
  }
  if (mSpecConfig.outputErrorQA) {
    outputSpecs.emplace_back(gDataOriginGPU, "ERRORQA", 0, Lifetime::Timeframe);
  }

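  // ITS tracking outputs: tracks with their cluster indices and readout frames, vertices, and the IR frames of processed interactions; the MC-truth counterparts follow when MC is processed.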
  if (mSpecConfig.runITSTracking) {
    outputSpecs.emplace_back(gDataOriginITS, "TRACKS", 0, Lifetime::Timeframe);
    outputSpecs.emplace_back(gDataOriginITS, "TRACKCLSID", 0, Lifetime::Timeframe);
    outputSpecs.emplace_back(gDataOriginITS, "ITSTrackROF", 0, Lifetime::Timeframe);
    outputSpecs.emplace_back(gDataOriginITS, "VERTICES", 0, Lifetime::Timeframe);
    outputSpecs.emplace_back(gDataOriginITS, "VERTICESROF", 0, Lifetime::Timeframe);
    outputSpecs.emplace_back(gDataOriginITS, "IRFRAMES", 0, Lifetime::Timeframe);

    if (mSpecConfig.processMC) {
      outputSpecs.emplace_back(gDataOriginITS, "VERTICESMCTR", 0, Lifetime::Timeframe);
      outputSpecs.emplace_back(gDataOriginITS, "VERTICESMCPUR", 0, Lifetime::Timeframe);
      outputSpecs.emplace_back(gDataOriginITS, "TRACKSMCTR", 0, Lifetime::Timeframe);
      outputSpecs.emplace_back(gDataOriginITS, "ITSTrackMC2ROF", 0, Lifetime::Timeframe);
    }
  }

  return outputSpecs;
};

GPURecoWorkflowSpec::~GPURecoWorkflowSpec()
{
  ExitPipeline();
  mQA.reset(nullptr);
  mDisplayFrontend.reset(nullptr);
  mGPUReco.reset(nullptr);
}

} // namespace o2::gpu
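// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the original file): one way the routes and
// options assembled above could be wired into a DPL device. The device name
// "gpu-reconstruction" and the free function makeGPURecoSpec are assumptions
// for illustration only; the init/endOfStream/finaliseCCDB hooks of the task
// are omitted for brevity.
//
// o2::framework::DataProcessorSpec makeGPURecoSpec(
//   o2::gpu::GPURecoWorkflowSpec::CompletionPolicyData* policyData,
//   o2::gpu::GPURecoWorkflowSpec::Config const& cfg,
//   std::vector<int32_t> const& tpcSectors, uint64_t sectorMask,
//   std::shared_ptr<o2::base::GRPGeomRequest>& ggRequest)
// {
//   auto task = std::make_shared<o2::gpu::GPURecoWorkflowSpec>(policyData, cfg, tpcSectors, sectorMask, ggRequest);
//   return o2::framework::DataProcessorSpec{
//     "gpu-reconstruction",
//     task->inputs(),   // input routes assembled above
//     task->outputs(),  // output routes assembled above
//     o2::framework::AlgorithmSpec{[task](o2::framework::ProcessingContext& pc) { task->run(pc); }},
//     task->options()};
// }
// ---------------------------------------------------------------------------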