GPUWorkflowSpec.cxx
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "Headers/DataHeader.h"
#include "Framework/WorkflowSpec.h" // o2::framework::mergeInputs
#include "Framework/Logger.h"
#include "TPCFastTransform.h"
#include "TPCBase/RDHUtils.h"
#include "GPUO2InterfaceQA.h"
#include "GPUO2Interface.h"
#include "GPUO2InterfaceUtils.h"
#include "CalibdEdxContainer.h"
#include "ORTRootSerializer.h"
#include "GPUNewCalibValues.h"
#include "TPCPadGainCalib.h"
#include "TPCZSLinkMapping.h"
#include "TPCBase/Sector.h"
#include "TPCBase/Utils.h"
#include "Algorithm/Parser.h"
#include "TRDBase/Geometry.h"
#include "GPUWorkflowInternal.h"
#include "GPUDataTypesQA.h"
// #include "Framework/ThreadPool.h"

#include <TStopwatch.h>
#include <TObjArray.h>
#include <TH1F.h>
#include <TH2F.h>
#include <TH1D.h>
#include <TGraphAsymmErrors.h>

#include <filesystem>
#include <memory>
#include <vector>
#include <iomanip>
#include <stdexcept>
#include <regex>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <chrono>
#include <unordered_set>

using namespace o2::framework;
using namespace o2::header;
using namespace o2::gpu;
using namespace o2::base;
using namespace o2::dataformats;

namespace o2::gpu
{

GPURecoWorkflowSpec::GPURecoWorkflowSpec(GPURecoWorkflowSpec::CompletionPolicyData* policyData, Config const& specconfig, std::vector<int32_t> const& tpcsectors, uint64_t tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<bool(o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) : o2::framework::Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
{
  if (mSpecConfig.outputCAClusters && !mSpecConfig.caClusterer && !mSpecConfig.decompressTPC && !mSpecConfig.useFilteredOutputSpecs) {
    throw std::runtime_error("inconsistent configuration: cluster output is only possible if CA clusterer or CompCluster decompression is activated");
  }

  mConfig.reset(new GPUO2InterfaceConfiguration);
  mConfParam.reset(new GPUSettingsO2);
  mTFSettings.reset(new GPUSettingsTF);
  mTimer.reset(new TStopwatch);
  mPipeline.reset(new GPURecoWorkflowSpec_PipelineInternals);

  if (mSpecConfig.enableDoublePipeline == 1 && gPolicyOrder) {
    *gPolicyOrder = &mPolicyOrder;
  }
}
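
// Illustrative sketch (not part of the original file): a task like the one above is normally
// wrapped into a DPL DataProcessorSpec via o2::framework::adaptFromTask. Everything here
// except the o2::framework types is a placeholder, not the actual spec of this workflow.
//
//   o2::framework::DataProcessorSpec spec{
//     "gpu-reconstruction",                        // hypothetical processor name
//     inputs, outputs,                             // e.g. built from GPURecoWorkflowSpec::inputs() / options()
//     o2::framework::AlgorithmSpec{o2::framework::adaptFromTask<GPURecoWorkflowSpec>(
//       policyData, specconfig, tpcsectors, tpcSectorMask, ggr, &gPolicyOrder)}};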

GPURecoWorkflowSpec::~GPURecoWorkflowSpec() = default;

void GPURecoWorkflowSpec::init(InitContext& ic)
{
  GPUO2InterfaceConfiguration& config = *mConfig.get();

  // Create configuration object and fill settings
  mConfig->configGRP.solenoidBzNominalGPU = 0;
  mTFSettings->hasSimStartOrbit = 1;
  auto& hbfu = o2::raw::HBFUtils::Instance();
  mTFSettings->simStartOrbit = hbfu.getFirstIRofTF(o2::InteractionRecord(0, hbfu.orbitFirstSampled)).orbit;

  *mConfParam = mConfig->ReadConfigurableParam();

  if (mConfParam->display) {
    mDisplayFrontend.reset(GPUDisplayFrontendInterface::getFrontend(mConfig->configDisplay.displayFrontend.c_str()));
    mConfig->configProcessing.eventDisplay = mDisplayFrontend.get();
    if (mConfig->configProcessing.eventDisplay != nullptr) {
      LOG(info) << "Event display enabled";
    } else {
      throw std::runtime_error("GPU Event Display frontend could not be created!");
    }
  }
  if (mSpecConfig.enableDoublePipeline) {
    mConfig->configProcessing.doublePipeline = 1;
  }

  mAutoSolenoidBz = mConfParam->solenoidBzNominalGPU == -1e6f;
  mAutoContinuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin < 0;
  if (mAutoContinuousMaxTimeBin) {
    mConfig->configGRP.grpContinuousMaxTimeBin = GPUO2InterfaceUtils::getTpcMaxTimeBinFromNHbf(mConfParam->overrideNHbfPerTF ? mConfParam->overrideNHbfPerTF : 256);
  }
  if (mConfig->configProcessing.deviceNum == -2) {
    int32_t myId = ic.services().get<const o2::framework::DeviceSpec>().inputTimesliceId;
    int32_t idMax = ic.services().get<const o2::framework::DeviceSpec>().maxInputTimeslices;
    mConfig->configProcessing.deviceNum = myId;
    LOG(info) << "GPU device number selected from pipeline id: " << myId << " / " << idMax;
  }
  if (mConfig->configProcessing.debugLevel >= 3 && mVerbosity == 0) {
    mVerbosity = 1;
  }
  mConfig->configProcessing.runMC = mSpecConfig.processMC;
  if (mSpecConfig.outputQA) {
    if (!mSpecConfig.processMC && !mConfig->configQA.clusterRejectionHistograms) {
      throw std::runtime_error("Need MC information to create QA plots");
    }
    if (!mSpecConfig.processMC) {
      mConfig->configQA.noMC = true;
    }
    mConfig->configQA.shipToQC = true;
    if (!mConfig->configProcessing.runQA) {
      mConfig->configQA.enableLocalOutput = false;
      mQATaskMask = (mSpecConfig.processMC ? gpudatatypes::gpuqa::tasksAllMC : gpudatatypes::gpuqa::tasksNone) | (mConfig->configQA.clusterRejectionHistograms ? gpudatatypes::gpuqa::taskClusterCounts : gpudatatypes::gpuqa::tasksNone);
      mConfig->configProcessing.runQA = -mQATaskMask;
    }
  }
  mConfig->configInterface.outputToExternalBuffers = true;
  const bool runTracking = mSpecConfig.outputTracks || mSpecConfig.outputCompClustersRoot || mSpecConfig.outputCompClustersFlat;
  GPUO2Interface::ApplySyncSettings(mConfig->configProcessing, mConfig->configReconstruction, mConfig->configWorkflow.steps, mConfParam->synchronousProcessing, runTracking ? mConfParam->rundEdx : -2);

  // Configure the "GPU workflow" i.e. which steps we run on the GPU (or CPU)
  if (runTracking) {
    mConfig->configWorkflow.steps.set(gpudatatypes::RecoStep::TPCConversion,
                                      gpudatatypes::RecoStep::TPCSectorTracking,
                                      gpudatatypes::RecoStep::TPCMerging);
    mConfig->configWorkflow.outputs.set(gpudatatypes::InOutType::TPCMergedTracks);
  }
  if (mSpecConfig.outputCompClustersRoot || mSpecConfig.outputCompClustersFlat) {
    mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCCompression, true);
    mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCCompressedClusters, true);
  }
  mConfig->configWorkflow.inputs.set(gpudatatypes::InOutType::TPCClusters);
  if (mSpecConfig.caClusterer) { // Override some settings if we have raw data as input
    mConfig->configWorkflow.inputs.set(gpudatatypes::InOutType::TPCRaw);
    mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCClusterFinding, true);
    mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCClusters, true);
  }
  if (mSpecConfig.decompressTPC) {
    mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCCompression, false);
    mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCDecompression, true);
    mConfig->configWorkflow.inputs.set(gpudatatypes::InOutType::TPCCompressedClusters);
    mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCClusters, true);
    mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCCompressedClusters, false);
    if (mTPCSectorMask != 0xFFFFFFFFF) {
      throw std::invalid_argument("Cannot run TPC decompression with a sector mask");
    }
  }
  if (mSpecConfig.runTRDTracking) {
    mConfig->configWorkflow.inputs.setBits(gpudatatypes::InOutType::TRDTracklets, true);
    mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TRDTracking, true);
  }
  if (mSpecConfig.runITSTracking) {
    mConfig->configWorkflow.inputs.setBits(gpudatatypes::InOutType::ITSClusters, true);
    mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::ITSTracks, true);
    mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::ITSTracking, true);
  }
  if (mSpecConfig.outputSharedClusterMap) {
    mConfig->configProcessing.outputSharedClusterMap = true;
  }
  if (!mSpecConfig.outputTracks) {
    mConfig->configProcessing.createO2Output = 0; // Disable O2 TPC track format output if no track output requested
  }
  mConfig->configProcessing.param.tpcTriggerHandling = mSpecConfig.tpcTriggerHandling;

  if (mConfParam->transformationFile.size() || mConfParam->transformationSCFile.size()) {
    LOG(fatal) << "Deprecated configurable param options GPU_global.transformationFile or transformationSCFile used\n"
               << "Instead, link the corresponding file as <somedir>/TPC/Calib/CorrectionMap/snapshot.root and use it via\n"
               << "--condition-remap file://<somedir>=TPC/Calib/CorrectionMap option";
  }
  /* if (config.configProcessing.doublePipeline && ic.services().get<ThreadPool>().poolSize != 2) {
    throw std::runtime_error("double pipeline requires exactly 2 threads");
  } */
  if (config.configProcessing.doublePipeline && (mSpecConfig.readTRDtracklets || mSpecConfig.runITSTracking || !(mSpecConfig.zsOnTheFly || mSpecConfig.zsDecoder))) {
    LOG(fatal) << "GPU two-threaded pipeline works only with TPC-only processing, and with ZS input";
  }

  if (mSpecConfig.enableDoublePipeline != 2) {
    mGPUReco = std::make_unique<GPUO2Interface>();

    // initialize TPC calib objects
    initFunctionTPCCalib(ic);

    mConfig->configCalib.fastTransform = mCalibObjects.mFastTransformHelper->getCorrMap();
    mConfig->configCalib.fastTransformRef = mCalibObjects.mFastTransformHelper->getCorrMapRef();
    mConfig->configCalib.fastTransformMShape = mCalibObjects.mFastTransformHelper->getCorrMapMShape();
    mConfig->configCalib.fastTransformHelper = mCalibObjects.mFastTransformHelper.get();
    if (mConfig->configCalib.fastTransform == nullptr) {
      throw std::invalid_argument("GPU workflow: initialization of the TPC transformation failed");
    }

    if (mConfParam->matLUTFile.size()) {
      LOGP(info, "Loading matlut file {}", mConfParam->matLUTFile.c_str());
      mConfig->configCalib.matLUT = o2::base::MatLayerCylSet::loadFromFile(mConfParam->matLUTFile.c_str());
      if (mConfig->configCalib.matLUT == nullptr) {
        LOGF(fatal, "Error loading matlut file");
      }
    } else {
      mConfig->configProcessing.lateO2MatLutProvisioningSize = 50 * 1024 * 1024;
    }

    if (mSpecConfig.readTRDtracklets) {
      mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>();
      mConfig->configCalib.trdGeometry = mTRDGeometry.get();
    }

    mConfig->configProcessing.willProvideO2PropagatorLate = true;
    mConfig->configProcessing.o2PropagatorUseGPUField = true;

    if (mConfParam->printSettings && (mConfParam->printSettings > 1 || ic.services().get<const o2::framework::DeviceSpec>().inputTimesliceId == 0)) {
      mConfig->configProcessing.printSettings = true;
      if (mConfParam->printSettings > 1) {
        mConfig->PrintParam();
      }
    }

    // Configuration is prepared, initialize the tracker.
    if (mGPUReco->Initialize(config) != 0) {
      throw std::invalid_argument("GPU Reconstruction initialization failed");
    }
    if (mSpecConfig.outputQA) {
      mQA = std::make_unique<GPUO2InterfaceQA>(mConfig.get());
    }
    if (mSpecConfig.outputErrorQA) {
      mGPUReco->setErrorCodeOutput(&mErrorQA);
    }

    // initialize ITS
    if (mSpecConfig.runITSTracking) {
      initFunctionITS(ic);
    }
  }

  if (mSpecConfig.enableDoublePipeline) {
    initPipeline(ic);
    if (mConfParam->dump >= 2) {
      LOG(fatal) << "Cannot use dump-only mode with multi-threaded pipeline";
    }
  }

  auto& callbacks = ic.services().get<CallbackService>();
  callbacks.set<CallbackService::Id::RegionInfoCallback>([this](fair::mq::RegionInfo const& info) {
    if (info.size == 0) {
      return;
    }
    if (mSpecConfig.enableDoublePipeline) {
      mRegionInfos.emplace_back(info);
    }
    if (mSpecConfig.enableDoublePipeline == 2) {
      return;
    }
    if (mConfParam->registerSelectedSegmentIds != -1 && info.managed && info.id != (uint32_t)mConfParam->registerSelectedSegmentIds) {
      return;
    }
    int32_t fd = 0;
    if (mConfParam->mutexMemReg) {
      mode_t mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
      fd = open("/tmp/o2_gpu_memlock_mutex.lock", O_RDWR | O_CREAT | O_CLOEXEC, mask);
      if (fd == -1) {
        throw std::runtime_error("Error opening memlock mutex lock file");
      }
      fchmod(fd, mask);
      if (lockf(fd, F_LOCK, 0)) {
        throw std::runtime_error("Error locking memlock mutex file");
      }
    }
    std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
    if (mConfParam->benchmarkMemoryRegistration) {
      start = std::chrono::high_resolution_clock::now();
    }
    if (mGPUReco->registerMemoryForGPU(info.ptr, info.size)) {
      throw std::runtime_error("Error registering memory for GPU");
    }
    if (mConfParam->benchmarkMemoryRegistration) {
      end = std::chrono::high_resolution_clock::now();
      std::chrono::duration<double> elapsed_seconds = end - start;
      LOG(info) << "Memory registration time (0x" << info.ptr << ", " << info.size << " bytes): " << elapsed_seconds.count() << " s";
    }
    if (mConfParam->mutexMemReg) {
      if (lockf(fd, F_ULOCK, 0)) {
        throw std::runtime_error("Error unlocking memlock mutex file");
      }
      close(fd);
    }
  });
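  // Note on the callback above: each shared-memory region reported by FairMQ is pinned for the
  // GPU via registerMemoryForGPU. With mutexMemReg set, registration is serialized across
  // processes on the same node through an advisory lockf() lock on
  // /tmp/o2_gpu_memlock_mutex.lock, and benchmarkMemoryRegistration optionally times each call.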

  mTimer->Stop();
  mTimer->Reset();
}

void GPURecoWorkflowSpec::stop()
{
  LOGF(info, "GPU Reconstruction total timing: Cpu: %.3e Real: %.3e s in %d slots", mTimer->CpuTime(), mTimer->RealTime(), mTimer->Counter() - 1);
  handlePipelineStop();
}

void GPURecoWorkflowSpec::endOfStream(EndOfStreamContext& ec)
{
  handlePipelineEndOfStream(ec);
}

void GPURecoWorkflowSpec::finaliseCCDB(ConcreteDataMatcher& matcher, void* obj)
{
  if (mSpecConfig.enableDoublePipeline != 2) {
    finaliseCCDBTPC(matcher, obj);
    if (mSpecConfig.runITSTracking) {
      finaliseCCDBITS(matcher, obj);
    }
  }
  if (GRPGeomHelper::instance().finaliseCCDB(matcher, obj)) {
    mGRPGeomUpdated = true;
    return;
  }
}

template <class D, class E, class F, class G, class H, class I, class J, class K>
void GPURecoWorkflowSpec::processInputs(ProcessingContext& pc, D& tpcZSmeta, E& inputZS, F& tpcZS, G& tpcZSonTheFlySizes, bool& debugTFDump, H& compClustersDummy, I& compClustersFlatDummy, J& pCompClustersFlat, K& tmpEmptyCompClusters)
{
  if (mSpecConfig.enableDoublePipeline == 1) {
    return;
  }
  constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
  constexpr static size_t NEndpoints = o2::gpu::GPUTrackingInOutZS::NENDPOINTS;

  if (mSpecConfig.zsOnTheFly || mSpecConfig.zsDecoder) {
    for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
      for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
        tpcZSmeta.Pointers[i][j].clear();
        tpcZSmeta.Sizes[i][j].clear();
      }
    }
  }
  if (mSpecConfig.zsOnTheFly) {
    tpcZSonTheFlySizes = {0};
    // tpcZSonTheFlySizes: #zs pages per endpoint:
    std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "ZSSIZES"}, Lifetime::Timeframe}};
    bool recv = false, recvsizes = false;
    for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
      if (recvsizes) {
        throw std::runtime_error("Received multiple ZSSIZES data");
      }
      tpcZSonTheFlySizes = pc.inputs().get<std::array<uint32_t, NEndpoints * NSectors>>(ref);
      recvsizes = true;
    }
    // zs pages
    std::vector<InputSpec> filter2 = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "TPCZS"}, Lifetime::Timeframe}};
    for (auto const& ref : InputRecordWalker(pc.inputs(), filter2)) {
      if (recv) {
        throw std::runtime_error("Received multiple TPCZS data");
      }
      inputZS = pc.inputs().get<gsl::span<o2::tpc::ZeroSuppressedContainer8kb>>(ref);
      recv = true;
    }
    if (!recv || !recvsizes) {
      throw std::runtime_error("TPC ZS on the fly data not received");
    }

    uint32_t offset = 0;
    for (uint32_t i = 0; i < NSectors; i++) {
      uint32_t pageSector = 0;
      for (uint32_t j = 0; j < NEndpoints; j++) {
        pageSector += tpcZSonTheFlySizes[i * NEndpoints + j];
        offset += tpcZSonTheFlySizes[i * NEndpoints + j];
      }
      if (mVerbosity >= 1) {
        LOG(info) << "GOT ZS on the fly pages FOR SECTOR " << i << " -> pages: " << pageSector;
      }
    }
  }
  if (mSpecConfig.zsDecoder) {
    std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
    auto isSameRdh = [](const char* left, const char* right) -> bool {
      return o2::raw::RDHUtils::getFEEID(left) == o2::raw::RDHUtils::getFEEID(right) && o2::raw::RDHUtils::getDetectorField(left) == o2::raw::RDHUtils::getDetectorField(right);
    };
    auto checkForZSData = [](const char* ptr, uint32_t subSpec) -> bool {
      const auto rdhLink = o2::raw::RDHUtils::getLinkID(ptr);
      const auto detField = o2::raw::RDHUtils::getDetectorField(ptr);
      const auto feeID = o2::raw::RDHUtils::getFEEID(ptr);
      const auto feeLinkID = o2::tpc::rdh_utils::getLink(feeID);
      // This check is not what it is supposed to be, but some MC SYNTHETIC data was generated with rdhLinkId set to feeLinkId, so we add some extra logic so we can still decode it
      return detField == o2::tpc::raw_data_types::ZS && ((feeLinkID == o2::tpc::rdh_utils::UserLogicLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == 0)) ||
                                                         (feeLinkID == o2::tpc::rdh_utils::ILBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::ILBZSLinkID || rdhLink == 0)) ||
                                                         (feeLinkID == o2::tpc::rdh_utils::DLBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::DLBZSLinkID || rdhLink == 0)));
    };
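    // For reference, the acceptance above reduces to: detField must be ZS, and
    //   feeLinkID == UserLogicLinkID -> rdhLink in {UserLogicLinkID, 0}
    //   feeLinkID == ILBZSLinkID     -> rdhLink in {UserLogicLinkID, ILBZSLinkID, 0}
    //   feeLinkID == DLBZSLinkID     -> rdhLink in {UserLogicLinkID, DLBZSLinkID, 0}
    // where the rdhLink == feeLinkID cases appear to exist to keep the MC data mentioned above decodable.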
    auto insertPages = [&tpcZSmeta, checkForZSData](const char* ptr, size_t count, uint32_t subSpec) -> void {
      if (checkForZSData(ptr, subSpec)) {
        int32_t rawcru = o2::tpc::rdh_utils::getCRU(ptr);
        int32_t rawendpoint = o2::tpc::rdh_utils::getEndPoint(ptr);
        tpcZSmeta.Pointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(ptr);
        tpcZSmeta.Sizes[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(count);
      }
    };
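    // Index math in insertPages: each TPC sector is read out by 10 CRUs with 2 endpoints each
    // (NENDPOINTS == 20), so CRU c / endpoint e maps to sector c / 10 and endpoint slot (c % 10) * 2 + e;
    // e.g. CRU 123, endpoint 1 lands in sector 12, slot 7.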
    if (DPLRawPageSequencer(pc.inputs(), filter)(isSameRdh, insertPages, checkForZSData)) {
      debugTFDump = true;
      static uint32_t nErrors = 0;
      nErrors++;
      if (nErrors == 1 || (nErrors < 100 && nErrors % 10 == 0) || nErrors % 1000 == 0 || mNTFs % 1000 == 0) {
        LOG(error) << "DPLRawPageSequencer failed to process TPC raw data - data most likely not padded correctly - Using slow page scan instead (this alarm is downscaled from now on, so far " << nErrors << " of " << mNTFs << " TFs affected)";
      }
    }

    int32_t totalCount = 0;
    for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
      for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
        tpcZSmeta.Pointers2[i][j] = tpcZSmeta.Pointers[i][j].data();
        tpcZSmeta.Sizes2[i][j] = tpcZSmeta.Sizes[i][j].data();
        tpcZS.sector[i].zsPtr[j] = tpcZSmeta.Pointers2[i][j];
        tpcZS.sector[i].nZSPtr[j] = tpcZSmeta.Sizes2[i][j];
        tpcZS.sector[i].count[j] = tpcZSmeta.Pointers[i][j].size();
        totalCount += tpcZSmeta.Pointers[i][j].size();
      }
    }
  } else if (mSpecConfig.decompressTPC) {
    if (mSpecConfig.decompressTPCFromROOT) {
      compClustersDummy = *pc.inputs().get<o2::tpc::CompressedClustersROOT*>("input");
      compClustersFlatDummy.setForward(&compClustersDummy);
      pCompClustersFlat = &compClustersFlatDummy;
    } else {
      pCompClustersFlat = pc.inputs().get<o2::tpc::CompressedClustersFlat*>("input").get();
    }
    if (pCompClustersFlat == nullptr) {
      tmpEmptyCompClusters.reset(new char[sizeof(o2::tpc::CompressedClustersFlat)]);
      memset(tmpEmptyCompClusters.get(), 0, sizeof(o2::tpc::CompressedClustersFlat));
      pCompClustersFlat = (o2::tpc::CompressedClustersFlat*)tmpEmptyCompClusters.get();
    }
  } else if (!mSpecConfig.zsOnTheFly) {
    if (mVerbosity) {
      LOGF(info, "running tracking for sector(s) 0x%09x", mTPCSectorMask);
    }
  }
}

int32_t GPURecoWorkflowSpec::runMain(o2::framework::ProcessingContext* pc, GPUTrackingInOutPointers* ptrs, GPUInterfaceOutputs* outputRegions, int32_t threadIndex, GPUInterfaceInputUpdate* inputUpdateCallback)
{
  int32_t retVal = 0;
  if (mConfParam->dump < 2) {
    retVal = mGPUReco->RunTracking(ptrs, outputRegions, threadIndex, inputUpdateCallback);

    if (retVal == 0 && mSpecConfig.runITSTracking) {
      retVal = runITSTracking(*pc);
    }
  }

  if (!mSpecConfig.enableDoublePipeline) { // TODO: Why is this needed for double-pipeline?
    mGPUReco->Clear(false, threadIndex);   // clean non-output memory used by GPU Reconstruction
  }
  return retVal;
}

void GPURecoWorkflowSpec::cleanOldCalibsTPCPtrs(calibObjectStruct& oldCalibObjects)
{
  if (mOldCalibObjects.size() > 0) {
    mOldCalibObjects.pop();
  }
  mOldCalibObjects.emplace(std::move(oldCalibObjects));
}
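// The pop/emplace above keeps exactly one previous generation of TPC calibration objects alive,
// presumably so that pointers still referenced by an in-flight time frame stay valid until the
// next calibration update replaces them.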

void GPURecoWorkflowSpec::run(ProcessingContext& pc)
{
  constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
  constexpr static size_t NEndpoints = o2::gpu::GPUTrackingInOutZS::NENDPOINTS;

  auto cput = mTimer->CpuTime();
  auto realt = mTimer->RealTime();
  mTimer->Start(false);
  mNTFs++;

  std::vector<gsl::span<const char>> inputs;
  GPUTrackingInOutPointers ptrs = {};

  const o2::tpc::CompressedClustersFlat* pCompClustersFlat = nullptr;
  size_t compClustersFlatDummyMemory[(sizeof(o2::tpc::CompressedClustersFlat) + sizeof(size_t) - 1) / sizeof(size_t)];
  o2::tpc::CompressedClustersFlat& compClustersFlatDummy = reinterpret_cast<o2::tpc::CompressedClustersFlat&>(compClustersFlatDummyMemory);
  o2::tpc::CompressedClusters compClustersDummy;
  GPUTrackingInOutZS tpcZS;
  std::array<uint32_t, NEndpoints * NSectors> tpcZSonTheFlySizes;
  gsl::span<const o2::tpc::ZeroSuppressedContainer8kb> inputZS;
  std::unique_ptr<char[]> tmpEmptyCompClusters;

  bool getWorkflowTPCInput_clusters = false, getWorkflowTPCInput_mc = false, getWorkflowTPCInput_digits = false;
  bool debugTFDump = false;

  if (mSpecConfig.processMC) {
    getWorkflowTPCInput_mc = true;
  }
  if (!mSpecConfig.decompressTPC && !mSpecConfig.caClusterer) {
    getWorkflowTPCInput_clusters = true;
  }
  if (!mSpecConfig.decompressTPC && mSpecConfig.caClusterer && ((!mSpecConfig.zsOnTheFly || mSpecConfig.processMC) && !mSpecConfig.zsDecoder)) {
    getWorkflowTPCInput_digits = true;
  }

  // ------------------------------ Handle inputs ------------------------------

  auto lockDecodeInput = std::make_unique<std::lock_guard<std::mutex>>(mPipeline->mutexDecodeInput);

  if (mSpecConfig.enableDoublePipeline != 2) {
    if (mSpecConfig.runITSTracking && pc.inputs().getPos("itsTGeo") >= 0) {
      pc.inputs().get<o2::its::GeometryTGeo*>("itsTGeo");
    }
    if (GRPGeomHelper::instance().getGRPECS()->isDetReadOut(o2::detectors::DetID::TPC) && mConfParam->tpcTriggeredMode ^ !GRPGeomHelper::instance().getGRPECS()->isDetContinuousReadOut(o2::detectors::DetID::TPC)) {
      LOG(fatal) << "configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
    }
  }

  processInputs(pc, tpcZSmeta, inputZS, tpcZS, tpcZSonTheFlySizes, debugTFDump, compClustersDummy, compClustersFlatDummy, pCompClustersFlat, tmpEmptyCompClusters);                    // Process non-digit / non-cluster inputs
  const auto& inputsClustersDigits = o2::tpc::getWorkflowTPCInput(pc, mVerbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, mTPCSectorMask, getWorkflowTPCInput_digits); // Process digit and cluster inputs

  const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
  mTFSettings->tfStartOrbit = tinfo.firstTForbit;
  mTFSettings->hasTfStartOrbit = 1;
  mTFSettings->hasNHBFPerTF = 1;
  mTFSettings->nHBFPerTF = mConfParam->overrideNHbfPerTF ? mConfParam->overrideNHbfPerTF : GRPGeomHelper::instance().getGRPECS()->getNHBFPerTF();
  mTFSettings->hasRunStartOrbit = 0;
  ptrs.settingsTF = mTFSettings.get();

  if (mSpecConfig.enableDoublePipeline != 2) {
    if (mVerbosity) {
      LOG(info) << "TF firstTForbit " << mTFSettings->tfStartOrbit << " nHBF " << mTFSettings->nHBFPerTF << " runStartOrbit " << mTFSettings->runStartOrbit << " simStartOrbit " << mTFSettings->simStartOrbit;
    }
    if (mConfParam->checkFirstTfOrbit) {
      static uint32_t lastFirstTFOrbit = -1;
      static uint32_t lastTFCounter = -1;
      if (lastFirstTFOrbit != -1 && lastTFCounter != -1) {
        int32_t diffOrbit = tinfo.firstTForbit - lastFirstTFOrbit;
        int32_t diffCounter = tinfo.tfCounter - lastTFCounter;
        if (diffOrbit != diffCounter * mTFSettings->nHBFPerTF) {
          LOG(error) << "Time frame has mismatching firstTfOrbit - Last orbit/counter: " << lastFirstTFOrbit << " " << lastTFCounter << " - Current: " << tinfo.firstTForbit << " " << tinfo.tfCounter;
        }
      }
      lastFirstTFOrbit = tinfo.firstTForbit;
      lastTFCounter = tinfo.tfCounter;
    }
  }

  o2::globaltracking::RecoContainer inputTracksTRD;
  decltype(o2::trd::getRecoInputContainer(pc, &ptrs, &inputTracksTRD)) trdInputContainer;
  if (mSpecConfig.readTRDtracklets) {
    o2::globaltracking::DataRequest dataRequestTRD;
    inputTracksTRD.collectData(pc, dataRequestTRD);
    trdInputContainer = std::move(o2::trd::getRecoInputContainer(pc, &ptrs, &inputTracksTRD));
  }

  void* ptrEp[NSectors * NEndpoints] = {};
  bool doInputDigits = false, doInputDigitsMC = false;
  if (mSpecConfig.decompressTPC) {
    ptrs.tpcCompressedClusters = pCompClustersFlat;
  } else if (mSpecConfig.zsOnTheFly) {
    const uint64_t* buffer = reinterpret_cast<const uint64_t*>(&inputZS[0]);
    o2::gpu::GPUReconstructionConvert::RunZSEncoderCreateMeta(buffer, tpcZSonTheFlySizes.data(), *&ptrEp, &tpcZS);
    ptrs.tpcZS = &tpcZS;
    doInputDigits = doInputDigitsMC = mSpecConfig.processMC;
  } else if (mSpecConfig.zsDecoder) {
    ptrs.tpcZS = &tpcZS;
    if (mSpecConfig.processMC) {
      throw std::runtime_error("Cannot process MC information, none available");
    }
  } else if (mSpecConfig.caClusterer) {
    doInputDigits = true;
    doInputDigitsMC = mSpecConfig.processMC;
  } else {
    ptrs.clustersNative = &inputsClustersDigits->clusterIndex;
  }

  if (mTPCSectorMask != 0xFFFFFFFFF) {
    // Clean out the unused sectors, such that if they were present by chance, they are not processed, and if the values are uninitialized, we should not crash
    for (uint32_t i = 0; i < NSectors; i++) {
      if (!(mTPCSectorMask & (1ul << i))) {
        if (ptrs.tpcZS) {
          for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
            tpcZS.sector[i].zsPtr[j] = nullptr;
            tpcZS.sector[i].nZSPtr[j] = nullptr;
            tpcZS.sector[i].count[j] = 0;
          }
        }
      }
    }
  }

  GPUTrackingInOutDigits tpcDigitsMap;
  GPUTPCDigitsMCInput tpcDigitsMapMC;
  if (doInputDigits) {
    ptrs.tpcPackedDigits = &tpcDigitsMap;
    if (doInputDigitsMC) {
      tpcDigitsMap.tpcDigitsMC = &tpcDigitsMapMC;
    }
    for (uint32_t i = 0; i < NSectors; i++) {
      tpcDigitsMap.tpcDigits[i] = inputsClustersDigits->inputDigits[i].data();
      tpcDigitsMap.nTPCDigits[i] = inputsClustersDigits->inputDigits[i].size();
      if (doInputDigitsMC) {
        tpcDigitsMapMC.v[i] = inputsClustersDigits->inputDigitsMCPtrs[i];
      }
    }
  }

  o2::tpc::TPCSectorHeader clusterOutputSectorHeader{0};
  if (mClusterOutputIds.size() > 0) {
    clusterOutputSectorHeader.sectorBits = mTPCSectorMask;
    // subspecs [0, NSectors - 1] are used to identify sector data, we use NSectors to indicate the full TPC
    clusterOutputSectorHeader.activeSectors = mTPCSectorMask;
  }

  // ------------------------------ Prepare stage for double-pipeline before normal output preparation ------------------------------

  std::unique_ptr<GPURecoWorkflow_QueueObject> pipelineContext;
  if (mSpecConfig.enableDoublePipeline) {
    if (handlePipeline(pc, ptrs, tpcZSmeta, tpcZS, pipelineContext)) {
      return;
    }
  }

  // ------------------------------ Prepare outputs ------------------------------

  GPUInterfaceOutputs outputRegions;
  using outputDataType = char;
  using outputBufferUninitializedVector = std::decay_t<decltype(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(Output{"", "", 0}))>;
  using outputBufferType = std::pair<std::optional<std::reference_wrapper<outputBufferUninitializedVector>>, outputDataType*>;
  std::vector<outputBufferType> outputBuffers(GPUInterfaceOutputs::count(), {std::nullopt, nullptr});
  std::unordered_set<std::string> outputsCreated;

  auto setOutputAllocator = [this, &outputBuffers, &outputRegions, &pc, &outputsCreated](const char* name, bool condition, GPUOutputControl& region, auto&& outputSpec, size_t offset = 0) {
    if (condition) {
      auto& buffer = outputBuffers[outputRegions.getIndex(region)];
      if (mConfParam->allocateOutputOnTheFly) {
        region.allocator = [this, name, &buffer, &pc, outputSpec = std::move(outputSpec), offset, &outputsCreated](size_t size) -> void* {
          size += offset;
          if (mVerbosity) {
            LOG(info) << "ALLOCATING " << size << " bytes for " << name << ": " << std::get<DataOrigin>(outputSpec).template as<std::string>() << "/" << std::get<DataDescription>(outputSpec).template as<std::string>() << "/" << std::get<2>(outputSpec);
          }
          std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
          if (mVerbosity) {
            start = std::chrono::high_resolution_clock::now();
          }
          buffer.first.emplace(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(std::make_from_tuple<Output>(outputSpec), size));
          outputsCreated.insert(name);
          if (mVerbosity) {
            end = std::chrono::high_resolution_clock::now();
            std::chrono::duration<double> elapsed_seconds = end - start;
            LOG(info) << "Allocation time for " << name << " (" << size << " bytes)"
                      << ": " << elapsed_seconds.count() << "s";
          }
          return (buffer.second = buffer.first->get().data()) + offset;
        };
      } else {
        buffer.first.emplace(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(std::make_from_tuple<Output>(outputSpec), mConfParam->outputBufferSize));
        region.ptrBase = (buffer.second = buffer.first->get().data()) + offset;
        region.size = buffer.first->get().size() - offset;
        outputsCreated.insert(name);
      }
    }
  };
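  // Two strategies are implemented above: with allocateOutputOnTheFly, the region is given an
  // allocator callback and the DPL output buffer is created lazily with exactly the size the GPU
  // code requests (plus the optional header offset); otherwise a buffer of outputBufferSize is
  // created up front and later shrunk to the actually used size with downSizeBuffer below.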

  auto downSizeBuffer = [](outputBufferType& buffer, size_t size) {
    if (!buffer.first) {
      return;
    }
    if (buffer.first->get().size() < size) {
      throw std::runtime_error("Invalid buffer size requested");
    }
    buffer.first->get().resize(size);
    if (size && buffer.first->get().data() != buffer.second) {
      throw std::runtime_error("Inconsistent buffer address after downsize");
    }
  };

  /*auto downSizeBufferByName = [&outputBuffers, &outputRegions, &downSizeBuffer](GPUOutputControl& region, size_t size) {
    auto& buffer = outputBuffers[outputRegions.getIndex(region)];
    downSizeBuffer(buffer, size);
  };*/

  auto downSizeBufferToSpan = [&outputBuffers, &outputRegions, &downSizeBuffer](GPUOutputControl& region, auto span) {
    auto& buffer = outputBuffers[outputRegions.getIndex(region)];
    if (!buffer.first) {
      return;
    }
    if (span.size() && buffer.second != (char*)span.data()) {
      throw std::runtime_error("Buffer does not match span");
    }
    downSizeBuffer(buffer, span.size() * sizeof(*span.data()));
  };

  setOutputAllocator("COMPCLUSTERSFLAT", mSpecConfig.outputCompClustersFlat, outputRegions.compressedClusters, std::make_tuple(gDataOriginTPC, (DataDescription) "COMPCLUSTERSFLAT", 0));
  setOutputAllocator("CLUSTERNATIVE", mClusterOutputIds.size() > 0, outputRegions.clustersNative, std::make_tuple(gDataOriginTPC, mSpecConfig.sendClustersPerSector ? (DataDescription) "CLUSTERNATIVETMP" : (mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE"), NSectors, clusterOutputSectorHeader), sizeof(o2::tpc::ClusterCountIndex));
  setOutputAllocator("CLSHAREDMAP", mSpecConfig.outputSharedClusterMap, outputRegions.sharedClusterMap, std::make_tuple(gDataOriginTPC, (DataDescription) "CLSHAREDMAP", 0));
  setOutputAllocator("TPCOCCUPANCYMAP", mSpecConfig.outputSharedClusterMap, outputRegions.tpcOccupancyMap, std::make_tuple(gDataOriginTPC, (DataDescription) "TPCOCCUPANCYMAP", 0));
  setOutputAllocator("TRACKS", mSpecConfig.outputTracks, outputRegions.tpcTracksO2, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSF" : (DataDescription) "TRACKS", 0));
  setOutputAllocator("CLUSREFS", mSpecConfig.outputTracks, outputRegions.tpcTracksO2ClusRefs, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSREFSF" : (DataDescription) "CLUSREFS", 0));
  setOutputAllocator("TRACKSMCLBL", mSpecConfig.outputTracks && mSpecConfig.processMC, outputRegions.tpcTracksO2Labels, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSMCLBLF" : (DataDescription) "TRACKSMCLBL", 0));
  setOutputAllocator("TRIGGERWORDS", mSpecConfig.caClusterer && mConfig->configProcessing.param.tpcTriggerHandling, outputRegions.tpcTriggerWords, std::make_tuple(gDataOriginTPC, (DataDescription) "TRIGGERWORDS", 0));
  std::pair<ConstMCLabelContainer, ConstMCLabelContainerView> clustersMCBuffer;
  if (mSpecConfig.processMC && (mSpecConfig.caClusterer || mSpecConfig.useFilteredOutputSpecs)) {
    outputRegions.clusterLabels.allocator = [&clustersMCBuffer](size_t size) -> void* { return &clustersMCBuffer; };
  }

  // ------------------------------ Actual processing ------------------------------

  if ((int32_t)(ptrs.tpcZS != nullptr) + (int32_t)(ptrs.tpcPackedDigits != nullptr && (ptrs.tpcZS == nullptr || ptrs.tpcPackedDigits->tpcDigitsMC == nullptr)) + (int32_t)(ptrs.clustersNative != nullptr) + (int32_t)(ptrs.tpcCompressedClusters != nullptr) != 1) {
    throw std::runtime_error("Invalid input for gpu tracking");
  }

  const auto& holdData = o2::tpc::TPCTrackingDigitsPreCheck::runPrecheck(&ptrs, mConfig.get());

  calibObjectStruct oldCalibObjects;
  doCalibUpdates(pc, oldCalibObjects);

  lockDecodeInput.reset();

  uint32_t threadIndex;
  if (mConfParam->dump) {
    if (mSpecConfig.enableDoublePipeline && pipelineContext->jobSubmitted) {
      while (pipelineContext->jobThreadIndex == -1) {
      }
      threadIndex = pipelineContext->jobThreadIndex;
    } else {
      threadIndex = 0; // TODO: Not sure if this is safe, but it is not yet known which threadIndex will pick up the enqueued job
    }

    std::string dir = "";
    if (mConfParam->dumpFolder != "") {
      dir = std::regex_replace(mConfParam->dumpFolder, std::regex("\\[P\\]"), std::to_string(getpid()));
      if (mNTFs == 1) {
        mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
      }
      dir += "/";
    }
    if (mNTFs == 1) { // Must dump with the first TF, since this enforces enqueued calib updates
      mGPUReco->DumpSettings(threadIndex, dir.c_str());
    }
    if (tinfo.tfCounter >= mConfParam->dumpFirst && (mConfParam->dumpLast == -1 || tinfo.tfCounter <= mConfParam->dumpLast)) {
      mGPUReco->DumpEvent(mNTFDumps, &ptrs, threadIndex, dir.c_str());
      mNTFDumps++;
    }
  }
  std::unique_ptr<GPUTrackingInOutPointers> ptrsDump;
  if (mConfParam->dumpBadTFMode == 2) {
    ptrsDump.reset(new GPUTrackingInOutPointers);
    memcpy((void*)ptrsDump.get(), (const void*)&ptrs, sizeof(ptrs));
  }

  int32_t retVal = 0;
  if (mSpecConfig.enableDoublePipeline) {
    if (!pipelineContext->jobSubmitted) {
      enqueuePipelinedJob(&ptrs, &outputRegions, pipelineContext.get(), true);
    } else {
      finalizeInputPipelinedJob(&ptrs, &outputRegions, pipelineContext.get());
    }
    std::unique_lock lk(pipelineContext->jobFinishedMutex);
    pipelineContext->jobFinishedNotify.wait(lk, [context = pipelineContext.get()]() { return context->jobFinished; });
    retVal = pipelineContext->jobReturnValue;
    threadIndex = pipelineContext->jobThreadIndex;
  } else {
    // uint32_t threadIndex = pc.services().get<ThreadPool>().threadIndex;
    threadIndex = mNextThreadIndex;
    if (mConfig->configProcessing.doublePipeline) {
      mNextThreadIndex = (mNextThreadIndex + 1) % 2;
    }

    retVal = runMain(&pc, &ptrs, &outputRegions, threadIndex);
  }
  if (retVal != 0) {
    debugTFDump = true;
  }
  cleanOldCalibsTPCPtrs(oldCalibObjects);

  o2::utils::DebugStreamer::instance()->flush(); // flushing debug output to file

  if (debugTFDump && mNDebugDumps < mConfParam->dumpBadTFs) {
    mNDebugDumps++;
    if (mConfParam->dumpBadTFMode <= 1) {
      std::string filename = std::string("tpc_dump_") + std::to_string(pc.services().get<const o2::framework::DeviceSpec>().inputTimesliceId) + "_" + std::to_string(mNDebugDumps) + ".dump";
      FILE* fp = fopen(filename.c_str(), "w+b");
      std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
      for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
        auto data = pc.inputs().get<gsl::span<char>>(ref);
        if (mConfParam->dumpBadTFMode == 1) {
          uint64_t size = data.size();
          fwrite(&size, 1, sizeof(size), fp);
        }
        fwrite(data.data(), 1, data.size(), fp);
      }
      fclose(fp);
    } else if (mConfParam->dumpBadTFMode == 2) {
      mGPUReco->DumpEvent(mNDebugDumps - 1, ptrsDump.get(), threadIndex);
    }
  }

  if (mConfParam->dump == 2) {
    return;
  }

  // ------------------------------ Various postprocessing steps ------------------------------

  if (mConfig->configProcessing.tpcWriteClustersAfterRejection) {
  }
  bool createEmptyOutput = false;
  if (retVal != 0) {
    if (retVal == 3 && mConfig->configProcessing.ignoreNonFatalGPUErrors) {
      if (mConfig->configProcessing.throttleAlarms) {
        LOG(warning) << "GPU Reconstruction aborted with non-fatal error code, ignoring";
      } else {
        LOG(alarm) << "GPU Reconstruction aborted with non-fatal error code, ignoring";
      }
      createEmptyOutput = !mConfParam->partialOutputForNonFatalErrors;
    } else {
      LOG(fatal) << "GPU Reconstruction aborted with error code " << retVal << " - errors are not ignored - terminating";
    }
  }

  std::unique_ptr<o2::tpc::ClusterNativeAccess> tmpEmptyClNative;
  if (createEmptyOutput) {
    memset(&ptrs, 0, sizeof(ptrs));
    for (uint32_t i = 0; i < outputRegions.count(); i++) {
      if (outputBuffers[i].first) {
        size_t toSize = 0;
        if (i == outputRegions.getIndex(outputRegions.compressedClusters)) {
          toSize = sizeof(*ptrs.tpcCompressedClusters);
        } else if (i == outputRegions.getIndex(outputRegions.clustersNative)) {
          toSize = sizeof(o2::tpc::ClusterCountIndex);
        }
        outputBuffers[i].first->get().resize(toSize);
        outputBuffers[i].second = outputBuffers[i].first->get().data();
        if (toSize) {
          memset(outputBuffers[i].second, 0, toSize);
        }
      }
    }
    tmpEmptyClNative = std::make_unique<o2::tpc::ClusterNativeAccess>();
    memset(tmpEmptyClNative.get(), 0, sizeof(*tmpEmptyClNative));
    ptrs.clustersNative = tmpEmptyClNative.get();
    if (mSpecConfig.processMC) {
      MCLabelContainer cont;
      cont.flatten_to(clustersMCBuffer.first);
      clustersMCBuffer.second = clustersMCBuffer.first;
      tmpEmptyClNative->clustersMCTruth = &clustersMCBuffer.second;
    }
  } else {
    gsl::span<const o2::tpc::TrackTPC> spanOutputTracks = {ptrs.outputTracksTPCO2, ptrs.nOutputTracksTPCO2};
    gsl::span<const uint32_t> spanOutputClusRefs = {ptrs.outputClusRefsTPCO2, ptrs.nOutputClusRefsTPCO2};
    gsl::span<const o2::MCCompLabel> spanOutputTracksMCTruth = {ptrs.outputTracksTPCO2MC, ptrs.outputTracksTPCO2MC ? ptrs.nOutputTracksTPCO2 : 0};
    if (!mConfParam->allocateOutputOnTheFly) {
      for (uint32_t i = 0; i < outputRegions.count(); i++) {
        if (outputRegions.asArray()[i].ptrBase) {
          if (outputRegions.asArray()[i].size == 1) {
            throw std::runtime_error("Preallocated buffer size exceeded");
          }
          outputRegions.asArray()[i].checkCurrent();
          downSizeBuffer(outputBuffers[i], (char*)outputRegions.asArray()[i].ptrCurrent - (char*)outputBuffers[i].second);
        }
      }
    }
    downSizeBufferToSpan(outputRegions.tpcTracksO2, spanOutputTracks);
    downSizeBufferToSpan(outputRegions.tpcTracksO2ClusRefs, spanOutputClusRefs);
    downSizeBufferToSpan(outputRegions.tpcTracksO2Labels, spanOutputTracksMCTruth);

    // if requested, tune TPC tracks
    if (ptrs.nOutputTracksTPCO2) {
      doTrackTuneTPC(ptrs, outputBuffers[outputRegions.getIndex(outputRegions.tpcTracksO2)].first->get().data());
    }

    if (mClusterOutputIds.size() > 0 && (void*)ptrs.clustersNative->clustersLinear != (void*)(outputBuffers[outputRegions.getIndex(outputRegions.clustersNative)].second + sizeof(o2::tpc::ClusterCountIndex))) {
      throw std::runtime_error("cluster native output ptrs out of sync"); // sanity check
    }
  }

  if (mConfig->configWorkflow.outputs.isSet(gpudatatypes::InOutType::TPCMergedTracks)) {
    LOG(info) << "found " << ptrs.nOutputTracksTPCO2 << " track(s)";
  }

  if (mSpecConfig.outputCompClustersRoot) {
  }

  if (mClusterOutputIds.size() > 0) {
    o2::tpc::ClusterNativeAccess const& accessIndex = *ptrs.clustersNative;
    if (mSpecConfig.sendClustersPerSector) {
      // Clusters are shipped by sector, we are copying into per-sector buffers (anyway only for ROOT output)
      for (uint32_t i = 0; i < NSectors; i++) {
        if (mTPCSectorMask & (1ul << i)) {
          DataHeader::SubSpecificationType subspec = i;
          clusterOutputSectorHeader.sectorBits = (1ul << i);
          char* buffer = pc.outputs().make<char>({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", subspec, {clusterOutputSectorHeader}}, accessIndex.nClustersSector[i] * sizeof(*accessIndex.clustersLinear) + sizeof(o2::tpc::ClusterCountIndex)).data();
          o2::tpc::ClusterCountIndex* outIndex = reinterpret_cast<o2::tpc::ClusterCountIndex*>(buffer);
          memset(outIndex, 0, sizeof(*outIndex));
          for (int32_t j = 0; j < o2::tpc::constants::MAXGLOBALPADROW; j++) {
            outIndex->nClusters[i][j] = accessIndex.nClusters[i][j];
          }
          memcpy(buffer + sizeof(*outIndex), accessIndex.clusters[i][0], accessIndex.nClustersSector[i] * sizeof(*accessIndex.clustersLinear));
          if (mSpecConfig.processMC && accessIndex.clustersMCTruth) {
            MCLabelContainer cont;
            for (uint32_t j = 0; j < accessIndex.nClustersSector[i]; j++) {
              const auto& labels = accessIndex.clustersMCTruth->getLabels(accessIndex.clusterOffset[i][0] + j);
              for (const auto& label : labels) {
                cont.addElement(j, label);
              }
            }
            ConstMCLabelContainer contflat;
            cont.flatten_to(contflat);
            pc.outputs().snapshot({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), subspec, {clusterOutputSectorHeader}}, contflat);
          }
        }
      }
    } else {
      // Clusters are shipped as a single message, fill ClusterCountIndex
      DataHeader::SubSpecificationType subspec = NSectors;
      o2::tpc::ClusterCountIndex* outIndex = reinterpret_cast<o2::tpc::ClusterCountIndex*>(outputBuffers[outputRegions.getIndex(outputRegions.clustersNative)].second);
      static_assert(sizeof(o2::tpc::ClusterCountIndex) == sizeof(accessIndex.nClusters));
      memcpy(outIndex, &accessIndex.nClusters[0][0], sizeof(o2::tpc::ClusterCountIndex));
      if (mSpecConfig.processMC && (mSpecConfig.caClusterer || mSpecConfig.useFilteredOutputSpecs) && accessIndex.clustersMCTruth) {
        pc.outputs().snapshot({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), subspec, {clusterOutputSectorHeader}}, clustersMCBuffer.first);
      }
    }
  }
  if (mSpecConfig.outputQA) {
    TObjArray out;
    bool sendQAOutput = !createEmptyOutput && outputRegions.qa.newQAHistsCreated;
    auto getoutput = [sendQAOutput](auto ptr) { return sendQAOutput && ptr ? *ptr : std::decay_t<decltype(*ptr)>(); };
    std::vector<TH1F> copy1 = getoutput(outputRegions.qa.hist1); // Internally, this will also be used as output, so we need a non-const copy
    std::vector<TH2F> copy2 = getoutput(outputRegions.qa.hist2);
    std::vector<TH1D> copy3 = getoutput(outputRegions.qa.hist3);
    std::vector<TGraphAsymmErrors> copy4 = getoutput(outputRegions.qa.hist4);
    if (sendQAOutput) {
      mQA->postprocessExternal(copy1, copy2, copy3, copy4, out, mQATaskMask ? mQATaskMask : -1);
    }
    pc.outputs().snapshot({gDataOriginTPC, "TRACKINGQA", 0}, out);
    if (sendQAOutput) {
      mQA->cleanup();
    }
  }
  if (mSpecConfig.outputErrorQA) {
    pc.outputs().snapshot({gDataOriginGPU, "ERRORQA", 0}, mErrorQA);
    mErrorQA.clear(); // FIXME: This is a race condition once we run multi-threaded!
  }
  if (mSpecConfig.outputSharedClusterMap && !outputsCreated.contains("TPCOCCUPANCYMAP")) {
  }
  if (mSpecConfig.tpcTriggerHandling && !outputsCreated.contains("TRIGGERWORDS")) {
  }
  mTimer->Stop();
  LOG(info) << "GPU Reconstruction time for this TF " << mTimer->CpuTime() - cput << " s (cpu), " << mTimer->RealTime() - realt << " s (wall)";
}

void GPURecoWorkflowSpec::doCalibUpdates(o2::framework::ProcessingContext& pc, calibObjectStruct& oldCalibObjects)
{
  GPUCalibObjectsConst newCalibObjects;
  GPUNewCalibValues newCalibValues;
  // check for updates of TPC calibration objects
  bool needCalibUpdate = false;
  if (mGRPGeomUpdated) {
    mGRPGeomUpdated = false;
    needCalibUpdate = true;

    if (mSpecConfig.runITSTracking && !mITSGeometryCreated) {
      mITSGeometryCreated = true;
    }

    if (mAutoSolenoidBz) {
      newCalibValues.newSolenoidField = true;
      newCalibValues.solenoidField = mConfig->configGRP.solenoidBzNominalGPU = GPUO2InterfaceUtils::getNominalGPUBz(*GRPGeomHelper::instance().getGRPMagField());
      // Propagator::Instance()->setBz(newCalibValues.solenoidField); // Take value from o2::Propagator::UpdateField from GRPGeomHelper
      LOG(info) << "Updating solenoid field " << newCalibValues.solenoidField;
    }
    if (mAutoContinuousMaxTimeBin) {
      newCalibValues.newContinuousMaxTimeBin = true;
      newCalibValues.continuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin = GPUO2InterfaceUtils::getTpcMaxTimeBinFromNHbf(mTFSettings->nHBFPerTF);
      LOG(info) << "Updating max time bin " << newCalibValues.continuousMaxTimeBin << " (" << mTFSettings->nHBFPerTF << " orbits)";
    }

    if (!mPropagatorInstanceCreated) {
      newCalibObjects.o2Propagator = mConfig->configCalib.o2Propagator = Propagator::Instance();
      if (mConfig->configProcessing.o2PropagatorUseGPUField) {
        mGPUReco->UseGPUPolynomialFieldInPropagator(Propagator::Instance());
      }
      mPropagatorInstanceCreated = true;
    }

    if (!mMatLUTCreated) {
      if (mConfParam->matLUTFile.size() == 0) {
        newCalibObjects.matLUT = GRPGeomHelper::instance().getMatLUT();
        LOG(info) << "Loaded material budget lookup table";
      }
      mMatLUTCreated = true;
    }
    if (mSpecConfig.readTRDtracklets && !mTRDGeometryCreated) {
      auto gm = o2::trd::Geometry::instance();
      gm->createPadPlaneArray();
      gm->createClusterMatrixArray();
      mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>(*gm);
      newCalibObjects.trdGeometry = mConfig->configCalib.trdGeometry = mTRDGeometry.get();
      LOG(info) << "Loaded TRD geometry";
      mTRDGeometryCreated = true;
    }
  }
  needCalibUpdate = fetchCalibsCCDBTPC(pc, newCalibObjects, oldCalibObjects) || needCalibUpdate;
  if (mSpecConfig.runITSTracking) {
    needCalibUpdate = fetchCalibsCCDBITS(pc) || needCalibUpdate;
  }
  if (mTPCCutAtTimeBin != mConfig->configGRP.tpcCutTimeBin) {
    newCalibValues.newTPCTimeBinCut = true;
    newCalibValues.tpcTimeBinCut = mConfig->configGRP.tpcCutTimeBin = mTPCCutAtTimeBin;
    needCalibUpdate = true;
  }
  if (mSpecConfig.nnLoadFromCCDB) {
    auto dumpToFile = [](const char* buffer, std::size_t validSize, const std::string& path) {
      std::ofstream out(path, std::ios::binary | std::ios::trunc);
      if (!out.is_open()) {
        throw std::runtime_error("Failed to open output file: " + path);
      }

      out.write(buffer, static_cast<std::streamsize>(validSize));
      if (!out) {
        throw std::runtime_error("Failed while writing data to: " + path);
      }
    };
    for (int i = 0; i < 3; i++) {
      newCalibObjects.nnClusterizerNetworks[i] = mConfig->configCalib.nnClusterizerNetworks[i];
      if (mSpecConfig.nnDumpToFile && newCalibObjects.nnClusterizerNetworks[i]) {
        std::string path = "tpc_nn_clusterizer_" + std::to_string(i) + ".onnx";
        dumpToFile(newCalibObjects.nnClusterizerNetworks[i]->getONNXModel(), newCalibObjects.nnClusterizerNetworks[i]->getONNXModelSize(), path);
        LOG(info) << "Dumped TPC clusterizer NN " << i << " to file " << path;
      }
    }
  }
  if (needCalibUpdate) {
    LOG(info) << "Updating GPUReconstruction calibration objects";
    mGPUReco->UpdateCalibration(newCalibObjects, newCalibValues);
  }
}

Options GPURecoWorkflowSpec::options()
{
  Options opts;
  if (mSpecConfig.enableDoublePipeline) {
    bool send = mSpecConfig.enableDoublePipeline == 2;
    char* o2jobid = getenv("O2JOBID");
    char* numaid = getenv("NUMAID");
    int32_t chanid = o2jobid ? atoi(o2jobid) : (numaid ? atoi(numaid) : 0);
    std::string chan = std::string("name=gpu-prepare-channel,type=") + (send ? "push" : "pull") + ",method=" + (send ? "connect" : "bind") + ",address=ipc://@gpu-prepare-channel-" + std::to_string(chanid) + "-{timeslice0},transport=shmem,rateLogging=0";
    opts.emplace_back(o2::framework::ConfigParamSpec{"channel-config", o2::framework::VariantType::String, chan, {"Out-of-band channel config"}});
  }
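  // For example, with O2JOBID/NUMAID unset and enableDoublePipeline == 2 the string built above is:
  //   name=gpu-prepare-channel,type=push,method=connect,address=ipc://@gpu-prepare-channel-0-{timeslice0},transport=shmem,rateLogging=0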
  if (mSpecConfig.enableDoublePipeline == 2) {
    return opts;
  }
  if (mSpecConfig.outputTracks) {
  }
  return opts;
}

Inputs GPURecoWorkflowSpec::inputs()
{
  Inputs inputs;
  if (mSpecConfig.zsDecoder) {
    // All ZS raw data is published with subspec 0 by the o2-raw-file-reader-workflow and DataDistribution
    // creates subspec from CRU and endpoint id, we create one single input route subscribing to all TPC/RAWDATA
    inputs.emplace_back(InputSpec{"zsraw", ConcreteDataTypeMatcher{"TPC", "RAWDATA"}, Lifetime::Timeframe});
    if (mSpecConfig.askDISTSTF) {
      inputs.emplace_back("stdDist", "FLP", "DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
    }
  }
  if (mSpecConfig.enableDoublePipeline == 2) {
    if (!mSpecConfig.zsDecoder) {
      LOG(fatal) << "Double pipeline mode can only work with zsraw input";
    }
    return inputs;
  } else if (mSpecConfig.enableDoublePipeline == 1) {
    inputs.emplace_back("pipelineprepare", gDataOriginGPU, "PIPELINEPREPARE", 0, Lifetime::Timeframe);
  }
  if (mSpecConfig.enableDoublePipeline != 2 && (mSpecConfig.outputTracks || mSpecConfig.caClusterer)) {
    // calibration objects for TPC clusterization
    inputs.emplace_back("tpcgain", gDataOriginTPC, "PADGAINFULL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalPadGainFull)));
    inputs.emplace_back("tpcaltrosync", gDataOriginTPC, "ALTROSYNCSIGNAL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::AltroSyncSignal)));
  }
  if (mSpecConfig.enableDoublePipeline != 2 && mSpecConfig.outputTracks) {
    // calibration objects for TPC tracking
    const auto mapSources = mSpecConfig.tpcDeadMapSources;
    if (mapSources != 0) {
      tpc::SourcesDeadMap sources((mapSources > -1) ? static_cast<tpc::SourcesDeadMap>(mapSources) : tpc::SourcesDeadMap::All);
      inputs.emplace_back("tpcidcpadflags", gDataOriginTPC, "IDCPADFLAGS", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalIDCPadStatusMapA), {}, 1)); // time-dependent
    }
    inputs.emplace_back("tpcruninfo", gDataOriginTPC, "TPCRUNINFO", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::ConfigRunInfo)));
  }

  inputs.emplace_back("tpcgainresidual", gDataOriginTPC, "PADGAINRESIDUAL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalPadGainResidual), {}, 1)); // time-dependent
  if (mSpecConfig.tpcUseMCTimeGain) {
    inputs.emplace_back("tpctimegain", gDataOriginTPC, "TIMEGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTimeGainMC), {}, 1)); // time-dependent
  } else {
    inputs.emplace_back("tpctimegain", gDataOriginTPC, "TIMEGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTimeGain), {}, 1)); // time-dependent
  }
  inputs.emplace_back("tpctopologygain", gDataOriginTPC, "TOPOLOGYGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTopologyGain)));
  inputs.emplace_back("tpcthreshold", gDataOriginTPC, "PADTHRESHOLD", 0, Lifetime::Condition, ccdbParamSpec("TPC/Config/FEEPad"));
  {
    Options optsDummy;
    o2::tpc::CorrectionMapsLoaderGloOpts gloOpts{mSpecConfig.lumiScaleType, mSpecConfig.lumiScaleMode, mSpecConfig.enableMShape, mSpecConfig.enableCTPLumi};
    mCalibObjects.mFastTransformHelper->requestCCDBInputs(inputs, optsDummy, gloOpts); // option filled here is lost
  }
  if (mSpecConfig.decompressTPC) {
    inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, mSpecConfig.decompressTPCFromROOT ? o2::header::DataDescription("COMPCLUSTERS") : o2::header::DataDescription("COMPCLUSTERSFLAT")}, Lifetime::Timeframe});
  } else if (mSpecConfig.caClusterer) {
    // We accept digits and MC labels also if we run on ZS Raw data, since they are needed for MC label propagation
    if ((!mSpecConfig.zsOnTheFly || mSpecConfig.processMC) && !mSpecConfig.zsDecoder) {
      inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITS"}, Lifetime::Timeframe});
      mPolicyData->emplace_back(o2::framework::InputSpec{"digits", o2::framework::ConcreteDataTypeMatcher{"TPC", "DIGITS"}});
    }
  } else if (mSpecConfig.runTPCTracking) {
    inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERNATIVE"}, Lifetime::Timeframe});
    mPolicyData->emplace_back(o2::framework::InputSpec{"clusters", o2::framework::ConcreteDataTypeMatcher{"TPC", "CLUSTERNATIVE"}});
  }
  if (mSpecConfig.processMC) {
    if (mSpecConfig.caClusterer) {
      if (!mSpecConfig.zsDecoder) {
        inputs.emplace_back(InputSpec{"mclblin", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITSMCTR"}, Lifetime::Timeframe});
        mPolicyData->emplace_back(o2::framework::InputSpec{"digitsmc", o2::framework::ConcreteDataTypeMatcher{"TPC", "DIGITSMCTR"}});
      }
    } else if (mSpecConfig.runTPCTracking) {
      inputs.emplace_back(InputSpec{"mclblin", ConcreteDataTypeMatcher{gDataOriginTPC, "CLNATIVEMCLBL"}, Lifetime::Timeframe});
      mPolicyData->emplace_back(o2::framework::InputSpec{"clustersmc", o2::framework::ConcreteDataTypeMatcher{"TPC", "CLNATIVEMCLBL"}});
    }
  }

  if (mSpecConfig.zsOnTheFly) {
    inputs.emplace_back(InputSpec{"zsinput", ConcreteDataTypeMatcher{"TPC", "TPCZS"}, Lifetime::Timeframe});
    inputs.emplace_back(InputSpec{"zsinputsizes", ConcreteDataTypeMatcher{"TPC", "ZSSIZES"}, Lifetime::Timeframe});
  }
  if (mSpecConfig.readTRDtracklets) {
    inputs.emplace_back("trdctracklets", o2::header::gDataOriginTRD, "CTRACKLETS", 0, Lifetime::Timeframe);
    inputs.emplace_back("trdtracklets", o2::header::gDataOriginTRD, "TRACKLETS", 0, Lifetime::Timeframe);
    inputs.emplace_back("trdtriggerrec", o2::header::gDataOriginTRD, "TRKTRGRD", 0, Lifetime::Timeframe);
    inputs.emplace_back("trdtrigrecmask", o2::header::gDataOriginTRD, "TRIGRECMASK", 0, Lifetime::Timeframe);
  }

  if (mSpecConfig.runITSTracking) {
    inputs.emplace_back("compClusters", "ITS", "COMPCLUSTERS", 0, Lifetime::Timeframe);
    inputs.emplace_back("patterns", "ITS", "PATTERNS", 0, Lifetime::Timeframe);
    inputs.emplace_back("ROframes", "ITS", "CLUSTERSROF", 0, Lifetime::Timeframe);
    if (mSpecConfig.itsTriggerType == 1) {
      inputs.emplace_back("phystrig", "ITS", "PHYSTRIG", 0, Lifetime::Timeframe);
    } else if (mSpecConfig.itsTriggerType == 2) {
      inputs.emplace_back("phystrig", "TRD", "TRKTRGRD", 0, Lifetime::Timeframe);
    }
    if (mSpecConfig.enableDoublePipeline != 2) {
      if (mSpecConfig.isITS3) {
        inputs.emplace_back("cldict", "IT3", "CLUSDICT", 0, Lifetime::Condition, ccdbParamSpec("IT3/Calib/ClusterDictionary"));
        inputs.emplace_back("alppar", "ITS", "ALPIDEPARAM", 0, Lifetime::Condition, ccdbParamSpec("ITS/Config/AlpideParam"));
      } else {
        inputs.emplace_back("itscldict", "ITS", "CLUSDICT", 0, Lifetime::Condition, ccdbParamSpec("ITS/Calib/ClusterDictionary"));
        inputs.emplace_back("itsalppar", "ITS", "ALPIDEPARAM", 0, Lifetime::Condition, ccdbParamSpec("ITS/Config/AlpideParam"));
      }
      if (mSpecConfig.itsOverrBeamEst) {
        inputs.emplace_back("meanvtx", "GLO", "MEANVERTEX", 0, Lifetime::Condition, ccdbParamSpec("GLO/Calib/MeanVertex", {}, 1));
      }
    }
    if (mSpecConfig.processMC) {
      inputs.emplace_back("itsmclabels", "ITS", "CLUSTERSMCTR", 0, Lifetime::Timeframe);
      inputs.emplace_back("ITSMC2ROframes", "ITS", "CLUSTERSMC2ROF", 0, Lifetime::Timeframe);
    }
  }

  // NN clusterizer: optionally fetch the ONNX networks from CCDB
  *mConfParam = mConfig->ReadConfigurableParam();
  if (mConfig->configProcessing.nn.nnLoadFromCCDB) {
    LOG(info) << "(NN CLUS) Enabling fetching of TPC NN clusterizer from CCDB";
    mSpecConfig.nnLoadFromCCDB = true;
    mSpecConfig.nnDumpToFile = mConfig->configProcessing.nn.nnCCDBDumpToFile;
    GPUSettingsProcessingNNclusterizer& nnClusterizerSettings = mConfig->configProcessing.nn;

    std::map<std::string, std::string> metadata;
    metadata["inputDType"] = nnClusterizerSettings.nnInferenceInputDType;              // FP16 or FP32
    metadata["outputDType"] = nnClusterizerSettings.nnInferenceOutputDType;            // FP16 or FP32
    metadata["nnCCDBWithMomentum"] = nnClusterizerSettings.nnCCDBWithMomentum;         // 0, 1 -> Only for regression model
    metadata["nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBClassificationLayerType; // FC, CNN
    metadata["nnCCDBInteractionRate"] = nnClusterizerSettings.nnCCDBInteractionRate;   // in kHz
    metadata["nnCCDBBeamType"] = nnClusterizerSettings.nnCCDBBeamType;                 // pp, pPb, PbPb

    auto convert_map_to_metadata = [](const std::map<std::string, std::string>& inputMap, std::vector<o2::framework::CCDBMetadata>& outputMetadata) {
      for (const auto& [key, value] : inputMap) {
        if (value != "") {
          outputMetadata.push_back({key, value});
        }
      }
    };
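    // Illustrative only (hypothetical values): with nnInferenceInputDType = "FP32"
    // and nnCCDBBeamType = "PbPb", the lambda above would fill
    //   outputMetadata = {{"inputDType", "FP32"}, {"nnCCDBBeamType", "PbPb"}, ...},
    // skipping any key whose value is an empty string.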

    mSpecConfig.nnEvalMode = o2::utils::Str::tokenize(nnClusterizerSettings.nnEvalMode, ':');
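    // nnEvalMode is a ':'-separated specifier: the first token selects the
    // classification network ("c1" or "c2"), and an "r2" second token requests
    // the second regression network in addition. Example (hypothetical): "c1:r2".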
    std::vector<o2::framework::CCDBMetadata> ccdb_metadata;

    if (mConfParam->printSettings) {
      auto printSettings = [](const std::map<std::string, std::string>& settings) {
        LOG(info) << "(NN CLUS) NN Clusterizer CCDB settings:";
        for (const auto& [key, value] : settings) {
          LOG(info) << " " << key << " : " << value;
        }
      };
      printSettings(metadata);
    }

    if (mSpecConfig.nnEvalMode[0] == "c1") {
      metadata["nnCCDBEvalType"] = "classification_c1";
      convert_map_to_metadata(metadata, ccdb_metadata);
      inputs.emplace_back("nn_classification_c1", gDataOriginTPC, "NNCLUSTERIZER_C1", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
    } else if (mSpecConfig.nnEvalMode[0] == "c2") {
      metadata["nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
      metadata["nnCCDBEvalType"] = "classification_c2";
      convert_map_to_metadata(metadata, ccdb_metadata);
      inputs.emplace_back("nn_classification_c2", gDataOriginTPC, "NNCLUSTERIZER_C2", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
    }

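    // The first-stage regression network is requested unconditionally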
    metadata["nnCCDBEvalType"] = "regression_c1";
    metadata["nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
    convert_map_to_metadata(metadata, ccdb_metadata);
    inputs.emplace_back("nn_regression_c1", gDataOriginTPC, "NNCLUSTERIZER_R1", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));

    if (mSpecConfig.nnEvalMode[1] == "r2") {
      metadata["nnCCDBEvalType"] = "regression_c2";
      convert_map_to_metadata(metadata, ccdb_metadata);
      inputs.emplace_back("nn_regression_c2", gDataOriginTPC, "NNCLUSTERIZER_R2", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
    }
  }

  return inputs;
};
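
// Illustrative usage sketch (not part of this file; names are hypothetical):
// a workflow builder typically instantiates the spec once and reuses it for
// the input/output declarations and the processing callback, along the lines of
//
//   auto task = std::make_shared<GPURecoWorkflowSpec>(policyData, cfg, tpcSectors, tpcSectorMask, ggr);
//   DataProcessorSpec spec{"gpu-reconstruction",
//                          task->inputs(), task->outputs(),
//                          AlgorithmSpec{adaptFromTask<GPURecoWorkflowSpec>(/* ctor arguments */)},
//                          task->options()};
//
// where adaptFromTask is the standard DPL helper for Task-derived processors.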

o2::framework::Outputs GPURecoWorkflowSpec::outputs()
  constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
  std::vector<OutputSpec> outputSpecs;
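  // In the prepare stage of the double pipeline the device only ships the
  // prepared data to its partner and produces no reconstruction output.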
  if (mSpecConfig.enableDoublePipeline == 2) {
    outputSpecs.emplace_back(gDataOriginGPU, "PIPELINEPREPARE", 0, Lifetime::Timeframe);
    return outputSpecs;
  }
  if (mSpecConfig.outputTracks) {
    outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSF" : (DataDescription) "TRACKS", 0, Lifetime::Timeframe);
    outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSREFSF" : (DataDescription) "CLUSREFS", 0, Lifetime::Timeframe);
  }
  if (mSpecConfig.processMC && mSpecConfig.outputTracks) {
    outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSMCLBLF" : (DataDescription) "TRACKSMCLBL", 0, Lifetime::Timeframe);
  }
  if (mSpecConfig.outputCompClustersRoot) {
    outputSpecs.emplace_back(gDataOriginTPC, "COMPCLUSTERS", 0, Lifetime::Timeframe);
  }
  if (mSpecConfig.outputCompClustersFlat) {
    outputSpecs.emplace_back(gDataOriginTPC, "COMPCLUSTERSFLAT", 0, Lifetime::Timeframe);
  }
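  // Native cluster output is shipped either as one message per sector or as a
  // single message with subspec NSectors covering all sectors at once.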
  if (mSpecConfig.outputCAClusters) {
    for (auto const& sector : mTPCSectors) {
      mClusterOutputIds.emplace_back(sector);
    }
    if (mSpecConfig.sendClustersPerSector) {
      outputSpecs.emplace_back(gDataOriginTPC, "CLUSTERNATIVETMP", NSectors, Lifetime::Timeframe); // Dummy buffer the TPC tracker writes the initial linear clusters to
      for (const auto sector : mTPCSectors) {
        outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", sector, Lifetime::Timeframe);
      }
    } else {
      outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", NSectors, Lifetime::Timeframe);
    }
    if (mSpecConfig.processMC) {
      if (mSpecConfig.sendClustersPerSector) {
        for (const auto sector : mTPCSectors) {
          outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), sector, Lifetime::Timeframe);
        }
      } else {
        outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), NSectors, Lifetime::Timeframe);
      }
    }
  }
  if (mSpecConfig.outputSharedClusterMap) {
    outputSpecs.emplace_back(gDataOriginTPC, "CLSHAREDMAP", 0, Lifetime::Timeframe);
    outputSpecs.emplace_back(gDataOriginTPC, "TPCOCCUPANCYMAP", 0, Lifetime::Timeframe);
  }
  if (mSpecConfig.tpcTriggerHandling) {
    outputSpecs.emplace_back(gDataOriginTPC, "TRIGGERWORDS", 0, Lifetime::Timeframe);
  }
  if (mSpecConfig.outputQA) {
    outputSpecs.emplace_back(gDataOriginTPC, "TRACKINGQA", 0, Lifetime::Timeframe);
  }
  if (mSpecConfig.outputErrorQA) {
    outputSpecs.emplace_back(gDataOriginGPU, "ERRORQA", 0, Lifetime::Timeframe);
  }

  if (mSpecConfig.runITSTracking) {
    outputSpecs.emplace_back(gDataOriginITS, "TRACKS", 0, Lifetime::Timeframe);
    outputSpecs.emplace_back(gDataOriginITS, "TRACKCLSID", 0, Lifetime::Timeframe);
    outputSpecs.emplace_back(gDataOriginITS, "ITSTrackROF", 0, Lifetime::Timeframe);
    outputSpecs.emplace_back(gDataOriginITS, "VERTICES", 0, Lifetime::Timeframe);
    outputSpecs.emplace_back(gDataOriginITS, "VERTICESROF", 0, Lifetime::Timeframe);
    outputSpecs.emplace_back(gDataOriginITS, "IRFRAMES", 0, Lifetime::Timeframe);

    if (mSpecConfig.processMC) {
      outputSpecs.emplace_back(gDataOriginITS, "VERTICESMCTR", 0, Lifetime::Timeframe);
      outputSpecs.emplace_back(gDataOriginITS, "VERTICESMCPUR", 0, Lifetime::Timeframe);
      outputSpecs.emplace_back(gDataOriginITS, "TRACKSMCTR", 0, Lifetime::Timeframe);
      outputSpecs.emplace_back(gDataOriginITS, "ITSTrackMC2ROF", 0, Lifetime::Timeframe);
    }
  }

  return outputSpecs;
};

void GPURecoWorkflowSpec::deinitialize()
  ExitPipeline();
  mQA.reset(nullptr);
  mDisplayFrontend.reset(nullptr);
  mGPUReco.reset(nullptr);
}

} // namespace o2::gpu