GPUWorkflowSpec.cxx
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "Headers/DataHeader.h"
#include "Framework/WorkflowSpec.h" // o2::framework::mergeInputs
#include "Framework/Logger.h"
#include "TPCFastTransform.h"
#include "TPCBase/RDHUtils.h"
#include "GPUO2InterfaceQA.h"
#include "GPUO2Interface.h"
#include "GPUO2InterfaceUtils.h"
#include "CalibdEdxContainer.h"
#include "GPUNewCalibValues.h"
#include "TPCPadGainCalib.h"
#include "TPCZSLinkMapping.h"
#include "TPCBase/Sector.h"
#include "TPCBase/Utils.h"
#include "Algorithm/Parser.h"
#include "TRDBase/Geometry.h"
#include "GPUWorkflowInternal.h"
// #include "Framework/ThreadPool.h"

#include <TStopwatch.h>
#include <TObjArray.h>
#include <TH1F.h>
#include <TH2F.h>
#include <TH1D.h>
#include <TGraphAsymmErrors.h>

#include <filesystem>
#include <memory>
#include <vector>
#include <iomanip>
#include <stdexcept>
#include <regex>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <chrono>
#include <unordered_set>

using namespace o2::framework;
using namespace o2::header;
using namespace o2::gpu;
using namespace o2::base;
using namespace o2::dataformats;

namespace o2::gpu
{

GPURecoWorkflowSpec::GPURecoWorkflowSpec(GPURecoWorkflowSpec::CompletionPolicyData* policyData, Config const& specconfig, std::vector<int32_t> const& tpcsectors, uint64_t tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<bool(o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) : o2::framework::Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
{
  if (mSpecConfig.outputCAClusters && !mSpecConfig.caClusterer && !mSpecConfig.decompressTPC && !mSpecConfig.useFilteredOutputSpecs) {
    throw std::runtime_error("inconsistent configuration: cluster output is only possible if CA clusterer or CompCluster decompression is activated");
  }

  mConfig.reset(new GPUO2InterfaceConfiguration);
  mConfParam.reset(new GPUSettingsO2);
  mTFSettings.reset(new GPUSettingsTF);
  mTimer.reset(new TStopwatch);
  mPipeline.reset(new GPURecoWorkflowSpec_PipelineInternals);

  if (mSpecConfig.enableDoublePipeline == 1 && gPolicyOrder) {
    *gPolicyOrder = &mPolicyOrder;
  }
}

void GPURecoWorkflowSpec::init(InitContext& ic)
{
  GPUO2InterfaceConfiguration& config = *mConfig.get();
  GPUSettingsProcessingNNclusterizer& mNNClusterizerSettings = mConfig->configProcessing.nn;

  if (mNNClusterizerSettings.nnLoadFromCCDB) {
    LOG(info) << "Loading neural networks from CCDB";
    o2::tpc::NeuralNetworkClusterizer nnClusterizerFetcher;
    nnClusterizerFetcher.initCcdbApi(mNNClusterizerSettings.nnCCDBURL);
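    // Common CCDB query parameters for the network fetcher; the per-network keys
    // (layer type, eval type, output file) are filled in right before each fetch below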
    std::map<std::string, std::string> ccdbSettings = {
      {"nnCCDBURL", mNNClusterizerSettings.nnCCDBURL},
      {"nnCCDBPath", mNNClusterizerSettings.nnCCDBPath},
      {"inputDType", mNNClusterizerSettings.nnInferenceInputDType},
      {"outputDType", mNNClusterizerSettings.nnInferenceOutputDType},
      {"outputFolder", mNNClusterizerSettings.nnLocalFolder},
      {"nnCCDBWithMomentum", std::to_string(mNNClusterizerSettings.nnCCDBWithMomentum)},
      {"nnCCDBBeamType", mNNClusterizerSettings.nnCCDBBeamType},
      {"nnCCDBInteractionRate", std::to_string(mNNClusterizerSettings.nnCCDBInteractionRate)}};

    std::string nnFetchFolder = mNNClusterizerSettings.nnLocalFolder;
    std::vector<std::string> evalMode = o2::utils::Str::tokenize(mNNClusterizerSettings.nnEvalMode, ':');

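    // nnEvalMode is expected in the form "<classification>:<regression>", e.g. "c1:r2" (cf. the checks below)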
    if (!evalMode.empty() && evalMode[0] == "c1") {
      ccdbSettings["nnCCDBLayerType"] = mNNClusterizerSettings.nnCCDBClassificationLayerType;
      ccdbSettings["nnCCDBEvalType"] = "classification_c1";
      ccdbSettings["outputFile"] = "net_classification_c1.onnx";
      nnClusterizerFetcher.loadIndividualFromCCDB(ccdbSettings);
    } else if (!evalMode.empty() && evalMode[0] == "c2") {
      ccdbSettings["nnCCDBLayerType"] = mNNClusterizerSettings.nnCCDBClassificationLayerType;
      ccdbSettings["nnCCDBEvalType"] = "classification_c2";
      ccdbSettings["outputFile"] = "net_classification_c2.onnx";
      nnClusterizerFetcher.loadIndividualFromCCDB(ccdbSettings);
    }

    ccdbSettings["nnCCDBLayerType"] = mNNClusterizerSettings.nnCCDBRegressionLayerType;
    ccdbSettings["nnCCDBEvalType"] = "regression_c1";
    ccdbSettings["outputFile"] = "net_regression_c1.onnx";
    nnClusterizerFetcher.loadIndividualFromCCDB(ccdbSettings);
    if (evalMode.size() > 1 && evalMode[1] == "r2") {
      ccdbSettings["nnCCDBLayerType"] = mNNClusterizerSettings.nnCCDBRegressionLayerType;
      ccdbSettings["nnCCDBEvalType"] = "regression_c2";
      ccdbSettings["outputFile"] = "net_regression_c2.onnx";
      nnClusterizerFetcher.loadIndividualFromCCDB(ccdbSettings);
    }
    LOG(info) << "Neural network loading done!";
  }

  // Create configuration object and fill settings
  mConfig->configGRP.solenoidBzNominalGPU = 0;
  mTFSettings->hasSimStartOrbit = 1;
  auto& hbfu = o2::raw::HBFUtils::Instance();
  mTFSettings->simStartOrbit = hbfu.getFirstIRofTF(o2::InteractionRecord(0, hbfu.orbitFirstSampled)).orbit;

  *mConfParam = mConfig->ReadConfigurableParam();
  if (mConfParam->display) {
    mDisplayFrontend.reset(GPUDisplayFrontendInterface::getFrontend(mConfig->configDisplay.displayFrontend.c_str()));
    mConfig->configProcessing.eventDisplay = mDisplayFrontend.get();
    if (mConfig->configProcessing.eventDisplay != nullptr) {
      LOG(info) << "Event display enabled";
    } else {
      throw std::runtime_error("GPU Event Display frontend could not be created!");
    }
  }
  if (mSpecConfig.enableDoublePipeline) {
    mConfig->configProcessing.doublePipeline = 1;
  }

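  // solenoidBzNominalGPU == -1e6 serves as sentinel for "take the field from the GRP once it is available"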
  mAutoSolenoidBz = mConfParam->solenoidBzNominalGPU == -1e6f;
  mAutoContinuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin < 0;
  if (mAutoContinuousMaxTimeBin) {
    mConfig->configGRP.grpContinuousMaxTimeBin = GPUO2InterfaceUtils::getTpcMaxTimeBinFromNHbf(mConfParam->overrideNHbfPerTF ? mConfParam->overrideNHbfPerTF : 256);
  }
  if (mConfig->configProcessing.deviceNum == -2) {
    int32_t myId = ic.services().get<const o2::framework::DeviceSpec>().inputTimesliceId;
    int32_t idMax = ic.services().get<const o2::framework::DeviceSpec>().maxInputTimeslices;
    mConfig->configProcessing.deviceNum = myId;
    LOG(info) << "GPU device number selected from pipeline id: " << myId << " / " << idMax;
  }
  if (mConfig->configProcessing.debugLevel >= 3 && mVerbosity == 0) {
    mVerbosity = 1;
  }
  mConfig->configProcessing.runMC = mSpecConfig.processMC;
  if (mSpecConfig.outputQA) {
    if (!mSpecConfig.processMC && !mConfig->configQA.clusterRejectionHistograms) {
      throw std::runtime_error("Need MC information to create QA plots");
    }
    if (!mSpecConfig.processMC) {
      mConfig->configQA.noMC = true;
    }
    mConfig->configQA.shipToQC = true;
    if (!mConfig->configProcessing.runQA) {
      mConfig->configQA.enableLocalOutput = false;
      mQATaskMask = (mSpecConfig.processMC ? 15 : 0) | (mConfig->configQA.clusterRejectionHistograms ? 32 : 0);
      mConfig->configProcessing.runQA = -mQATaskMask;
    }
  }
  mConfig->configInterface.outputToExternalBuffers = true;
  if (mConfParam->synchronousProcessing) {
    mConfig->configReconstruction.useMatLUT = false;
  }
  if (mConfig->configProcessing.rtc.optSpecialCode == -1) {
    mConfig->configProcessing.rtc.optSpecialCode = mConfParam->synchronousProcessing;
  }

  // Configure the "GPU workflow", i.e. which steps we run on the GPU (or CPU)
  if (mSpecConfig.outputTracks || mSpecConfig.outputCompClustersRoot || mSpecConfig.outputCompClustersFlat) {
    mConfig->configWorkflow.steps.set(GPUDataTypes::RecoStep::TPCConversion,
                                      GPUDataTypes::RecoStep::TPCSectorTracking,
                                      GPUDataTypes::RecoStep::TPCMerging);
    mConfig->configWorkflow.outputs.set(GPUDataTypes::InOutType::TPCMergedTracks);
    mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TPCdEdx, mConfParam->rundEdx == -1 ? !mConfParam->synchronousProcessing : mConfParam->rundEdx);
  }
  if (mSpecConfig.outputCompClustersRoot || mSpecConfig.outputCompClustersFlat) {
    mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TPCCompression, true);
    mConfig->configWorkflow.outputs.setBits(GPUDataTypes::InOutType::TPCCompressedClusters, true);
  }
  mConfig->configWorkflow.inputs.set(GPUDataTypes::InOutType::TPCClusters);
  if (mSpecConfig.caClusterer) { // Override some settings if we have raw data as input
    mConfig->configWorkflow.inputs.set(GPUDataTypes::InOutType::TPCRaw);
    mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TPCClusterFinding, true);
    mConfig->configWorkflow.outputs.setBits(GPUDataTypes::InOutType::TPCClusters, true);
  }
  if (mSpecConfig.decompressTPC) {
    mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TPCCompression, false);
    mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TPCDecompression, true);
    mConfig->configWorkflow.inputs.set(GPUDataTypes::InOutType::TPCCompressedClusters);
    mConfig->configWorkflow.outputs.setBits(GPUDataTypes::InOutType::TPCClusters, true);
    mConfig->configWorkflow.outputs.setBits(GPUDataTypes::InOutType::TPCCompressedClusters, false);
    if (mTPCSectorMask != 0xFFFFFFFFF) {
      throw std::invalid_argument("Cannot run TPC decompression with a sector mask");
    }
  }
  if (mSpecConfig.runTRDTracking) {
    mConfig->configWorkflow.inputs.setBits(GPUDataTypes::InOutType::TRDTracklets, true);
    mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TRDTracking, true);
  }
  if (mSpecConfig.runITSTracking) {
    mConfig->configWorkflow.inputs.setBits(GPUDataTypes::InOutType::ITSClusters, true);
    mConfig->configWorkflow.outputs.setBits(GPUDataTypes::InOutType::ITSTracks, true);
    mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::ITSTracking, true);
  }
  if (mSpecConfig.outputSharedClusterMap) {
    mConfig->configProcessing.outputSharedClusterMap = true;
  }
  if (!mSpecConfig.outputTracks) {
    mConfig->configProcessing.createO2Output = 0; // Disable O2 TPC track format output if no track output requested
  }
  mConfig->configProcessing.param.tpcTriggerHandling = mSpecConfig.tpcTriggerHandling;

  if (mConfParam->transformationFile.size() || mConfParam->transformationSCFile.size()) {
    LOG(fatal) << "Deprecated configurable param options GPU_global.transformationFile or transformationSCFile used\n"
               << "Instead, link the corresponding file as <somedir>/TPC/Calib/CorrectionMap/snapshot.root and use it via\n"
               << "--condition-remap file://<somedir>=TPC/Calib/CorrectionMap option";
  }
  /* if (config.configProcessing.doublePipeline && ic.services().get<ThreadPool>().poolSize != 2) {
    throw std::runtime_error("double pipeline requires exactly 2 threads");
  } */
  if (config.configProcessing.doublePipeline && (mSpecConfig.readTRDtracklets || mSpecConfig.runITSTracking || !(mSpecConfig.zsOnTheFly || mSpecConfig.zsDecoder))) {
    LOG(fatal) << "GPU two-threaded pipeline works only with TPC-only processing, and with ZS input";
  }

  if (mSpecConfig.enableDoublePipeline != 2) {
    mGPUReco = std::make_unique<GPUO2Interface>();

    // initialize TPC calib objects
    initFunctionTPCCalib(ic);

    mConfig->configCalib.fastTransform = mCalibObjects.mFastTransformHelper->getCorrMap();
    mConfig->configCalib.fastTransformRef = mCalibObjects.mFastTransformHelper->getCorrMapRef();
    mConfig->configCalib.fastTransformMShape = mCalibObjects.mFastTransformHelper->getCorrMapMShape();
    mConfig->configCalib.fastTransformHelper = mCalibObjects.mFastTransformHelper.get();
    if (mConfig->configCalib.fastTransform == nullptr) {
      throw std::invalid_argument("GPU workflow: initialization of the TPC transformation failed");
    }

    if (mConfParam->matLUTFile.size()) {
      LOGP(info, "Loading matlut file {}", mConfParam->matLUTFile.c_str());
      mConfig->configCalib.matLUT = o2::base::MatLayerCylSet::loadFromFile(mConfParam->matLUTFile.c_str());
      if (mConfig->configCalib.matLUT == nullptr) {
        LOGF(fatal, "Error loading matlut file");
      }
    } else {
      mConfig->configProcessing.lateO2MatLutProvisioningSize = 50 * 1024 * 1024;
    }

    if (mSpecConfig.readTRDtracklets) {
      mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>();
      mConfig->configCalib.trdGeometry = mTRDGeometry.get();
    }

    mConfig->configProcessing.willProvideO2PropagatorLate = true;
    mConfig->configProcessing.o2PropagatorUseGPUField = true;

    if (mConfParam->printSettings && (mConfParam->printSettings > 1 || ic.services().get<const o2::framework::DeviceSpec>().inputTimesliceId == 0)) {
      mConfig->configProcessing.printSettings = true;
      if (mConfParam->printSettings > 1) {
        mConfig->PrintParam();
      }
    }

    // Configuration is prepared, initialize the tracker.
    if (mGPUReco->Initialize(config) != 0) {
      throw std::invalid_argument("GPU Reconstruction initialization failed");
    }
    if (mSpecConfig.outputQA) {
      mQA = std::make_unique<GPUO2InterfaceQA>(mConfig.get());
    }
    if (mSpecConfig.outputErrorQA) {
      mGPUReco->setErrorCodeOutput(&mErrorQA);
    }

    // initialize ITS
    if (mSpecConfig.runITSTracking) {
      initFunctionITS(ic);
    }
  }

  if (mSpecConfig.enableDoublePipeline) {
    initPipeline(ic);
    if (mConfParam->dump >= 2) {
      LOG(fatal) << "Cannot use dump-only mode with multi-threaded pipeline";
    }
  }

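  // Register a callback invoked for every shared-memory region DPL creates: each region is
  // registered with the GPU (so it can be pinned for DMA transfers), optionally serialized
  // across processes via a lock file, and optionally timed for benchmarking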
  auto& callbacks = ic.services().get<CallbackService>();
  callbacks.set<CallbackService::Id::RegionInfoCallback>([this](fair::mq::RegionInfo const& info) {
    if (info.size == 0) {
      return;
    }
    if (mSpecConfig.enableDoublePipeline) {
      mRegionInfos.emplace_back(info);
    }
    if (mSpecConfig.enableDoublePipeline == 2) {
      return;
    }
    if (mConfParam->registerSelectedSegmentIds != -1 && info.managed && info.id != (uint32_t)mConfParam->registerSelectedSegmentIds) {
      return;
    }
    int32_t fd = 0;
    if (mConfParam->mutexMemReg) {
      mode_t mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
      fd = open("/tmp/o2_gpu_memlock_mutex.lock", O_RDWR | O_CREAT | O_CLOEXEC, mask);
      if (fd == -1) {
        throw std::runtime_error("Error opening memlock mutex lock file");
      }
      fchmod(fd, mask);
      if (lockf(fd, F_LOCK, 0)) {
        throw std::runtime_error("Error locking memlock mutex file");
      }
    }
    std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
    if (mConfParam->benchmarkMemoryRegistration) {
      start = std::chrono::high_resolution_clock::now();
    }
    if (mGPUReco->registerMemoryForGPU(info.ptr, info.size)) {
      throw std::runtime_error("Error registering memory for GPU");
    }
    if (mConfParam->benchmarkMemoryRegistration) {
      end = std::chrono::high_resolution_clock::now();
      std::chrono::duration<double> elapsed_seconds = end - start;
      LOG(info) << "Memory registration time (" << info.ptr << ", " << info.size << " bytes): " << elapsed_seconds.count() << " s";
    }
    if (mConfParam->mutexMemReg) {
      if (lockf(fd, F_ULOCK, 0)) {
        throw std::runtime_error("Error unlocking memlock mutex file");
      }
      close(fd);
    }
  });

  mTimer->Stop();
  mTimer->Reset();
}

void GPURecoWorkflowSpec::stop()
{
  LOGF(info, "GPU Reconstruction total timing: Cpu: %.3e Real: %.3e s in %d slots", mTimer->CpuTime(), mTimer->RealTime(), mTimer->Counter() - 1);
  handlePipelineStop();
}

void GPURecoWorkflowSpec::endOfStream(EndOfStreamContext& ec)
{
  handlePipelineEndOfStream(ec);
}

void GPURecoWorkflowSpec::finaliseCCDB(ConcreteDataMatcher& matcher, void* obj)
{
  if (mSpecConfig.enableDoublePipeline != 2) {
    finaliseCCDBTPC(matcher, obj);
    if (mSpecConfig.runITSTracking) {
      finaliseCCDBITS(matcher, obj);
    }
  }
  if (GRPGeomHelper::instance().finaliseCCDB(matcher, obj)) {
    mGRPGeomUpdated = true;
    return;
  }
}

template <class D, class E, class F, class G, class H, class I, class J, class K>
void GPURecoWorkflowSpec::processInputs(ProcessingContext& pc, D& tpcZSmeta, E& inputZS, F& tpcZS, G& tpcZSonTheFlySizes, bool& debugTFDump, H& compClustersDummy, I& compClustersFlatDummy, J& pCompClustersFlat, K& tmpEmptyCompClusters)
{
  if (mSpecConfig.enableDoublePipeline == 1) {
    return;
  }
  constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
  constexpr static size_t NEndpoints = o2::gpu::GPUTrackingInOutZS::NENDPOINTS;

  if (mSpecConfig.zsOnTheFly || mSpecConfig.zsDecoder) {
    for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
      for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
        tpcZSmeta.Pointers[i][j].clear();
        tpcZSmeta.Sizes[i][j].clear();
      }
    }
  }
  if (mSpecConfig.zsOnTheFly) {
    tpcZSonTheFlySizes = {0};
    // tpcZSonTheFlySizes: #zs pages per endpoint:
    std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "ZSSIZES"}, Lifetime::Timeframe}};
    bool recv = false, recvsizes = false;
    for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
      if (recvsizes) {
        throw std::runtime_error("Received multiple ZSSIZES data");
      }
      tpcZSonTheFlySizes = pc.inputs().get<std::array<uint32_t, NEndpoints * NSectors>>(ref);
      recvsizes = true;
    }
    // zs pages
    std::vector<InputSpec> filter2 = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "TPCZS"}, Lifetime::Timeframe}};
    for (auto const& ref : InputRecordWalker(pc.inputs(), filter2)) {
      if (recv) {
        throw std::runtime_error("Received multiple TPCZS data");
      }
      inputZS = pc.inputs().get<gsl::span<o2::tpc::ZeroSuppressedContainer8kb>>(ref);
      recv = true;
    }
    if (!recv || !recvsizes) {
      throw std::runtime_error("TPC ZS on the fly data not received");
    }

    uint32_t offset = 0;
    for (uint32_t i = 0; i < NSectors; i++) {
      uint32_t pageSector = 0;
      for (uint32_t j = 0; j < NEndpoints; j++) {
        pageSector += tpcZSonTheFlySizes[i * NEndpoints + j];
        offset += tpcZSonTheFlySizes[i * NEndpoints + j];
      }
      if (mVerbosity >= 1) {
        LOG(info) << "GOT ZS on the fly pages FOR SECTOR " << i << " -> pages: " << pageSector;
      }
    }
  }
  if (mSpecConfig.zsDecoder) {
    std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
    auto isSameRdh = [](const char* left, const char* right) -> bool {
      return o2::raw::RDHUtils::getFEEID(left) == o2::raw::RDHUtils::getFEEID(right) && o2::raw::RDHUtils::getDetectorField(left) == o2::raw::RDHUtils::getDetectorField(right);
    };
    auto checkForZSData = [](const char* ptr, uint32_t subSpec) -> bool {
      const auto rdhLink = o2::raw::RDHUtils::getLinkID(ptr);
      const auto detField = o2::raw::RDHUtils::getDetectorField(ptr);
      const auto feeID = o2::raw::RDHUtils::getFEEID(ptr);
      const auto feeLinkID = o2::tpc::rdh_utils::getLink(feeID);
      // This check is looser than it should be: some synthetic MC data was generated with the RDH link ID set to the FEE link ID, so we accept that combination as well to still be able to decode it
      return detField == o2::tpc::raw_data_types::ZS && ((feeLinkID == o2::tpc::rdh_utils::UserLogicLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == 0)) ||
                                                         (feeLinkID == o2::tpc::rdh_utils::ILBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::ILBZSLinkID || rdhLink == 0)) ||
                                                         (feeLinkID == o2::tpc::rdh_utils::DLBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::DLBZSLinkID || rdhLink == 0)));
    };
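    // Sort each accepted raw page into the per-CRU / per-endpoint page lists consumed by the GPU ZS decoder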
    auto insertPages = [&tpcZSmeta, checkForZSData](const char* ptr, size_t count, uint32_t subSpec) -> void {
      if (checkForZSData(ptr, subSpec)) {
        int32_t rawcru = o2::tpc::rdh_utils::getCRU(ptr);
        int32_t rawendpoint = o2::tpc::rdh_utils::getEndPoint(ptr);
        tpcZSmeta.Pointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(ptr);
        tpcZSmeta.Sizes[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(count);
      }
    };
    if (DPLRawPageSequencer(pc.inputs(), filter)(isSameRdh, insertPages, checkForZSData)) {
      debugTFDump = true;
      static uint32_t nErrors = 0;
      nErrors++;
      if (nErrors == 1 || (nErrors < 100 && nErrors % 10 == 0) || nErrors % 1000 == 0 || mNTFs % 1000 == 0) {
        LOG(error) << "DPLRawPageSequencer failed to process TPC raw data - data most likely not padded correctly - Using slow page scan instead (this alarm is downscaled from now on, so far " << nErrors << " of " << mNTFs << " TFs affected)";
      }
    }

    int32_t totalCount = 0;
    for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
      for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
        tpcZSmeta.Pointers2[i][j] = tpcZSmeta.Pointers[i][j].data();
        tpcZSmeta.Sizes2[i][j] = tpcZSmeta.Sizes[i][j].data();
        tpcZS.sector[i].zsPtr[j] = tpcZSmeta.Pointers2[i][j];
        tpcZS.sector[i].nZSPtr[j] = tpcZSmeta.Sizes2[i][j];
        tpcZS.sector[i].count[j] = tpcZSmeta.Pointers[i][j].size();
        totalCount += tpcZSmeta.Pointers[i][j].size();
      }
    }
  } else if (mSpecConfig.decompressTPC) {
    if (mSpecConfig.decompressTPCFromROOT) {
      compClustersDummy = *pc.inputs().get<o2::tpc::CompressedClustersROOT*>("input");
      compClustersFlatDummy.setForward(&compClustersDummy);
      pCompClustersFlat = &compClustersFlatDummy;
    } else {
      pCompClustersFlat = pc.inputs().get<o2::tpc::CompressedClustersFlat*>("input").get();
    }
    if (pCompClustersFlat == nullptr) {
      tmpEmptyCompClusters.reset(new char[sizeof(o2::tpc::CompressedClustersFlat)]);
      memset(tmpEmptyCompClusters.get(), 0, sizeof(o2::tpc::CompressedClustersFlat));
      pCompClustersFlat = (o2::tpc::CompressedClustersFlat*)tmpEmptyCompClusters.get();
    }
  } else if (!mSpecConfig.zsOnTheFly) {
    if (mVerbosity) {
      LOGF(info, "running tracking for sector(s) 0x%09x", mTPCSectorMask);
    }
  }
}

int32_t GPURecoWorkflowSpec::runMain(o2::framework::ProcessingContext* pc, GPUTrackingInOutPointers* ptrs, GPUInterfaceOutputs* outputRegions, int32_t threadIndex, GPUInterfaceInputUpdate* inputUpdateCallback)
{
  int32_t retVal = 0;
  if (mConfParam->dump < 2) {
    retVal = mGPUReco->RunTracking(ptrs, outputRegions, threadIndex, inputUpdateCallback);

    if (retVal == 0 && mSpecConfig.runITSTracking) {
      retVal = runITSTracking(*pc);
    }
  }

  if (!mSpecConfig.enableDoublePipeline) { // TODO: Why is this needed for double-pipeline?
    mGPUReco->Clear(false, threadIndex); // clean non-output memory used by GPU Reconstruction
  }
  return retVal;
}

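// Retire the previous set of calibration object pointers: drop the oldest retained set (if any)
// and keep the most recently replaced one alive, since the processing may still reference it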
void GPURecoWorkflowSpec::cleanOldCalibsTPCPtrs(calibObjectStruct& oldCalibObjects)
{
  if (mOldCalibObjects.size() > 0) {
    mOldCalibObjects.pop();
  }
  mOldCalibObjects.emplace(std::move(oldCalibObjects));
}

void GPURecoWorkflowSpec::run(ProcessingContext& pc)
{
  constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
  constexpr static size_t NEndpoints = o2::gpu::GPUTrackingInOutZS::NENDPOINTS;

  auto cput = mTimer->CpuTime();
  auto realt = mTimer->RealTime();
  mTimer->Start(false);
  mNTFs++;

  std::vector<gsl::span<const char>> inputs;

  const o2::tpc::CompressedClustersFlat* pCompClustersFlat = nullptr;
  size_t compClustersFlatDummyMemory[(sizeof(o2::tpc::CompressedClustersFlat) + sizeof(size_t) - 1) / sizeof(size_t)];
  o2::tpc::CompressedClustersFlat& compClustersFlatDummy = reinterpret_cast<o2::tpc::CompressedClustersFlat&>(compClustersFlatDummyMemory);
  o2::tpc::CompressedClusters compClustersDummy;
  GPURecoWorkflowSpec_TPCZSBuffers tpcZSmeta;
  GPUTrackingInOutZS tpcZS;
  std::array<uint32_t, NEndpoints * NSectors> tpcZSonTheFlySizes;
  gsl::span<const o2::tpc::ZeroSuppressedContainer8kb> inputZS;
  std::unique_ptr<char[]> tmpEmptyCompClusters;

  bool getWorkflowTPCInput_clusters = false, getWorkflowTPCInput_mc = false, getWorkflowTPCInput_digits = false;
  bool debugTFDump = false;

  if (mSpecConfig.processMC) {
    getWorkflowTPCInput_mc = true;
  }
  if (!mSpecConfig.decompressTPC && !mSpecConfig.caClusterer) {
    getWorkflowTPCInput_clusters = true;
  }
  if (!mSpecConfig.decompressTPC && mSpecConfig.caClusterer && ((!mSpecConfig.zsOnTheFly || mSpecConfig.processMC) && !mSpecConfig.zsDecoder)) {
    getWorkflowTPCInput_digits = true;
  }

  // ------------------------------ Handle inputs ------------------------------

  auto lockDecodeInput = std::make_unique<std::lock_guard<std::mutex>>(mPipeline->mutexDecodeInput);

  if (mSpecConfig.enableDoublePipeline != 2) {
    if (mSpecConfig.runITSTracking && pc.inputs().getPos("itsTGeo") >= 0) {
      pc.inputs().get<o2::its::GeometryTGeo*>("itsTGeo");
    }
    if (GRPGeomHelper::instance().getGRPECS()->isDetReadOut(o2::detectors::DetID::TPC) && mConfParam->tpcTriggeredMode ^ !GRPGeomHelper::instance().getGRPECS()->isDetContinuousReadOut(o2::detectors::DetID::TPC)) {
      LOG(fatal) << "configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
    }
  }

  GPUTrackingInOutPointers ptrs = {};
  processInputs(pc, tpcZSmeta, inputZS, tpcZS, tpcZSonTheFlySizes, debugTFDump, compClustersDummy, compClustersFlatDummy, pCompClustersFlat, tmpEmptyCompClusters); // Process non-digit / non-cluster inputs
  const auto& inputsClustersDigits = o2::tpc::getWorkflowTPCInput(pc, mVerbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, mTPCSectorMask, getWorkflowTPCInput_digits); // Process digit and cluster inputs

  const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
  mTFSettings->tfStartOrbit = tinfo.firstTForbit;
  mTFSettings->hasTfStartOrbit = 1;
  mTFSettings->hasNHBFPerTF = 1;
  mTFSettings->nHBFPerTF = mConfParam->overrideNHbfPerTF ? mConfParam->overrideNHbfPerTF : GRPGeomHelper::instance().getGRPECS()->getNHBFPerTF();
  mTFSettings->hasRunStartOrbit = 0;
  ptrs.settingsTF = mTFSettings.get();

  if (mSpecConfig.enableDoublePipeline != 2) {
    if (mVerbosity) {
      LOG(info) << "TF firstTForbit " << mTFSettings->tfStartOrbit << " nHBF " << mTFSettings->nHBFPerTF << " runStartOrbit " << mTFSettings->runStartOrbit << " simStartOrbit " << mTFSettings->simStartOrbit;
    }
    if (mConfParam->checkFirstTfOrbit) {
      static uint32_t lastFirstTFOrbit = -1;
      static uint32_t lastTFCounter = -1;
      if (lastFirstTFOrbit != -1 && lastTFCounter != -1) {
        int32_t diffOrbit = tinfo.firstTForbit - lastFirstTFOrbit;
        int32_t diffCounter = tinfo.tfCounter - lastTFCounter;
        if (diffOrbit != diffCounter * mTFSettings->nHBFPerTF) {
          LOG(error) << "Time frame has mismatching firstTfOrbit - Last orbit/counter: " << lastFirstTFOrbit << " " << lastTFCounter << " - Current: " << tinfo.firstTForbit << " " << tinfo.tfCounter;
        }
      }
      lastFirstTFOrbit = tinfo.firstTForbit;
      lastTFCounter = tinfo.tfCounter;
    }
  }

  o2::globaltracking::RecoContainer inputTracksTRD;
  decltype(o2::trd::getRecoInputContainer(pc, &ptrs, &inputTracksTRD)) trdInputContainer;
  if (mSpecConfig.readTRDtracklets) {
    o2::globaltracking::DataRequest dataRequestTRD;
    inputTracksTRD.collectData(pc, dataRequestTRD);
    trdInputContainer = std::move(o2::trd::getRecoInputContainer(pc, &ptrs, &inputTracksTRD));
  }

  void* ptrEp[NSectors * NEndpoints] = {};
  bool doInputDigits = false, doInputDigitsMC = false;
  if (mSpecConfig.decompressTPC) {
    ptrs.tpcCompressedClusters = pCompClustersFlat;
  } else if (mSpecConfig.zsOnTheFly) {
    const uint64_t* buffer = reinterpret_cast<const uint64_t*>(&inputZS[0]);
    o2::gpu::GPUReconstructionConvert::RunZSEncoderCreateMeta(buffer, tpcZSonTheFlySizes.data(), *&ptrEp, &tpcZS);
    ptrs.tpcZS = &tpcZS;
    doInputDigits = doInputDigitsMC = mSpecConfig.processMC;
  } else if (mSpecConfig.zsDecoder) {
    ptrs.tpcZS = &tpcZS;
    if (mSpecConfig.processMC) {
      throw std::runtime_error("Cannot process MC information, none available");
    }
  } else if (mSpecConfig.caClusterer) {
    doInputDigits = true;
    doInputDigitsMC = mSpecConfig.processMC;
  } else {
    ptrs.clustersNative = &inputsClustersDigits->clusterIndex;
  }

  if (mTPCSectorMask != 0xFFFFFFFFF) {
    // Clear out the unused sectors, so that stray data is not processed and uninitialized values cannot cause a crash
    for (uint32_t i = 0; i < NSectors; i++) {
      if (!(mTPCSectorMask & (1ul << i))) {
        if (ptrs.tpcZS) {
          for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
            tpcZS.sector[i].zsPtr[j] = nullptr;
            tpcZS.sector[i].nZSPtr[j] = nullptr;
            tpcZS.sector[i].count[j] = 0;
          }
        }
      }
    }
  }

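  // Wire the per-sector digit spans (and, for MC, the label containers) into the GPU input structs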
  GPUTrackingInOutDigits tpcDigitsMap;
  GPUTPCDigitsMCInput tpcDigitsMapMC;
  if (doInputDigits) {
    ptrs.tpcPackedDigits = &tpcDigitsMap;
    if (doInputDigitsMC) {
      tpcDigitsMap.tpcDigitsMC = &tpcDigitsMapMC;
    }
    for (uint32_t i = 0; i < NSectors; i++) {
      tpcDigitsMap.tpcDigits[i] = inputsClustersDigits->inputDigits[i].data();
      tpcDigitsMap.nTPCDigits[i] = inputsClustersDigits->inputDigits[i].size();
      if (doInputDigitsMC) {
        tpcDigitsMapMC.v[i] = inputsClustersDigits->inputDigitsMCPtrs[i];
      }
    }
  }

  o2::tpc::TPCSectorHeader clusterOutputSectorHeader{0};
  if (mClusterOutputIds.size() > 0) {
    clusterOutputSectorHeader.sectorBits = mTPCSectorMask;
    // subspecs [0, NSectors - 1] are used to identify sector data, we use NSectors to indicate the full TPC
    clusterOutputSectorHeader.activeSectors = mTPCSectorMask;
  }

  // ------------------------------ Prepare stage for double-pipeline before normal output preparation ------------------------------

  std::unique_ptr<GPURecoWorkflow_QueueObject> pipelineContext;
  if (mSpecConfig.enableDoublePipeline) {
    if (handlePipeline(pc, ptrs, tpcZSmeta, tpcZS, pipelineContext)) {
      return;
    }
  }

  // ------------------------------ Prepare outputs ------------------------------

  GPUInterfaceOutputs outputRegions;
  using outputDataType = char;
  using outputBufferUninitializedVector = std::decay_t<decltype(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(Output{"", "", 0}))>;
  using outputBufferType = std::pair<std::optional<std::reference_wrapper<outputBufferUninitializedVector>>, outputDataType*>;
  std::vector<outputBufferType> outputBuffers(GPUInterfaceOutputs::count(), {std::nullopt, nullptr});
  std::unordered_set<std::string> outputsCreated;

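  // Helper to create the DPL output for a GPU output region: either installs an on-the-fly
  // allocator that creates the output buffer at the size requested by the reconstruction,
  // or preallocates a fixed-size buffer that is shrunk to the actually used size afterwards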
  auto setOutputAllocator = [this, &outputBuffers, &outputRegions, &pc, &outputsCreated](const char* name, bool condition, GPUOutputControl& region, auto&& outputSpec, size_t offset = 0) {
    if (condition) {
      auto& buffer = outputBuffers[outputRegions.getIndex(region)];
      if (mConfParam->allocateOutputOnTheFly) {
        region.allocator = [this, name, &buffer, &pc, outputSpec = std::move(outputSpec), offset, &outputsCreated](size_t size) -> void* {
          size += offset;
          if (mVerbosity) {
            LOG(info) << "ALLOCATING " << size << " bytes for " << name << ": " << std::get<DataOrigin>(outputSpec).template as<std::string>() << "/" << std::get<DataDescription>(outputSpec).template as<std::string>() << "/" << std::get<2>(outputSpec);
          }
          std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
          if (mVerbosity) {
            start = std::chrono::high_resolution_clock::now();
          }
          buffer.first.emplace(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(std::make_from_tuple<Output>(outputSpec), size));
          outputsCreated.insert(name);
          if (mVerbosity) {
            end = std::chrono::high_resolution_clock::now();
            std::chrono::duration<double> elapsed_seconds = end - start;
            LOG(info) << "Allocation time for " << name << " (" << size << " bytes)"
                      << ": " << elapsed_seconds.count() << "s";
          }
          return (buffer.second = buffer.first->get().data()) + offset;
        };
      } else {
        buffer.first.emplace(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(std::make_from_tuple<Output>(outputSpec), mConfParam->outputBufferSize));
        region.ptrBase = (buffer.second = buffer.first->get().data()) + offset;
        region.size = buffer.first->get().size() - offset;
        outputsCreated.insert(name);
      }
    }
  };

  auto downSizeBuffer = [](outputBufferType& buffer, size_t size) {
    if (!buffer.first) {
      return;
    }
    if (buffer.first->get().size() < size) {
      throw std::runtime_error("Invalid buffer size requested");
    }
    buffer.first->get().resize(size);
    if (size && buffer.first->get().data() != buffer.second) {
      throw std::runtime_error("Inconsistent buffer address after downsize");
    }
  };

  /*auto downSizeBufferByName = [&outputBuffers, &outputRegions, &downSizeBuffer](GPUOutputControl& region, size_t size) {
    auto& buffer = outputBuffers[outputRegions.getIndex(region)];
    downSizeBuffer(buffer, size);
  };*/

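  // Shrink a region's buffer to the span actually filled by the reconstruction,
  // cross-checking that the span indeed points into that buffer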
  auto downSizeBufferToSpan = [&outputBuffers, &outputRegions, &downSizeBuffer](GPUOutputControl& region, auto span) {
    auto& buffer = outputBuffers[outputRegions.getIndex(region)];
    if (!buffer.first) {
      return;
    }
    if (span.size() && buffer.second != (char*)span.data()) {
      throw std::runtime_error("Buffer does not match span");
    }
    downSizeBuffer(buffer, span.size() * sizeof(*span.data()));
  };

  setOutputAllocator("COMPCLUSTERSFLAT", mSpecConfig.outputCompClustersFlat, outputRegions.compressedClusters, std::make_tuple(gDataOriginTPC, (DataDescription) "COMPCLUSTERSFLAT", 0));
  setOutputAllocator("CLUSTERNATIVE", mClusterOutputIds.size() > 0, outputRegions.clustersNative, std::make_tuple(gDataOriginTPC, mSpecConfig.sendClustersPerSector ? (DataDescription) "CLUSTERNATIVETMP" : (mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE"), NSectors, clusterOutputSectorHeader), sizeof(o2::tpc::ClusterCountIndex));
  setOutputAllocator("CLSHAREDMAP", mSpecConfig.outputSharedClusterMap, outputRegions.sharedClusterMap, std::make_tuple(gDataOriginTPC, (DataDescription) "CLSHAREDMAP", 0));
  setOutputAllocator("TPCOCCUPANCYMAP", mSpecConfig.outputSharedClusterMap, outputRegions.tpcOccupancyMap, std::make_tuple(gDataOriginTPC, (DataDescription) "TPCOCCUPANCYMAP", 0));
  setOutputAllocator("TRACKS", mSpecConfig.outputTracks, outputRegions.tpcTracksO2, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSF" : (DataDescription) "TRACKS", 0));
  setOutputAllocator("CLUSREFS", mSpecConfig.outputTracks, outputRegions.tpcTracksO2ClusRefs, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSREFSF" : (DataDescription) "CLUSREFS", 0));
  setOutputAllocator("TRACKSMCLBL", mSpecConfig.outputTracks && mSpecConfig.processMC, outputRegions.tpcTracksO2Labels, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSMCLBLF" : (DataDescription) "TRACKSMCLBL", 0));
  setOutputAllocator("TRIGGERWORDS", mSpecConfig.caClusterer && mConfig->configProcessing.param.tpcTriggerHandling, outputRegions.tpcTriggerWords, std::make_tuple(gDataOriginTPC, (DataDescription) "TRIGGERWORDS", 0));
  o2::tpc::ClusterNativeAccess::ConstMCLabelContainerViewWithBuffer clustersMCBuffer;
  if (mSpecConfig.processMC && (mSpecConfig.caClusterer || mSpecConfig.useFilteredOutputSpecs)) {
    outputRegions.clusterLabels.allocator = [&clustersMCBuffer](size_t size) -> void* { return &clustersMCBuffer; };
  }

  // ------------------------------ Actual processing ------------------------------

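  // Exactly one of the possible input types (ZS pages, packed digits, native clusters,
  // compressed clusters) must be present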
  if ((int32_t)(ptrs.tpcZS != nullptr) + (int32_t)(ptrs.tpcPackedDigits != nullptr && (ptrs.tpcZS == nullptr || ptrs.tpcPackedDigits->tpcDigitsMC == nullptr)) + (int32_t)(ptrs.clustersNative != nullptr) + (int32_t)(ptrs.tpcCompressedClusters != nullptr) != 1) {
    throw std::runtime_error("Invalid input for gpu tracking");
  }

  const auto& holdData = o2::tpc::TPCTrackingDigitsPreCheck::runPrecheck(&ptrs, mConfig.get());

  calibObjectStruct oldCalibObjects;
  doCalibUpdates(pc, oldCalibObjects);

  lockDecodeInput.reset();

  uint32_t threadIndex;
  if (mConfParam->dump) {
    if (mSpecConfig.enableDoublePipeline && pipelineContext->jobSubmitted) {
      while (pipelineContext->jobThreadIndex == -1) {
      }
      threadIndex = pipelineContext->jobThreadIndex;
    } else {
      threadIndex = 0; // TODO: Not sure if this is safe, but it is not yet known which threadIndex will pick up the enqueued job
    }

    std::string dir = "";
    if (mConfParam->dumpFolder != "") {
      dir = std::regex_replace(mConfParam->dumpFolder, std::regex("\\[P\\]"), std::to_string(getpid()));
      if (mNTFs == 1) {
        mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
      }
      dir += "/";
    }
    if (mNTFs == 1) { // Must dump with the first TF, since this will enforce enqueued calib updates
      mGPUReco->DumpSettings(threadIndex, dir.c_str());
    }
    if (tinfo.tfCounter >= mConfParam->dumpFirst && (mConfParam->dumpLast == -1 || tinfo.tfCounter <= mConfParam->dumpLast)) {
      mGPUReco->DumpEvent(mNTFDumps, &ptrs, threadIndex, dir.c_str());
      mNTFDumps++;
    }
  }
  std::unique_ptr<GPUTrackingInOutPointers> ptrsDump;
  if (mConfParam->dumpBadTFMode == 2) {
    ptrsDump.reset(new GPUTrackingInOutPointers);
    memcpy((void*)ptrsDump.get(), (const void*)&ptrs, sizeof(ptrs));
  }

  int32_t retVal = 0;
  if (mSpecConfig.enableDoublePipeline) {
    if (!pipelineContext->jobSubmitted) {
      enqueuePipelinedJob(&ptrs, &outputRegions, pipelineContext.get(), true);
    } else {
      finalizeInputPipelinedJob(&ptrs, &outputRegions, pipelineContext.get());
    }
    std::unique_lock lk(pipelineContext->jobFinishedMutex);
    pipelineContext->jobFinishedNotify.wait(lk, [context = pipelineContext.get()]() { return context->jobFinished; });
    retVal = pipelineContext->jobReturnValue;
    threadIndex = pipelineContext->jobThreadIndex;
  } else {
    // uint32_t threadIndex = pc.services().get<ThreadPool>().threadIndex;
    threadIndex = mNextThreadIndex;
    if (mConfig->configProcessing.doublePipeline) {
      mNextThreadIndex = (mNextThreadIndex + 1) % 2;
    }

    retVal = runMain(&pc, &ptrs, &outputRegions, threadIndex);
  }
  if (retVal != 0) {
    debugTFDump = true;
  }
  cleanOldCalibsTPCPtrs(oldCalibObjects);

  o2::utils::DebugStreamer::instance()->flush(); // flushing debug output to file

  if (debugTFDump && mNDebugDumps < mConfParam->dumpBadTFs) {
    mNDebugDumps++;
    if (mConfParam->dumpBadTFMode <= 1) {
      std::string filename = std::string("tpc_dump_") + std::to_string(pc.services().get<const o2::framework::DeviceSpec>().inputTimesliceId) + "_" + std::to_string(mNDebugDumps) + ".dump";
      FILE* fp = fopen(filename.c_str(), "w+b");
      std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
      for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
        auto data = pc.inputs().get<gsl::span<char>>(ref);
        if (mConfParam->dumpBadTFMode == 1) {
          uint64_t size = data.size();
          fwrite(&size, 1, sizeof(size), fp);
        }
        fwrite(data.data(), 1, data.size(), fp);
      }
      fclose(fp);
    } else if (mConfParam->dumpBadTFMode == 2) {
      mGPUReco->DumpEvent(mNDebugDumps - 1, ptrsDump.get(), threadIndex);
    }
  }

  if (mConfParam->dump == 2) {
    return;
  }

  // ------------------------------ Various postprocessing steps ------------------------------

  if (mConfig->configProcessing.tpcWriteClustersAfterRejection) {
  }
  bool createEmptyOutput = false;
  if (retVal != 0) {
    if (retVal == 3 && mConfig->configProcessing.ignoreNonFatalGPUErrors) {
      if (mConfig->configProcessing.throttleAlarms) {
        LOG(warning) << "GPU Reconstruction aborted with non fatal error code, ignoring";
      } else {
        LOG(alarm) << "GPU Reconstruction aborted with non fatal error code, ignoring";
      }
      createEmptyOutput = !mConfParam->partialOutputForNonFatalErrors;
    } else {
      LOG(fatal) << "GPU Reconstruction aborted with error code " << retVal << " - errors are not ignored - terminating";
    }
  }

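  // On a non-fatal GPU error we ship empty (but structurally valid) outputs, so that downstream devices stay in sync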
  std::unique_ptr<o2::tpc::ClusterNativeAccess> tmpEmptyClNative;
  if (createEmptyOutput) {
    memset(&ptrs, 0, sizeof(ptrs));
    for (uint32_t i = 0; i < outputRegions.count(); i++) {
      if (outputBuffers[i].first) {
        size_t toSize = 0;
        if (i == outputRegions.getIndex(outputRegions.compressedClusters)) {
          toSize = sizeof(*ptrs.tpcCompressedClusters);
        } else if (i == outputRegions.getIndex(outputRegions.clustersNative)) {
          toSize = sizeof(o2::tpc::ClusterCountIndex);
        }
        outputBuffers[i].first->get().resize(toSize);
        outputBuffers[i].second = outputBuffers[i].first->get().data();
        if (toSize) {
          memset(outputBuffers[i].second, 0, toSize);
        }
      }
    }
    tmpEmptyClNative = std::make_unique<o2::tpc::ClusterNativeAccess>();
    memset(tmpEmptyClNative.get(), 0, sizeof(*tmpEmptyClNative));
    ptrs.clustersNative = tmpEmptyClNative.get();
    if (mSpecConfig.processMC) {
      MCLabelContainer cont;
      cont.flatten_to(clustersMCBuffer.first);
      clustersMCBuffer.second = clustersMCBuffer.first;
      tmpEmptyClNative->clustersMCTruth = &clustersMCBuffer.second;
    }
  } else {
    gsl::span<const o2::tpc::TrackTPC> spanOutputTracks = {ptrs.outputTracksTPCO2, ptrs.nOutputTracksTPCO2};
    gsl::span<const uint32_t> spanOutputClusRefs = {ptrs.outputClusRefsTPCO2, ptrs.nOutputClusRefsTPCO2};
    gsl::span<const o2::MCCompLabel> spanOutputTracksMCTruth = {ptrs.outputTracksTPCO2MC, ptrs.outputTracksTPCO2MC ? ptrs.nOutputTracksTPCO2 : 0};
    if (!mConfParam->allocateOutputOnTheFly) {
      for (uint32_t i = 0; i < outputRegions.count(); i++) {
        if (outputRegions.asArray()[i].ptrBase) {
          if (outputRegions.asArray()[i].size == 1) {
            throw std::runtime_error("Preallocated buffer size exceeded");
          }
          outputRegions.asArray()[i].checkCurrent();
          downSizeBuffer(outputBuffers[i], (char*)outputRegions.asArray()[i].ptrCurrent - (char*)outputBuffers[i].second);
        }
      }
    }
    downSizeBufferToSpan(outputRegions.tpcTracksO2, spanOutputTracks);
    downSizeBufferToSpan(outputRegions.tpcTracksO2ClusRefs, spanOutputClusRefs);
    downSizeBufferToSpan(outputRegions.tpcTracksO2Labels, spanOutputTracksMCTruth);

    // if requested, tune TPC tracks
    if (ptrs.nOutputTracksTPCO2) {
      doTrackTuneTPC(ptrs, outputBuffers[outputRegions.getIndex(outputRegions.tpcTracksO2)].first->get().data());
    }

    if (mClusterOutputIds.size() > 0 && (void*)ptrs.clustersNative->clustersLinear != (void*)(outputBuffers[outputRegions.getIndex(outputRegions.clustersNative)].second + sizeof(o2::tpc::ClusterCountIndex))) {
      throw std::runtime_error("cluster native output ptrs out of sync"); // sanity check
    }
  }

  if (mConfig->configWorkflow.outputs.isSet(GPUDataTypes::InOutType::TPCMergedTracks)) {
    LOG(info) << "found " << ptrs.nOutputTracksTPCO2 << " track(s)";
  }

  if (mSpecConfig.outputCompClustersRoot) {
  }

  if (mClusterOutputIds.size() > 0) {
    o2::tpc::ClusterNativeAccess const& accessIndex = *ptrs.clustersNative;
    if (mSpecConfig.sendClustersPerSector) {
      // Clusters are shipped by sector, we are copying into per-sector buffers (anyway only for ROOT output)
      for (uint32_t i = 0; i < NSectors; i++) {
        if (mTPCSectorMask & (1ul << i)) {
          DataHeader::SubSpecificationType subspec = i;
          clusterOutputSectorHeader.sectorBits = (1ul << i);
          char* buffer = pc.outputs().make<char>({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", subspec, {clusterOutputSectorHeader}}, accessIndex.nClustersSector[i] * sizeof(*accessIndex.clustersLinear) + sizeof(o2::tpc::ClusterCountIndex)).data();
          o2::tpc::ClusterCountIndex* outIndex = reinterpret_cast<o2::tpc::ClusterCountIndex*>(buffer);
          memset(outIndex, 0, sizeof(*outIndex));
          for (int32_t j = 0; j < o2::tpc::constants::MAXGLOBALPADROW; j++) {
            outIndex->nClusters[i][j] = accessIndex.nClusters[i][j];
          }
          memcpy(buffer + sizeof(*outIndex), accessIndex.clusters[i][0], accessIndex.nClustersSector[i] * sizeof(*accessIndex.clustersLinear));
          if (mSpecConfig.processMC && accessIndex.clustersMCTruth) {
            MCLabelContainer cont;
            for (uint32_t j = 0; j < accessIndex.nClustersSector[i]; j++) {
              const auto& labels = accessIndex.clustersMCTruth->getLabels(accessIndex.clusterOffset[i][0] + j);
              for (const auto& label : labels) {
                cont.addElement(j, label);
              }
            }
            ConstMCLabelContainer contflat;
            cont.flatten_to(contflat);
            pc.outputs().snapshot({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), subspec, {clusterOutputSectorHeader}}, contflat);
          }
        }
      }
    } else {
      // Clusters are shipped as a single message, fill ClusterCountIndex
      DataHeader::SubSpecificationType subspec = NSectors;
      o2::tpc::ClusterCountIndex* outIndex = reinterpret_cast<o2::tpc::ClusterCountIndex*>(outputBuffers[outputRegions.getIndex(outputRegions.clustersNative)].second);
      static_assert(sizeof(o2::tpc::ClusterCountIndex) == sizeof(accessIndex.nClusters));
      memcpy(outIndex, &accessIndex.nClusters[0][0], sizeof(o2::tpc::ClusterCountIndex));
      if (mSpecConfig.processMC && (mSpecConfig.caClusterer || mSpecConfig.useFilteredOutputSpecs) && accessIndex.clustersMCTruth) {
        pc.outputs().snapshot({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), subspec, {clusterOutputSectorHeader}}, clustersMCBuffer.first);
      }
    }
  }
  if (mSpecConfig.outputQA) {
    TObjArray out;
    bool sendQAOutput = !createEmptyOutput && outputRegions.qa.newQAHistsCreated;
    auto getoutput = [sendQAOutput](auto ptr) { return sendQAOutput && ptr ? *ptr : std::decay_t<decltype(*ptr)>(); };
    std::vector<TH1F> copy1 = getoutput(outputRegions.qa.hist1); // Internally, this will also be used as output, so we need a non-const copy
    std::vector<TH2F> copy2 = getoutput(outputRegions.qa.hist2);
    std::vector<TH1D> copy3 = getoutput(outputRegions.qa.hist3);
    std::vector<TGraphAsymmErrors> copy4 = getoutput(outputRegions.qa.hist4);
    if (sendQAOutput) {
      mQA->postprocessExternal(copy1, copy2, copy3, copy4, out, mQATaskMask ? mQATaskMask : -1);
    }
    pc.outputs().snapshot({gDataOriginTPC, "TRACKINGQA", 0}, out);
    if (sendQAOutput) {
      mQA->cleanup();
    }
  }
  if (mSpecConfig.outputErrorQA) {
    pc.outputs().snapshot({gDataOriginGPU, "ERRORQA", 0}, mErrorQA);
    mErrorQA.clear(); // FIXME: This is a race condition once we run multi-threaded!
  }
  if (mSpecConfig.outputSharedClusterMap && !outputsCreated.contains("TPCOCCUPANCYMAP")) {
  }
  if (mSpecConfig.tpcTriggerHandling && !outputsCreated.contains("TRIGGERWORDS")) {
  }
  mTimer->Stop();
  LOG(info) << "GPU Reconstruction time for this TF " << mTimer->CpuTime() - cput << " s (cpu), " << mTimer->RealTime() - realt << " s (wall)";
}

void GPURecoWorkflowSpec::doCalibUpdates(o2::framework::ProcessingContext& pc, calibObjectStruct& oldCalibObjects)
{
  GPUCalibObjectsConst newCalibObjects;
  GPUNewCalibValues newCalibValues;
  // check for updates of TPC calibration objects
  bool needCalibUpdate = false;
  if (mGRPGeomUpdated) {
    mGRPGeomUpdated = false;
    needCalibUpdate = true;

    if (mSpecConfig.runITSTracking && !mITSGeometryCreated) {
      mITSGeometryCreated = true;
    }

    if (mAutoSolenoidBz) {
      newCalibValues.newSolenoidField = true;
      newCalibValues.solenoidField = mConfig->configGRP.solenoidBzNominalGPU = GPUO2InterfaceUtils::getNominalGPUBz(*GRPGeomHelper::instance().getGRPMagField());
      // Propagator::Instance()->setBz(newCalibValues.solenoidField); // Take value from o2::Propagator::UpdateField from GRPGeomHelper
      LOG(info) << "Updating solenoid field " << newCalibValues.solenoidField;
    }
    if (mAutoContinuousMaxTimeBin) {
      newCalibValues.newContinuousMaxTimeBin = true;
      newCalibValues.continuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin = GPUO2InterfaceUtils::getTpcMaxTimeBinFromNHbf(mTFSettings->nHBFPerTF);
      LOG(info) << "Updating max time bin " << newCalibValues.continuousMaxTimeBin << " (" << mTFSettings->nHBFPerTF << " orbits)";
    }

    if (!mPropagatorInstanceCreated) {
      newCalibObjects.o2Propagator = mConfig->configCalib.o2Propagator = Propagator::Instance();
      if (mConfig->configProcessing.o2PropagatorUseGPUField) {
        mGPUReco->UseGPUPolynomialFieldInPropagator(Propagator::Instance());
      }
      mPropagatorInstanceCreated = true;
    }

    if (!mMatLUTCreated) {
      if (mConfParam->matLUTFile.size() == 0) {
        newCalibObjects.matLUT = GRPGeomHelper::instance().getMatLUT();
        LOG(info) << "Loaded material budget lookup table";
      }
      mMatLUTCreated = true;
    }
    if (mSpecConfig.readTRDtracklets && !mTRDGeometryCreated) {
      auto gm = o2::trd::Geometry::instance();
      gm->createPadPlaneArray();
      gm->createClusterMatrixArray();
      mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>(*gm);
      newCalibObjects.trdGeometry = mConfig->configCalib.trdGeometry = mTRDGeometry.get();
      LOG(info) << "Loaded TRD geometry";
      mTRDGeometryCreated = true;
    }
  }
  needCalibUpdate = fetchCalibsCCDBTPC(pc, newCalibObjects, oldCalibObjects) || needCalibUpdate;
  if (mSpecConfig.runITSTracking) {
    needCalibUpdate = fetchCalibsCCDBITS(pc) || needCalibUpdate;
  }
  if (mTPCCutAtTimeBin != mConfig->configGRP.tpcCutTimeBin) {
    newCalibValues.newTPCTimeBinCut = true;
    newCalibValues.tpcTimeBinCut = mConfig->configGRP.tpcCutTimeBin = mTPCCutAtTimeBin;
    needCalibUpdate = true;
  }
  if (needCalibUpdate) {
    LOG(info) << "Updating GPUReconstruction calibration objects";
    mGPUReco->UpdateCalibration(newCalibObjects, newCalibValues);
  }
}

Options GPURecoWorkflowSpec::options()
{
  Options opts;
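  // Out-of-band shmem channel connecting the input-preparation process and the GPU process of the double pipeline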
  if (mSpecConfig.enableDoublePipeline) {
    bool send = mSpecConfig.enableDoublePipeline == 2;
    char* o2jobid = getenv("O2JOBID");
    char* numaid = getenv("NUMAID");
    int32_t chanid = o2jobid ? atoi(o2jobid) : (numaid ? atoi(numaid) : 0);
    std::string chan = std::string("name=gpu-prepare-channel,type=") + (send ? "push" : "pull") + ",method=" + (send ? "connect" : "bind") + ",address=ipc://@gpu-prepare-channel-" + std::to_string(chanid) + "-{timeslice0},transport=shmem,rateLogging=0";
    opts.emplace_back(o2::framework::ConfigParamSpec{"channel-config", o2::framework::VariantType::String, chan, {"Out-of-band channel config"}});
  }
  if (mSpecConfig.enableDoublePipeline == 2) {
    return opts;
  }
  if (mSpecConfig.outputTracks) {
  }
  return opts;
}

Inputs GPURecoWorkflowSpec::inputs()
{
  Inputs inputs;
  if (mSpecConfig.zsDecoder) {
    // All ZS raw data is published with subspec 0 by the o2-raw-file-reader-workflow and DataDistribution
    // creates subspec from CRU and endpoint id, so we create one single input route subscribing to all TPC/RAWDATA
    inputs.emplace_back(InputSpec{"zsraw", ConcreteDataTypeMatcher{"TPC", "RAWDATA"}, Lifetime::Timeframe});
    if (mSpecConfig.askDISTSTF) {
      inputs.emplace_back("stdDist", "FLP", "DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
    }
  }
  if (mSpecConfig.enableDoublePipeline == 2) {
    if (!mSpecConfig.zsDecoder) {
      LOG(fatal) << "Double pipeline mode can only work with zsraw input";
    }
    return inputs;
  } else if (mSpecConfig.enableDoublePipeline == 1) {
    inputs.emplace_back("pipelineprepare", gDataOriginGPU, "PIPELINEPREPARE", 0, Lifetime::Timeframe);
  }
  if (mSpecConfig.enableDoublePipeline != 2 && (mSpecConfig.outputTracks || mSpecConfig.caClusterer)) {
    // calibration objects for TPC clusterization
    inputs.emplace_back("tpcgain", gDataOriginTPC, "PADGAINFULL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalPadGainFull)));
    inputs.emplace_back("tpcaltrosync", gDataOriginTPC, "ALTROSYNCSIGNAL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::AltroSyncSignal)));
  }
  if (mSpecConfig.enableDoublePipeline != 2 && mSpecConfig.outputTracks) {
    // calibration objects for TPC tracking
    const auto mapSources = mSpecConfig.tpcDeadMapSources;
    if (mapSources != 0) {
      tpc::SourcesDeadMap sources((mapSources > -1) ? static_cast<tpc::SourcesDeadMap>(mapSources) : tpc::SourcesDeadMap::All);
      if ((sources & tpc::SourcesDeadMap::IDCPadStatus) == tpc::SourcesDeadMap::IDCPadStatus) {
        inputs.emplace_back("tpcidcpadflags", gDataOriginTPC, "IDCPADFLAGS", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalIDCPadStatusMapA), {}, 1)); // time-dependent
      }
      if ((sources & tpc::SourcesDeadMap::FEEConfig) == tpc::SourcesDeadMap::FEEConfig) {
        inputs.emplace_back("tpcruninfo", gDataOriginTPC, "TPCRUNINFO", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::ConfigRunInfo)));
      }
    }

    inputs.emplace_back("tpcgainresidual", gDataOriginTPC, "PADGAINRESIDUAL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalPadGainResidual), {}, 1)); // time-dependent
    if (mSpecConfig.tpcUseMCTimeGain) {
      inputs.emplace_back("tpctimegain", gDataOriginTPC, "TIMEGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTimeGainMC), {}, 1)); // time-dependent
    } else {
      inputs.emplace_back("tpctimegain", gDataOriginTPC, "TIMEGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTimeGain), {}, 1)); // time-dependent
    }
    inputs.emplace_back("tpctopologygain", gDataOriginTPC, "TOPOLOGYGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTopologyGain)));
    inputs.emplace_back("tpcthreshold", gDataOriginTPC, "PADTHRESHOLD", 0, Lifetime::Condition, ccdbParamSpec("TPC/Config/FEEPad"));
    Options optsDummy;
    o2::tpc::CorrectionMapsLoaderGloOpts gloOpts{mSpecConfig.lumiScaleType, mSpecConfig.lumiScaleMode, mSpecConfig.enableMShape, mSpecConfig.enableCTPLumi};
    mCalibObjects.mFastTransformHelper->requestCCDBInputs(inputs, optsDummy, gloOpts); // option filled here is lost
  }
  if (mSpecConfig.decompressTPC) {
    inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, mSpecConfig.decompressTPCFromROOT ? o2::header::DataDescription("COMPCLUSTERS") : o2::header::DataDescription("COMPCLUSTERSFLAT")}, Lifetime::Timeframe});
  } else if (mSpecConfig.caClusterer) {
    // We accept digits and MC labels also if we run on ZS Raw data, since they are needed for MC label propagation
    if ((!mSpecConfig.zsOnTheFly || mSpecConfig.processMC) && !mSpecConfig.zsDecoder) {
      inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITS"}, Lifetime::Timeframe});
      mPolicyData->emplace_back(o2::framework::InputSpec{"digits", o2::framework::ConcreteDataTypeMatcher{"TPC", "DIGITS"}});
    }
  } else if (mSpecConfig.runTPCTracking) {
    inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERNATIVE"}, Lifetime::Timeframe});
    mPolicyData->emplace_back(o2::framework::InputSpec{"clusters", o2::framework::ConcreteDataTypeMatcher{"TPC", "CLUSTERNATIVE"}});
  }
  if (mSpecConfig.processMC) {
    if (mSpecConfig.caClusterer) {
      if (!mSpecConfig.zsDecoder) {
        inputs.emplace_back(InputSpec{"mclblin", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITSMCTR"}, Lifetime::Timeframe});
        mPolicyData->emplace_back(o2::framework::InputSpec{"digitsmc", o2::framework::ConcreteDataTypeMatcher{"TPC", "DIGITSMCTR"}});
      }
    } else if (mSpecConfig.runTPCTracking) {
      inputs.emplace_back(InputSpec{"mclblin", ConcreteDataTypeMatcher{gDataOriginTPC, "CLNATIVEMCLBL"}, Lifetime::Timeframe});
      mPolicyData->emplace_back(o2::framework::InputSpec{"clustersmc", o2::framework::ConcreteDataTypeMatcher{"TPC", "CLNATIVEMCLBL"}});
    }
  }

1227 if (mSpecConfig.zsOnTheFly) {
1228 inputs.emplace_back(InputSpec{"zsinput", ConcreteDataTypeMatcher{"TPC", "TPCZS"}, Lifetime::Timeframe});
1229 inputs.emplace_back(InputSpec{"zsinputsizes", ConcreteDataTypeMatcher{"TPC", "ZSSIZES"}, Lifetime::Timeframe});
1230 }
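// The two zsOnTheFly messages belong together: "TPCZS" carries the encoded
// pages and "ZSSIZES" the size table used to rebuild the GPUTrackingInOutZS
// structure (cf. RunZSEncoderCreateMeta). Consumption sketch, assuming the
// payloads decode to the raw word and size spans that helper expects:
//
//   auto zsData = pc.inputs().get<gsl::span<const uint64_t>>("zsinput");
//   auto zsSizes = pc.inputs().get<gsl::span<const uint32_t>>("zsinputsizes");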
1231 if (mSpecConfig.readTRDtracklets) {
1232 inputs.emplace_back("trdctracklets", o2::header::gDataOriginTRD, "CTRACKLETS", 0, Lifetime::Timeframe);
1233 inputs.emplace_back("trdtracklets", o2::header::gDataOriginTRD, "TRACKLETS", 0, Lifetime::Timeframe);
1234 inputs.emplace_back("trdtriggerrec", o2::header::gDataOriginTRD, "TRKTRGRD", 0, Lifetime::Timeframe);
1235 inputs.emplace_back("trdtrigrecmask", o2::header::gDataOriginTRD, "TRIGRECMASK", 0, Lifetime::Timeframe);
1236 }
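// Consumption sketch for the TRD tracklet inputs above, using types from the
// TRD data formats; the span reads are assumptions shown for illustration
// only (the production code collects these via its reco-input container):
//
//   auto trkl = pc.inputs().get<gsl::span<o2::trd::Tracklet64>>("trdtracklets");
//   auto ctrkl = pc.inputs().get<gsl::span<o2::trd::CalibratedTracklet>>("trdctracklets");
//   auto trigRec = pc.inputs().get<gsl::span<o2::trd::TriggerRecord>>("trdtriggerrec");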
1237
1238 if (mSpecConfig.runITSTracking) {
1239 inputs.emplace_back("compClusters", "ITS", "COMPCLUSTERS", 0, Lifetime::Timeframe);
1240 inputs.emplace_back("patterns", "ITS", "PATTERNS", 0, Lifetime::Timeframe);
1241 inputs.emplace_back("ROframes", "ITS", "CLUSTERSROF", 0, Lifetime::Timeframe);
1242 if (mSpecConfig.itsTriggerType == 1) {
1243 inputs.emplace_back("phystrig", "ITS", "PHYSTRIG", 0, Lifetime::Timeframe);
1244 } else if (mSpecConfig.itsTriggerType == 2) {
1245 inputs.emplace_back("phystrig", "TRD", "TRKTRGRD", 0, Lifetime::Timeframe);
1246 }
1247 if (mSpecConfig.enableDoublePipeline != 2) {
1248 if (mSpecConfig.isITS3) {
1249 inputs.emplace_back("cldict", "IT3", "CLUSDICT", 0, Lifetime::Condition, ccdbParamSpec("IT3/Calib/ClusterDictionary"));
1250 inputs.emplace_back("alppar", "ITS", "ALPIDEPARAM", 0, Lifetime::Condition, ccdbParamSpec("ITS/Config/AlpideParam"));
1251 } else {
1252 inputs.emplace_back("itscldict", "ITS", "CLUSDICT", 0, Lifetime::Condition, ccdbParamSpec("ITS/Calib/ClusterDictionary"));
1253 inputs.emplace_back("itsalppar", "ITS", "ALPIDEPARAM", 0, Lifetime::Condition, ccdbParamSpec("ITS/Config/AlpideParam"));
1254 }
1255 if (mSpecConfig.itsOverrBeamEst) {
1256 inputs.emplace_back("meanvtx", "GLO", "MEANVERTEX", 0, Lifetime::Condition, ccdbParamSpec("GLO/Calib/MeanVertex", {}, 1));
1257 }
1258 }
1259 if (mSpecConfig.processMC) {
1260 inputs.emplace_back("itsmclabels", "ITS", "CLUSTERSMCTR", 0, Lifetime::Timeframe);
1261 inputs.emplace_back("ITSMC2ROframes", "ITS", "CLUSTERSMC2ROF", 0, Lifetime::Timeframe);
1262 }
1263 }
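// Condition-lifetime inputs such as the cluster dictionaries above are not
// read in run(); DPL hands them to the finaliseCCDB callback, which this
// class implements elsewhere. Generic sketch of the matching pattern only;
// the mITSDict member is hypothetical:
//
//   void finaliseCCDB(ConcreteDataMatcher& matcher, void* obj)
//   {
//     if (matcher == ConcreteDataMatcher("ITS", "CLUSDICT", 0)) {
//       mITSDict = static_cast<const o2::itsmft::TopologyDictionary*>(obj);
//     }
//   }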
1264
1265 return inputs;
1266};
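// Sketch of how the vectors built by inputs()/outputs()/options() are wired
// into a DPL device (illustrative only; the device name and the lambda-based
// AlgorithmSpec are assumptions, the production wiring uses the framework's
// Task adapter so that init()/stop() are also hooked up):
//
//   DataProcessorSpec makeSpec(std::shared_ptr<GPURecoWorkflowSpec> task)
//   {
//     return DataProcessorSpec{"gpu-reconstruction",
//                              task->inputs(),
//                              task->outputs(),
//                              AlgorithmSpec{[task](ProcessingContext& pc) { task->run(pc); }},
//                              task->options()};
//   }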
1267
1269{
1270 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
1271 std::vector<OutputSpec> outputSpecs;
1272 if (mSpecConfig.enableDoublePipeline == 2) {
1273 outputSpecs.emplace_back(gDataOriginGPU, "PIPELINEPREPARE", 0, Lifetime::Timeframe);
1274 return outputSpecs;
1275 }
1276 if (mSpecConfig.outputTracks) {
1277 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSF" : (DataDescription) "TRACKS", 0, Lifetime::Timeframe);
1278 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSREFSF" : (DataDescription) "CLUSREFS", 0, Lifetime::Timeframe);
1279 }
1280 if (mSpecConfig.processMC && mSpecConfig.outputTracks) {
1281 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSMCLBLF" : (DataDescription) "TRACKSMCLBL", 0, Lifetime::Timeframe);
1282 }
1283 if (mSpecConfig.outputCompClustersRoot) {
1284 outputSpecs.emplace_back(gDataOriginTPC, "COMPCLUSTERS", 0, Lifetime::Timeframe);
1285 }
1286 if (mSpecConfig.outputCompClustersFlat) {
1287 outputSpecs.emplace_back(gDataOriginTPC, "COMPCLUSTERSFLAT", 0, Lifetime::Timeframe);
1288 }
1289 if (mSpecConfig.outputCAClusters) {
1290 for (auto const& sector : mTPCSectors) {
1291 mClusterOutputIds.emplace_back(sector);
1292 }
1293 if (mSpecConfig.sendClustersPerSector) {
1294 outputSpecs.emplace_back(gDataOriginTPC, "CLUSTERNATIVETMP", NSectors, Lifetime::Timeframe); // Dummy buffer into which the TPC tracker writes the initial linear clusters
1295 for (const auto sector : mTPCSectors) {
1296 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", sector, Lifetime::Timeframe);
1297 }
1298 } else {
1299 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", NSectors, Lifetime::Timeframe);
1300 }
1301 if (mSpecConfig.processMC) {
1302 if (mSpecConfig.sendClustersPerSector) {
1303 for (const auto sector : mTPCSectors) {
1304 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), sector, Lifetime::Timeframe);
1305 }
1306 } else {
1307 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), NSectors, Lifetime::Timeframe);
1308 }
1309 }
1310 }
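// Sub-specification convention for the cluster outputs above: per-sector
// messages use the sector number as subSpec, while the single-blob variant
// uses NSectors as a sentinel. A consumer can therefore subscribe either way
// (sketch; the "clusters" binding is illustrative):
//
//   InputSpec{"clusters", gDataOriginTPC, "CLUSTERNATIVE", NSectors, Lifetime::Timeframe};  // single flat blob
//   InputSpec{"clusters", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERNATIVE"}, Lifetime::Timeframe};  // any subSpec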
1311 if (mSpecConfig.outputSharedClusterMap) {
1312 outputSpecs.emplace_back(gDataOriginTPC, "CLSHAREDMAP", 0, Lifetime::Timeframe);
1313 outputSpecs.emplace_back(gDataOriginTPC, "TPCOCCUPANCYMAP", 0, Lifetime::Timeframe);
1314 }
1315 if (mSpecConfig.tpcTriggerHandling) {
1316 outputSpecs.emplace_back(gDataOriginTPC, "TRIGGERWORDS", 0, Lifetime::Timeframe);
1317 }
1318 if (mSpecConfig.outputQA) {
1319 outputSpecs.emplace_back(gDataOriginTPC, "TRACKINGQA", 0, Lifetime::Timeframe);
1320 }
1321 if (mSpecConfig.outputErrorQA) {
1322 outputSpecs.emplace_back(gDataOriginGPU, "ERRORQA", 0, Lifetime::Timeframe);
1323 }
1324
1325 if (mSpecConfig.runITSTracking) {
1326 outputSpecs.emplace_back(gDataOriginITS, "TRACKS", 0, Lifetime::Timeframe);
1327 outputSpecs.emplace_back(gDataOriginITS, "TRACKCLSID", 0, Lifetime::Timeframe);
1328 outputSpecs.emplace_back(gDataOriginITS, "ITSTrackROF", 0, Lifetime::Timeframe);
1329 outputSpecs.emplace_back(gDataOriginITS, "VERTICES", 0, Lifetime::Timeframe);
1330 outputSpecs.emplace_back(gDataOriginITS, "VERTICESROF", 0, Lifetime::Timeframe);
1331 outputSpecs.emplace_back(gDataOriginITS, "IRFRAMES", 0, Lifetime::Timeframe);
1332
1333 if (mSpecConfig.processMC) {
1334 outputSpecs.emplace_back(gDataOriginITS, "VERTICESMCTR", 0, Lifetime::Timeframe);
1335 outputSpecs.emplace_back(gDataOriginITS, "VERTICESMCPUR", 0, Lifetime::Timeframe);
1336 outputSpecs.emplace_back(gDataOriginITS, "TRACKSMCTR", 0, Lifetime::Timeframe);
1337 outputSpecs.emplace_back(gDataOriginITS, "ITSTrackMC2ROF", 0, Lifetime::Timeframe);
1338 }
1339 }
1340
1341 return outputSpecs;
1342};
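// Each OutputSpec declared above must be backed by a message produced in
// run(); minimal producer-side sketch for the TRACKS route (the tracks
// vector is a placeholder):
//
//   std::vector<o2::tpc::TrackTPC> tracks; // filled by the tracker
//   pc.outputs().snapshot(Output{gDataOriginTPC, "TRACKS", 0}, tracks);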
1343
1345{
1346 ExitPipeline();
1347 mQA.reset(nullptr);
1348 mDisplayFrontend.reset(nullptr);
1349 mGPUReco.reset(nullptr);
1350}
1351
1352} // namespace o2::gpu