// NOTE(review): Doxygen HTML navigation residue ("Project", "Loading...", "Searching...",
// "No Matches", page title) removed — this file is the extracted source listing of
// GPUWorkflowSpec.cxx.
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
16
18#include "Headers/DataHeader.h"
19#include "Framework/WorkflowSpec.h" // o2::framework::mergeInputs
27#include "Framework/Logger.h"
42#include "TPCFastTransform.h"
51#include "TPCBase/RDHUtils.h"
53#include "GPUO2InterfaceQA.h"
54#include "GPUO2Interface.h"
55#include "GPUO2InterfaceUtils.h"
56#include "CalibdEdxContainer.h"
57#include "ORTRootSerializer.h"
58#include "GPUNewCalibValues.h"
59#include "TPCPadGainCalib.h"
60#include "TPCZSLinkMapping.h"
62#include "TPCBase/Sector.h"
63#include "TPCBase/Utils.h"
71#include "Algorithm/Parser.h"
74#include "TRDBase/Geometry.h"
81#include "GPUWorkflowInternal.h"
82// #include "Framework/ThreadPool.h"
83
84#include <TStopwatch.h>
85#include <TObjArray.h>
86#include <TH1F.h>
87#include <TH2F.h>
88#include <TH1D.h>
89#include <TGraphAsymmErrors.h>
90
91#include <filesystem>
92#include <memory>
93#include <vector>
94#include <iomanip>
95#include <stdexcept>
96#include <regex>
97#include <sys/types.h>
98#include <sys/stat.h>
99#include <fcntl.h>
100#include <chrono>
101#include <unordered_set>
102
103using namespace o2::framework;
104using namespace o2::header;
105using namespace o2::gpu;
106using namespace o2::base;
107using namespace o2::dataformats;
109
110namespace o2::gpu
111{
112
113GPURecoWorkflowSpec::GPURecoWorkflowSpec(GPURecoWorkflowSpec::CompletionPolicyData* policyData, Config const& specconfig, std::vector<int32_t> const& tpcsectors, uint64_t tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<bool(o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) : o2::framework::Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
114{
115 if (mSpecConfig.outputCAClusters && !mSpecConfig.caClusterer && !mSpecConfig.decompressTPC && !mSpecConfig.useFilteredOutputSpecs) {
116 throw std::runtime_error("inconsistent configuration: cluster output is only possible if CA clusterer or CompCluster decompression is activated");
117 }
118
119 mConfig.reset(new GPUO2InterfaceConfiguration);
120 mConfParam.reset(new GPUSettingsO2);
121 mTFSettings.reset(new GPUSettingsTF);
122 mTimer.reset(new TStopwatch);
123 mPipeline.reset(new GPURecoWorkflowSpec_PipelineInternals);
124
125 if (mSpecConfig.enableDoublePipeline == 1 && gPolicyOrder) {
126 *gPolicyOrder = &mPolicyOrder;
127 }
128}
129
131
133{
135 GPUO2InterfaceConfiguration& config = *mConfig.get();
136
137 // Create configuration object and fill settings
138 mConfig->configGRP.solenoidBzNominalGPU = 0;
139 mTFSettings->hasSimStartOrbit = 1;
140 auto& hbfu = o2::raw::HBFUtils::Instance();
141 mTFSettings->simStartOrbit = hbfu.getFirstIRofTF(o2::InteractionRecord(0, hbfu.orbitFirstSampled)).orbit;
142
143 *mConfParam = mConfig->ReadConfigurableParam();
144
145 if (mConfParam->display) {
146 mDisplayFrontend.reset(GPUDisplayFrontendInterface::getFrontend(mConfig->configDisplay.displayFrontend.c_str()));
147 mConfig->configProcessing.eventDisplay = mDisplayFrontend.get();
148 if (mConfig->configProcessing.eventDisplay != nullptr) {
149 LOG(info) << "Event display enabled";
150 } else {
151 throw std::runtime_error("GPU Event Display frontend could not be created!");
152 }
153 }
154 if (mSpecConfig.enableDoublePipeline) {
155 mConfig->configProcessing.doublePipeline = 1;
156 }
157
158 mAutoSolenoidBz = mConfParam->solenoidBzNominalGPU == -1e6f;
159 mAutoContinuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin < 0;
160 if (mAutoContinuousMaxTimeBin) {
161 mConfig->configGRP.grpContinuousMaxTimeBin = GPUO2InterfaceUtils::getTpcMaxTimeBinFromNHbf(mConfParam->overrideNHbfPerTF ? mConfParam->overrideNHbfPerTF : 256);
162 }
163 if (mConfig->configProcessing.deviceNum == -2) {
164 int32_t myId = ic.services().get<const o2::framework::DeviceSpec>().inputTimesliceId;
165 int32_t idMax = ic.services().get<const o2::framework::DeviceSpec>().maxInputTimeslices;
166 mConfig->configProcessing.deviceNum = myId;
167 LOG(info) << "GPU device number selected from pipeline id: " << myId << " / " << idMax;
168 }
169 if (mConfig->configProcessing.debugLevel >= 3 && mVerbosity == 0) {
170 mVerbosity = 1;
171 }
172 mConfig->configProcessing.runMC = mSpecConfig.processMC;
173 if (mSpecConfig.outputQA) {
174 if (!mSpecConfig.processMC && !mConfig->configQA.clusterRejectionHistograms) {
175 throw std::runtime_error("Need MC information to create QA plots");
176 }
177 if (!mSpecConfig.processMC) {
178 mConfig->configQA.noMC = true;
179 }
180 mConfig->configQA.shipToQC = true;
181 if (!mConfig->configProcessing.runQA) {
182 mConfig->configQA.enableLocalOutput = false;
183 mQATaskMask = (mSpecConfig.processMC ? 15 : 0) | (mConfig->configQA.clusterRejectionHistograms ? 32 : 0);
184 mConfig->configProcessing.runQA = -mQATaskMask;
185 }
186 }
187 mConfig->configInterface.outputToExternalBuffers = true;
188 if (mConfParam->synchronousProcessing) {
189 mConfig->configReconstruction.useMatLUT = false;
190 }
191 if (mConfig->configProcessing.rtc.optSpecialCode == -1) {
192 mConfig->configProcessing.rtc.optSpecialCode = mConfParam->synchronousProcessing;
193 }
194
195 // Configure the "GPU workflow" i.e. which steps we run on the GPU (or CPU)
196 if (mSpecConfig.outputTracks || mSpecConfig.outputCompClustersRoot || mSpecConfig.outputCompClustersFlat) {
197 mConfig->configWorkflow.steps.set(GPUDataTypes::RecoStep::TPCConversion,
200 mConfig->configWorkflow.outputs.set(GPUDataTypes::InOutType::TPCMergedTracks);
201 mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TPCdEdx, mConfParam->rundEdx == -1 ? !mConfParam->synchronousProcessing : mConfParam->rundEdx);
202 }
203 if (mSpecConfig.outputCompClustersRoot || mSpecConfig.outputCompClustersFlat) {
204 mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TPCCompression, true);
205 mConfig->configWorkflow.outputs.setBits(GPUDataTypes::InOutType::TPCCompressedClusters, true);
206 }
207 mConfig->configWorkflow.inputs.set(GPUDataTypes::InOutType::TPCClusters);
208 if (mSpecConfig.caClusterer) { // Override some settings if we have raw data as input
209 mConfig->configWorkflow.inputs.set(GPUDataTypes::InOutType::TPCRaw);
210 mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TPCClusterFinding, true);
211 mConfig->configWorkflow.outputs.setBits(GPUDataTypes::InOutType::TPCClusters, true);
212 }
213 if (mSpecConfig.decompressTPC) {
214 mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TPCCompression, false);
215 mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TPCDecompression, true);
216 mConfig->configWorkflow.inputs.set(GPUDataTypes::InOutType::TPCCompressedClusters);
217 mConfig->configWorkflow.outputs.setBits(GPUDataTypes::InOutType::TPCClusters, true);
218 mConfig->configWorkflow.outputs.setBits(GPUDataTypes::InOutType::TPCCompressedClusters, false);
219 if (mTPCSectorMask != 0xFFFFFFFFF) {
220 throw std::invalid_argument("Cannot run TPC decompression with a sector mask");
221 }
222 }
223 if (mSpecConfig.runTRDTracking) {
224 mConfig->configWorkflow.inputs.setBits(GPUDataTypes::InOutType::TRDTracklets, true);
225 mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TRDTracking, true);
226 }
227 if (mSpecConfig.runITSTracking) {
228 mConfig->configWorkflow.inputs.setBits(GPUDataTypes::InOutType::ITSClusters, true);
229 mConfig->configWorkflow.outputs.setBits(GPUDataTypes::InOutType::ITSTracks, true);
230 mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::ITSTracking, true);
231 }
232 if (mSpecConfig.outputSharedClusterMap) {
233 mConfig->configProcessing.outputSharedClusterMap = true;
234 }
235 if (!mSpecConfig.outputTracks) {
236 mConfig->configProcessing.createO2Output = 0; // Disable O2 TPC track format output if no track output requested
237 }
238 mConfig->configProcessing.param.tpcTriggerHandling = mSpecConfig.tpcTriggerHandling;
239
240 if (mConfParam->transformationFile.size() || mConfParam->transformationSCFile.size()) {
241 LOG(fatal) << "Deprecated configurable param options GPU_global.transformationFile or transformationSCFile used\n"
242 << "Instead, link the corresponding file as <somedir>/TPC/Calib/CorrectionMap/snapshot.root and use it via\n"
243 << "--condition-remap file://<somdir>=TPC/Calib/CorrectionMap option";
244 }
245 /* if (config.configProcessing.doublePipeline && ic.services().get<ThreadPool>().poolSize != 2) {
246 throw std::runtime_error("double pipeline requires exactly 2 threads");
247 } */
248 if (config.configProcessing.doublePipeline && (mSpecConfig.readTRDtracklets || mSpecConfig.runITSTracking || !(mSpecConfig.zsOnTheFly || mSpecConfig.zsDecoder))) {
249 LOG(fatal) << "GPU two-threaded pipeline works only with TPC-only processing, and with ZS input";
250 }
251
252 if (mSpecConfig.enableDoublePipeline != 2) {
253 mGPUReco = std::make_unique<GPUO2Interface>();
254
255 // initialize TPC calib objects
256 initFunctionTPCCalib(ic);
257
258 mConfig->configCalib.fastTransform = mCalibObjects.mFastTransformHelper->getCorrMap();
259 mConfig->configCalib.fastTransformRef = mCalibObjects.mFastTransformHelper->getCorrMapRef();
260 mConfig->configCalib.fastTransformMShape = mCalibObjects.mFastTransformHelper->getCorrMapMShape();
261 mConfig->configCalib.fastTransformHelper = mCalibObjects.mFastTransformHelper.get();
262 if (mConfig->configCalib.fastTransform == nullptr) {
263 throw std::invalid_argument("GPU workflow: initialization of the TPC transformation failed");
264 }
265
266 if (mConfParam->matLUTFile.size()) {
267 LOGP(info, "Loading matlut file {}", mConfParam->matLUTFile.c_str());
268 mConfig->configCalib.matLUT = o2::base::MatLayerCylSet::loadFromFile(mConfParam->matLUTFile.c_str());
269 if (mConfig->configCalib.matLUT == nullptr) {
270 LOGF(fatal, "Error loading matlut file");
271 }
272 } else {
273 mConfig->configProcessing.lateO2MatLutProvisioningSize = 50 * 1024 * 1024;
274 }
275
276 if (mSpecConfig.readTRDtracklets) {
277 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>();
278 mConfig->configCalib.trdGeometry = mTRDGeometry.get();
279 }
280
281 mConfig->configProcessing.willProvideO2PropagatorLate = true;
282 mConfig->configProcessing.o2PropagatorUseGPUField = true;
283
284 if (mConfParam->printSettings && (mConfParam->printSettings > 1 || ic.services().get<const o2::framework::DeviceSpec>().inputTimesliceId == 0)) {
285 mConfig->configProcessing.printSettings = true;
286 if (mConfParam->printSettings > 1) {
287 mConfig->PrintParam();
288 }
289 }
290
291 // Configuration is prepared, initialize the tracker.
292 if (mGPUReco->Initialize(config) != 0) {
293 throw std::invalid_argument("GPU Reconstruction initialization failed");
294 }
295 if (mSpecConfig.outputQA) {
296 mQA = std::make_unique<GPUO2InterfaceQA>(mConfig.get());
297 }
298 if (mSpecConfig.outputErrorQA) {
299 mGPUReco->setErrorCodeOutput(&mErrorQA);
300 }
301
302 // initialize ITS
303 if (mSpecConfig.runITSTracking) {
304 initFunctionITS(ic);
305 }
306 }
307
308 if (mSpecConfig.enableDoublePipeline) {
309 initPipeline(ic);
310 if (mConfParam->dump >= 2) {
311 LOG(fatal) << "Cannot use dump-only mode with multi-threaded pipeline";
312 }
313 }
314
315 auto& callbacks = ic.services().get<CallbackService>();
316 callbacks.set<CallbackService::Id::RegionInfoCallback>([this](fair::mq::RegionInfo const& info) {
317 if (info.size == 0) {
318 return;
319 }
320 if (mSpecConfig.enableDoublePipeline) {
321 mRegionInfos.emplace_back(info);
322 }
323 if (mSpecConfig.enableDoublePipeline == 2) {
324 return;
325 }
326 if (mConfParam->registerSelectedSegmentIds != -1 && info.managed && info.id != (uint32_t)mConfParam->registerSelectedSegmentIds) {
327 return;
328 }
329 int32_t fd = 0;
330 if (mConfParam->mutexMemReg) {
331 mode_t mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
332 fd = open("/tmp/o2_gpu_memlock_mutex.lock", O_RDWR | O_CREAT | O_CLOEXEC, mask);
333 if (fd == -1) {
334 throw std::runtime_error("Error opening memlock mutex lock file");
335 }
336 fchmod(fd, mask);
337 if (lockf(fd, F_LOCK, 0)) {
338 throw std::runtime_error("Error locking memlock mutex file");
339 }
340 }
341 std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
342 if (mConfParam->benchmarkMemoryRegistration) {
343 start = std::chrono::high_resolution_clock::now();
344 }
345 if (mGPUReco->registerMemoryForGPU(info.ptr, info.size)) {
346 throw std::runtime_error("Error registering memory for GPU");
347 }
348 if (mConfParam->benchmarkMemoryRegistration) {
349 end = std::chrono::high_resolution_clock::now();
350 std::chrono::duration<double> elapsed_seconds = end - start;
351 LOG(info) << "Memory registration time (0x" << info.ptr << ", " << info.size << " bytes): " << elapsed_seconds.count() << " s";
352 }
353 if (mConfParam->mutexMemReg) {
354 if (lockf(fd, F_ULOCK, 0)) {
355 throw std::runtime_error("Error unlocking memlock mutex file");
356 }
357 close(fd);
358 }
359 });
360
361 mTimer->Stop();
362 mTimer->Reset();
363}
364
366{
367 LOGF(info, "GPU Reconstruction total timing: Cpu: %.3e Real: %.3e s in %d slots", mTimer->CpuTime(), mTimer->RealTime(), mTimer->Counter() - 1);
368 handlePipelineStop();
369}
370
372{
373 handlePipelineEndOfStream(ec);
374}
375
377{
378 if (mSpecConfig.enableDoublePipeline != 2) {
379 finaliseCCDBTPC(matcher, obj);
380 if (mSpecConfig.runITSTracking) {
381 finaliseCCDBITS(matcher, obj);
382 }
383 }
384 if (GRPGeomHelper::instance().finaliseCCDB(matcher, obj)) {
385 mGRPGeomUpdated = true;
386 return;
387 }
388}
389
390template <class D, class E, class F, class G, class H, class I, class J, class K>
391void GPURecoWorkflowSpec::processInputs(ProcessingContext& pc, D& tpcZSmeta, E& inputZS, F& tpcZS, G& tpcZSonTheFlySizes, bool& debugTFDump, H& compClustersDummy, I& compClustersFlatDummy, J& pCompClustersFlat, K& tmpEmptyCompClusters)
392{
393 if (mSpecConfig.enableDoublePipeline == 1) {
394 return;
395 }
396 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
397 constexpr static size_t NEndpoints = o2::gpu::GPUTrackingInOutZS::NENDPOINTS;
398
399 if (mSpecConfig.zsOnTheFly || mSpecConfig.zsDecoder) {
400 for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
401 for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
402 tpcZSmeta.Pointers[i][j].clear();
403 tpcZSmeta.Sizes[i][j].clear();
404 }
405 }
406 }
407 if (mSpecConfig.zsOnTheFly) {
408 tpcZSonTheFlySizes = {0};
409 // tpcZSonTheFlySizes: #zs pages per endpoint:
410 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "ZSSIZES"}, Lifetime::Timeframe}};
411 bool recv = false, recvsizes = false;
412 for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
413 if (recvsizes) {
414 throw std::runtime_error("Received multiple ZSSIZES data");
415 }
416 tpcZSonTheFlySizes = pc.inputs().get<std::array<uint32_t, NEndpoints * NSectors>>(ref);
417 recvsizes = true;
418 }
419 // zs pages
420 std::vector<InputSpec> filter2 = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "TPCZS"}, Lifetime::Timeframe}};
421 for (auto const& ref : InputRecordWalker(pc.inputs(), filter2)) {
422 if (recv) {
423 throw std::runtime_error("Received multiple TPCZS data");
424 }
425 inputZS = pc.inputs().get<gsl::span<o2::tpc::ZeroSuppressedContainer8kb>>(ref);
426 recv = true;
427 }
428 if (!recv || !recvsizes) {
429 throw std::runtime_error("TPC ZS on the fly data not received");
430 }
431
432 uint32_t offset = 0;
433 for (uint32_t i = 0; i < NSectors; i++) {
434 uint32_t pageSector = 0;
435 for (uint32_t j = 0; j < NEndpoints; j++) {
436 pageSector += tpcZSonTheFlySizes[i * NEndpoints + j];
437 offset += tpcZSonTheFlySizes[i * NEndpoints + j];
438 }
439 if (mVerbosity >= 1) {
440 LOG(info) << "GOT ZS on the fly pages FOR SECTOR " << i << " -> pages: " << pageSector;
441 }
442 }
443 }
444 if (mSpecConfig.zsDecoder) {
445 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
446 auto isSameRdh = [](const char* left, const char* right) -> bool {
447 return o2::raw::RDHUtils::getFEEID(left) == o2::raw::RDHUtils::getFEEID(right) && o2::raw::RDHUtils::getDetectorField(left) == o2::raw::RDHUtils::getDetectorField(right);
448 };
449 auto checkForZSData = [](const char* ptr, uint32_t subSpec) -> bool {
450 const auto rdhLink = o2::raw::RDHUtils::getLinkID(ptr);
451 const auto detField = o2::raw::RDHUtils::getDetectorField(ptr);
452 const auto feeID = o2::raw::RDHUtils::getFEEID(ptr);
453 const auto feeLinkID = o2::tpc::rdh_utils::getLink(feeID);
454 // This check is not what it is supposed to be, but some MC SYNTHETIC data was generated with rdhLinkId set to feeLinkId, so we add some extra logic so we can still decode it
455 return detField == o2::tpc::raw_data_types::ZS && ((feeLinkID == o2::tpc::rdh_utils::UserLogicLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == 0)) ||
456 (feeLinkID == o2::tpc::rdh_utils::ILBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::ILBZSLinkID || rdhLink == 0)) ||
457 (feeLinkID == o2::tpc::rdh_utils::DLBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::DLBZSLinkID || rdhLink == 0)));
458 };
459 auto insertPages = [&tpcZSmeta, checkForZSData](const char* ptr, size_t count, uint32_t subSpec) -> void {
460 if (checkForZSData(ptr, subSpec)) {
461 int32_t rawcru = o2::tpc::rdh_utils::getCRU(ptr);
462 int32_t rawendpoint = o2::tpc::rdh_utils::getEndPoint(ptr);
463 tpcZSmeta.Pointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(ptr);
464 tpcZSmeta.Sizes[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(count);
465 }
466 };
467 if (DPLRawPageSequencer(pc.inputs(), filter)(isSameRdh, insertPages, checkForZSData)) {
468 debugTFDump = true;
469 static uint32_t nErrors = 0;
470 nErrors++;
471 if (nErrors == 1 || (nErrors < 100 && nErrors % 10 == 0) || nErrors % 1000 == 0 || mNTFs % 1000 == 0) {
472 LOG(error) << "DPLRawPageSequencer failed to process TPC raw data - data most likely not padded correctly - Using slow page scan instead (this alarm is downscaled from now on, so far " << nErrors << " of " << mNTFs << " TFs affected)";
473 }
474 }
475
476 int32_t totalCount = 0;
477 for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
478 for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
479 tpcZSmeta.Pointers2[i][j] = tpcZSmeta.Pointers[i][j].data();
480 tpcZSmeta.Sizes2[i][j] = tpcZSmeta.Sizes[i][j].data();
481 tpcZS.sector[i].zsPtr[j] = tpcZSmeta.Pointers2[i][j];
482 tpcZS.sector[i].nZSPtr[j] = tpcZSmeta.Sizes2[i][j];
483 tpcZS.sector[i].count[j] = tpcZSmeta.Pointers[i][j].size();
484 totalCount += tpcZSmeta.Pointers[i][j].size();
485 }
486 }
487 } else if (mSpecConfig.decompressTPC) {
488 if (mSpecConfig.decompressTPCFromROOT) {
489 compClustersDummy = *pc.inputs().get<o2::tpc::CompressedClustersROOT*>("input");
490 compClustersFlatDummy.setForward(&compClustersDummy);
491 pCompClustersFlat = &compClustersFlatDummy;
492 } else {
493 pCompClustersFlat = pc.inputs().get<o2::tpc::CompressedClustersFlat*>("input").get();
494 }
495 if (pCompClustersFlat == nullptr) {
496 tmpEmptyCompClusters.reset(new char[sizeof(o2::tpc::CompressedClustersFlat)]);
497 memset(tmpEmptyCompClusters.get(), 0, sizeof(o2::tpc::CompressedClustersFlat));
498 pCompClustersFlat = (o2::tpc::CompressedClustersFlat*)tmpEmptyCompClusters.get();
499 }
500 } else if (!mSpecConfig.zsOnTheFly) {
501 if (mVerbosity) {
502 LOGF(info, "running tracking for sector(s) 0x%09x", mTPCSectorMask);
503 }
504 }
505}
506
507int32_t GPURecoWorkflowSpec::runMain(o2::framework::ProcessingContext* pc, GPUTrackingInOutPointers* ptrs, GPUInterfaceOutputs* outputRegions, int32_t threadIndex, GPUInterfaceInputUpdate* inputUpdateCallback)
508{
509 int32_t retVal = 0;
510 if (mConfParam->dump < 2) {
511 retVal = mGPUReco->RunTracking(ptrs, outputRegions, threadIndex, inputUpdateCallback);
512
513 if (retVal == 0 && mSpecConfig.runITSTracking) {
514 retVal = runITSTracking(*pc);
515 }
516 }
517
518 if (!mSpecConfig.enableDoublePipeline) { // TODO: Why is this needed for double-pipeline?
519 mGPUReco->Clear(false, threadIndex); // clean non-output memory used by GPU Reconstruction
520 }
521 return retVal;
522}
523
524void GPURecoWorkflowSpec::cleanOldCalibsTPCPtrs(calibObjectStruct& oldCalibObjects)
525{
526 if (mOldCalibObjects.size() > 0) {
527 mOldCalibObjects.pop();
528 }
529 mOldCalibObjects.emplace(std::move(oldCalibObjects));
530}
531
533{
534 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
535 constexpr static size_t NEndpoints = o2::gpu::GPUTrackingInOutZS::NENDPOINTS;
536
537 auto cput = mTimer->CpuTime();
538 auto realt = mTimer->RealTime();
539 mTimer->Start(false);
540 mNTFs++;
541
542 std::vector<gsl::span<const char>> inputs;
543
544 const o2::tpc::CompressedClustersFlat* pCompClustersFlat = nullptr;
545 size_t compClustersFlatDummyMemory[(sizeof(o2::tpc::CompressedClustersFlat) + sizeof(size_t) - 1) / sizeof(size_t)];
546 o2::tpc::CompressedClustersFlat& compClustersFlatDummy = reinterpret_cast<o2::tpc::CompressedClustersFlat&>(compClustersFlatDummyMemory);
547 o2::tpc::CompressedClusters compClustersDummy;
550 std::array<uint32_t, NEndpoints * NSectors> tpcZSonTheFlySizes;
551 gsl::span<const o2::tpc::ZeroSuppressedContainer8kb> inputZS;
552 std::unique_ptr<char[]> tmpEmptyCompClusters;
553
554 bool getWorkflowTPCInput_clusters = false, getWorkflowTPCInput_mc = false, getWorkflowTPCInput_digits = false;
555 bool debugTFDump = false;
556
557 if (mSpecConfig.processMC) {
558 getWorkflowTPCInput_mc = true;
559 }
560 if (!mSpecConfig.decompressTPC && !mSpecConfig.caClusterer) {
561 getWorkflowTPCInput_clusters = true;
562 }
563 if (!mSpecConfig.decompressTPC && mSpecConfig.caClusterer && ((!mSpecConfig.zsOnTheFly || mSpecConfig.processMC) && !mSpecConfig.zsDecoder)) {
564 getWorkflowTPCInput_digits = true;
565 }
566
567 // ------------------------------ Handle inputs ------------------------------
568
569 auto lockDecodeInput = std::make_unique<std::lock_guard<std::mutex>>(mPipeline->mutexDecodeInput);
570
572 if (mSpecConfig.enableDoublePipeline != 2) {
573 if (mSpecConfig.runITSTracking && pc.inputs().getPos("itsTGeo") >= 0) {
574 pc.inputs().get<o2::its::GeometryTGeo*>("itsTGeo");
575 }
576 if (GRPGeomHelper::instance().getGRPECS()->isDetReadOut(o2::detectors::DetID::TPC) && mConfParam->tpcTriggeredMode ^ !GRPGeomHelper::instance().getGRPECS()->isDetContinuousReadOut(o2::detectors::DetID::TPC)) {
577 LOG(fatal) << "configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
578 }
579 }
580
582 processInputs(pc, tpcZSmeta, inputZS, tpcZS, tpcZSonTheFlySizes, debugTFDump, compClustersDummy, compClustersFlatDummy, pCompClustersFlat, tmpEmptyCompClusters); // Process non-digit / non-cluster inputs
583 const auto& inputsClustersDigits = o2::tpc::getWorkflowTPCInput(pc, mVerbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, mTPCSectorMask, getWorkflowTPCInput_digits); // Process digit and cluster inputs
584
585 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
586 mTFSettings->tfStartOrbit = tinfo.firstTForbit;
587 mTFSettings->hasTfStartOrbit = 1;
588 mTFSettings->hasNHBFPerTF = 1;
589 mTFSettings->nHBFPerTF = mConfParam->overrideNHbfPerTF ? mConfParam->overrideNHbfPerTF : GRPGeomHelper::instance().getGRPECS()->getNHBFPerTF();
590 mTFSettings->hasRunStartOrbit = 0;
591 ptrs.settingsTF = mTFSettings.get();
592
593 if (mSpecConfig.enableDoublePipeline != 2) {
594 if (mVerbosity) {
595 LOG(info) << "TF firstTForbit " << mTFSettings->tfStartOrbit << " nHBF " << mTFSettings->nHBFPerTF << " runStartOrbit " << mTFSettings->runStartOrbit << " simStartOrbit " << mTFSettings->simStartOrbit;
596 }
597 if (mConfParam->checkFirstTfOrbit) {
598 static uint32_t lastFirstTFOrbit = -1;
599 static uint32_t lastTFCounter = -1;
600 if (lastFirstTFOrbit != -1 && lastTFCounter != -1) {
601 int32_t diffOrbit = tinfo.firstTForbit - lastFirstTFOrbit;
602 int32_t diffCounter = tinfo.tfCounter - lastTFCounter;
603 if (diffOrbit != diffCounter * mTFSettings->nHBFPerTF) {
604 LOG(error) << "Time frame has mismatching firstTfOrbit - Last orbit/counter: " << lastFirstTFOrbit << " " << lastTFCounter << " - Current: " << tinfo.firstTForbit << " " << tinfo.tfCounter;
605 }
606 }
607 lastFirstTFOrbit = tinfo.firstTForbit;
608 lastTFCounter = tinfo.tfCounter;
609 }
610 }
611
613 decltype(o2::trd::getRecoInputContainer(pc, &ptrs, &inputTracksTRD)) trdInputContainer;
614 if (mSpecConfig.readTRDtracklets) {
615 o2::globaltracking::DataRequest dataRequestTRD;
617 inputTracksTRD.collectData(pc, dataRequestTRD);
618 trdInputContainer = std::move(o2::trd::getRecoInputContainer(pc, &ptrs, &inputTracksTRD));
619 }
620
621 void* ptrEp[NSectors * NEndpoints] = {};
622 bool doInputDigits = false, doInputDigitsMC = false;
623 if (mSpecConfig.decompressTPC) {
624 ptrs.tpcCompressedClusters = pCompClustersFlat;
625 } else if (mSpecConfig.zsOnTheFly) {
626 const uint64_t* buffer = reinterpret_cast<const uint64_t*>(&inputZS[0]);
627 o2::gpu::GPUReconstructionConvert::RunZSEncoderCreateMeta(buffer, tpcZSonTheFlySizes.data(), *&ptrEp, &tpcZS);
628 ptrs.tpcZS = &tpcZS;
629 doInputDigits = doInputDigitsMC = mSpecConfig.processMC;
630 } else if (mSpecConfig.zsDecoder) {
631 ptrs.tpcZS = &tpcZS;
632 if (mSpecConfig.processMC) {
633 throw std::runtime_error("Cannot process MC information, none available");
634 }
635 } else if (mSpecConfig.caClusterer) {
636 doInputDigits = true;
637 doInputDigitsMC = mSpecConfig.processMC;
638 } else {
639 ptrs.clustersNative = &inputsClustersDigits->clusterIndex;
640 }
641
642 if (mTPCSectorMask != 0xFFFFFFFFF) {
643 // Clean out the unused sectors, such that if they were present by chance, they are not processed, and if the values are uninitialized, we should not crash
644 for (uint32_t i = 0; i < NSectors; i++) {
645 if (!(mTPCSectorMask & (1ul << i))) {
646 if (ptrs.tpcZS) {
647 for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
648 tpcZS.sector[i].zsPtr[j] = nullptr;
649 tpcZS.sector[i].nZSPtr[j] = nullptr;
650 tpcZS.sector[i].count[j] = 0;
651 }
652 }
653 }
654 }
655 }
656
657 GPUTrackingInOutDigits tpcDigitsMap;
658 GPUTPCDigitsMCInput tpcDigitsMapMC;
659 if (doInputDigits) {
660 ptrs.tpcPackedDigits = &tpcDigitsMap;
661 if (doInputDigitsMC) {
662 tpcDigitsMap.tpcDigitsMC = &tpcDigitsMapMC;
663 }
664 for (uint32_t i = 0; i < NSectors; i++) {
665 tpcDigitsMap.tpcDigits[i] = inputsClustersDigits->inputDigits[i].data();
666 tpcDigitsMap.nTPCDigits[i] = inputsClustersDigits->inputDigits[i].size();
667 if (doInputDigitsMC) {
668 tpcDigitsMapMC.v[i] = inputsClustersDigits->inputDigitsMCPtrs[i];
669 }
670 }
671 }
672
673 o2::tpc::TPCSectorHeader clusterOutputSectorHeader{0};
674 if (mClusterOutputIds.size() > 0) {
675 clusterOutputSectorHeader.sectorBits = mTPCSectorMask;
676 // subspecs [0, NSectors - 1] are used to identify sector data, we use NSectors to indicate the full TPC
677 clusterOutputSectorHeader.activeSectors = mTPCSectorMask;
678 }
679
680 // ------------------------------ Prepare stage for double-pipeline before normal output preparation ------------------------------
681
682 std::unique_ptr<GPURecoWorkflow_QueueObject> pipelineContext;
683 if (mSpecConfig.enableDoublePipeline) {
684 if (handlePipeline(pc, ptrs, tpcZSmeta, tpcZS, pipelineContext)) {
685 return;
686 }
687 }
688
689 // ------------------------------ Prepare outputs ------------------------------
690
691 GPUInterfaceOutputs outputRegions;
692 using outputDataType = char;
693 using outputBufferUninitializedVector = std::decay_t<decltype(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(Output{"", "", 0}))>;
694 using outputBufferType = std::pair<std::optional<std::reference_wrapper<outputBufferUninitializedVector>>, outputDataType*>;
695 std::vector<outputBufferType> outputBuffers(GPUInterfaceOutputs::count(), {std::nullopt, nullptr});
696 std::unordered_set<std::string> outputsCreated;
697
698 auto setOutputAllocator = [this, &outputBuffers, &outputRegions, &pc, &outputsCreated](const char* name, bool condition, GPUOutputControl& region, auto&& outputSpec, size_t offset = 0) {
699 if (condition) {
700 auto& buffer = outputBuffers[outputRegions.getIndex(region)];
701 if (mConfParam->allocateOutputOnTheFly) {
702 region.allocator = [this, name, &buffer, &pc, outputSpec = std::move(outputSpec), offset, &outputsCreated](size_t size) -> void* {
703 size += offset;
704 if (mVerbosity) {
705 LOG(info) << "ALLOCATING " << size << " bytes for " << name << ": " << std::get<DataOrigin>(outputSpec).template as<std::string>() << "/" << std::get<DataDescription>(outputSpec).template as<std::string>() << "/" << std::get<2>(outputSpec);
706 }
707 std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
708 if (mVerbosity) {
709 start = std::chrono::high_resolution_clock::now();
710 }
711 buffer.first.emplace(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(std::make_from_tuple<Output>(outputSpec), size));
712 outputsCreated.insert(name);
713 if (mVerbosity) {
714 end = std::chrono::high_resolution_clock::now();
715 std::chrono::duration<double> elapsed_seconds = end - start;
716 LOG(info) << "Allocation time for " << name << " (" << size << " bytes)"
717 << ": " << elapsed_seconds.count() << "s";
718 }
719 return (buffer.second = buffer.first->get().data()) + offset;
720 };
721 } else {
722 buffer.first.emplace(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(std::make_from_tuple<Output>(outputSpec), mConfParam->outputBufferSize));
723 region.ptrBase = (buffer.second = buffer.first->get().data()) + offset;
724 region.size = buffer.first->get().size() - offset;
725 outputsCreated.insert(name);
726 }
727 }
728 };
729
730 auto downSizeBuffer = [](outputBufferType& buffer, size_t size) {
731 if (!buffer.first) {
732 return;
733 }
734 if (buffer.first->get().size() < size) {
735 throw std::runtime_error("Invalid buffer size requested");
736 }
737 buffer.first->get().resize(size);
738 if (size && buffer.first->get().data() != buffer.second) {
739 throw std::runtime_error("Inconsistent buffer address after downsize");
740 }
741 };
742
743 /*auto downSizeBufferByName = [&outputBuffers, &outputRegions, &downSizeBuffer](GPUOutputControl& region, size_t size) {
744 auto& buffer = outputBuffers[outputRegions.getIndex(region)];
745 downSizeBuffer(buffer, size);
746 };*/
747
748 auto downSizeBufferToSpan = [&outputBuffers, &outputRegions, &downSizeBuffer](GPUOutputControl& region, auto span) {
749 auto& buffer = outputBuffers[outputRegions.getIndex(region)];
750 if (!buffer.first) {
751 return;
752 }
753 if (span.size() && buffer.second != (char*)span.data()) {
754 throw std::runtime_error("Buffer does not match span");
755 }
756 downSizeBuffer(buffer, span.size() * sizeof(*span.data()));
757 };
758
759 setOutputAllocator("COMPCLUSTERSFLAT", mSpecConfig.outputCompClustersFlat, outputRegions.compressedClusters, std::make_tuple(gDataOriginTPC, (DataDescription) "COMPCLUSTERSFLAT", 0));
760 setOutputAllocator("CLUSTERNATIVE", mClusterOutputIds.size() > 0, outputRegions.clustersNative, std::make_tuple(gDataOriginTPC, mSpecConfig.sendClustersPerSector ? (DataDescription) "CLUSTERNATIVETMP" : (mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE"), NSectors, clusterOutputSectorHeader), sizeof(o2::tpc::ClusterCountIndex));
761 setOutputAllocator("CLSHAREDMAP", mSpecConfig.outputSharedClusterMap, outputRegions.sharedClusterMap, std::make_tuple(gDataOriginTPC, (DataDescription) "CLSHAREDMAP", 0));
762 setOutputAllocator("TPCOCCUPANCYMAP", mSpecConfig.outputSharedClusterMap, outputRegions.tpcOccupancyMap, std::make_tuple(gDataOriginTPC, (DataDescription) "TPCOCCUPANCYMAP", 0));
763 setOutputAllocator("TRACKS", mSpecConfig.outputTracks, outputRegions.tpcTracksO2, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSF" : (DataDescription) "TRACKS", 0));
764 setOutputAllocator("CLUSREFS", mSpecConfig.outputTracks, outputRegions.tpcTracksO2ClusRefs, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSREFSF" : (DataDescription) "CLUSREFS", 0));
765 setOutputAllocator("TRACKSMCLBL", mSpecConfig.outputTracks && mSpecConfig.processMC, outputRegions.tpcTracksO2Labels, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSMCLBLF" : (DataDescription) "TRACKSMCLBL", 0));
766 setOutputAllocator("TRIGGERWORDS", mSpecConfig.caClusterer && mConfig->configProcessing.param.tpcTriggerHandling, outputRegions.tpcTriggerWords, std::make_tuple(gDataOriginTPC, (DataDescription) "TRIGGERWORDS", 0));
768 if (mSpecConfig.processMC && (mSpecConfig.caClusterer || mSpecConfig.useFilteredOutputSpecs)) {
769 outputRegions.clusterLabels.allocator = [&clustersMCBuffer](size_t size) -> void* { return &clustersMCBuffer; };
770 }
771
772 // ------------------------------ Actual processing ------------------------------
773
774 if ((int32_t)(ptrs.tpcZS != nullptr) + (int32_t)(ptrs.tpcPackedDigits != nullptr && (ptrs.tpcZS == nullptr || ptrs.tpcPackedDigits->tpcDigitsMC == nullptr)) + (int32_t)(ptrs.clustersNative != nullptr) + (int32_t)(ptrs.tpcCompressedClusters != nullptr) != 1) {
775 throw std::runtime_error("Invalid input for gpu tracking");
776 }
777
778 const auto& holdData = o2::tpc::TPCTrackingDigitsPreCheck::runPrecheck(&ptrs, mConfig.get());
779
780 calibObjectStruct oldCalibObjects;
781 doCalibUpdates(pc, oldCalibObjects);
782
783 lockDecodeInput.reset();
784
785 uint32_t threadIndex;
786 if (mConfParam->dump) {
787 if (mSpecConfig.enableDoublePipeline && pipelineContext->jobSubmitted) {
788 while (pipelineContext->jobThreadIndex == -1) {
789 }
790 threadIndex = pipelineContext->jobThreadIndex;
791 } else {
792 threadIndex = 0; // TODO: Not sure if this is safe, but it is not yet known which threadIndex will pick up the enqueued job
793 }
794
795 std::string dir = "";
796 if (mConfParam->dumpFolder != "") {
797 dir = std::regex_replace(mConfParam->dumpFolder, std::regex("\\[P\\]"), std::to_string(getpid()));
798 if (mNTFs == 1) {
799 mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
800 }
801 dir += "/";
802 }
803 if (mNTFs == 1) { // Must dump with first TF, since will enforce enqueued calib updates
804 mGPUReco->DumpSettings(threadIndex, dir.c_str());
805 }
806 if (tinfo.tfCounter >= mConfParam->dumpFirst && (mConfParam->dumpLast == -1 || tinfo.tfCounter <= mConfParam->dumpLast)) {
807 mGPUReco->DumpEvent(mNTFDumps, &ptrs, threadIndex, dir.c_str());
808 mNTFDumps++;
809 }
810 }
811 std::unique_ptr<GPUTrackingInOutPointers> ptrsDump;
812 if (mConfParam->dumpBadTFMode == 2) {
813 ptrsDump.reset(new GPUTrackingInOutPointers);
814 memcpy((void*)ptrsDump.get(), (const void*)&ptrs, sizeof(ptrs));
815 }
816
817 int32_t retVal = 0;
818 if (mSpecConfig.enableDoublePipeline) {
819 if (!pipelineContext->jobSubmitted) {
820 enqueuePipelinedJob(&ptrs, &outputRegions, pipelineContext.get(), true);
821 } else {
822 finalizeInputPipelinedJob(&ptrs, &outputRegions, pipelineContext.get());
823 }
824 std::unique_lock lk(pipelineContext->jobFinishedMutex);
825 pipelineContext->jobFinishedNotify.wait(lk, [context = pipelineContext.get()]() { return context->jobFinished; });
826 retVal = pipelineContext->jobReturnValue;
827 threadIndex = pipelineContext->jobThreadIndex;
828 } else {
829 // uint32_t threadIndex = pc.services().get<ThreadPool>().threadIndex;
830 threadIndex = mNextThreadIndex;
831 if (mConfig->configProcessing.doublePipeline) {
832 mNextThreadIndex = (mNextThreadIndex + 1) % 2;
833 }
834
835 retVal = runMain(&pc, &ptrs, &outputRegions, threadIndex);
836 }
837 if (retVal != 0) {
838 debugTFDump = true;
839 }
840 cleanOldCalibsTPCPtrs(oldCalibObjects);
841
842 o2::utils::DebugStreamer::instance()->flush(); // flushing debug output to file
843
844 if (debugTFDump && mNDebugDumps < mConfParam->dumpBadTFs) {
845 mNDebugDumps++;
846 if (mConfParam->dumpBadTFMode <= 1) {
847 std::string filename = std::string("tpc_dump_") + std::to_string(pc.services().get<const o2::framework::DeviceSpec>().inputTimesliceId) + "_" + std::to_string(mNDebugDumps) + ".dump";
848 FILE* fp = fopen(filename.c_str(), "w+b");
849 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
850 for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
851 auto data = pc.inputs().get<gsl::span<char>>(ref);
852 if (mConfParam->dumpBadTFMode == 1) {
853 uint64_t size = data.size();
854 fwrite(&size, 1, sizeof(size), fp);
855 }
856 fwrite(data.data(), 1, data.size(), fp);
857 }
858 fclose(fp);
859 } else if (mConfParam->dumpBadTFMode == 2) {
860 mGPUReco->DumpEvent(mNDebugDumps - 1, ptrsDump.get(), threadIndex);
861 }
862 }
863
864 if (mConfParam->dump == 2) {
865 return;
866 }
867
868 // ------------------------------ Various postprocessing steps ------------------------------
869
870 if (mConfig->configProcessing.tpcWriteClustersAfterRejection) {
872 }
873 bool createEmptyOutput = false;
874 if (retVal != 0) {
875 if (retVal == 3 && mConfig->configProcessing.ignoreNonFatalGPUErrors) {
876 if (mConfig->configProcessing.throttleAlarms) {
877 LOG(warning) << "GPU Reconstruction aborted with non fatal error code, ignoring";
878 } else {
879 LOG(alarm) << "GPU Reconstruction aborted with non fatal error code, ignoring";
880 }
881 createEmptyOutput = !mConfParam->partialOutputForNonFatalErrors;
882 } else {
883 LOG(fatal) << "GPU Reconstruction aborted with error code " << retVal << " - errors are not ignored - terminating";
884 }
885 }
886
887 std::unique_ptr<o2::tpc::ClusterNativeAccess> tmpEmptyClNative;
888 if (createEmptyOutput) {
889 memset(&ptrs, 0, sizeof(ptrs));
890 for (uint32_t i = 0; i < outputRegions.count(); i++) {
891 if (outputBuffers[i].first) {
892 size_t toSize = 0;
893 if (i == outputRegions.getIndex(outputRegions.compressedClusters)) {
894 toSize = sizeof(*ptrs.tpcCompressedClusters);
895 } else if (i == outputRegions.getIndex(outputRegions.clustersNative)) {
896 toSize = sizeof(o2::tpc::ClusterCountIndex);
897 }
898 outputBuffers[i].first->get().resize(toSize);
899 outputBuffers[i].second = outputBuffers[i].first->get().data();
900 if (toSize) {
901 memset(outputBuffers[i].second, 0, toSize);
902 }
903 }
904 }
905 tmpEmptyClNative = std::make_unique<o2::tpc::ClusterNativeAccess>();
906 memset(tmpEmptyClNative.get(), 0, sizeof(*tmpEmptyClNative));
907 ptrs.clustersNative = tmpEmptyClNative.get();
908 if (mSpecConfig.processMC) {
909 MCLabelContainer cont;
910 cont.flatten_to(clustersMCBuffer.first);
911 clustersMCBuffer.second = clustersMCBuffer.first;
912 tmpEmptyClNative->clustersMCTruth = &clustersMCBuffer.second;
913 }
914 } else {
915 gsl::span<const o2::tpc::TrackTPC> spanOutputTracks = {ptrs.outputTracksTPCO2, ptrs.nOutputTracksTPCO2};
916 gsl::span<const uint32_t> spanOutputClusRefs = {ptrs.outputClusRefsTPCO2, ptrs.nOutputClusRefsTPCO2};
917 gsl::span<const o2::MCCompLabel> spanOutputTracksMCTruth = {ptrs.outputTracksTPCO2MC, ptrs.outputTracksTPCO2MC ? ptrs.nOutputTracksTPCO2 : 0};
918 if (!mConfParam->allocateOutputOnTheFly) {
919 for (uint32_t i = 0; i < outputRegions.count(); i++) {
920 if (outputRegions.asArray()[i].ptrBase) {
921 if (outputRegions.asArray()[i].size == 1) {
922 throw std::runtime_error("Preallocated buffer size exceeded");
923 }
924 outputRegions.asArray()[i].checkCurrent();
925 downSizeBuffer(outputBuffers[i], (char*)outputRegions.asArray()[i].ptrCurrent - (char*)outputBuffers[i].second);
926 }
927 }
928 }
929 downSizeBufferToSpan(outputRegions.tpcTracksO2, spanOutputTracks);
930 downSizeBufferToSpan(outputRegions.tpcTracksO2ClusRefs, spanOutputClusRefs);
931 downSizeBufferToSpan(outputRegions.tpcTracksO2Labels, spanOutputTracksMCTruth);
932
933 // if requested, tune TPC tracks
934 if (ptrs.nOutputTracksTPCO2) {
935 doTrackTuneTPC(ptrs, outputBuffers[outputRegions.getIndex(outputRegions.tpcTracksO2)].first->get().data());
936 }
937
938 if (mClusterOutputIds.size() > 0 && (void*)ptrs.clustersNative->clustersLinear != (void*)(outputBuffers[outputRegions.getIndex(outputRegions.clustersNative)].second + sizeof(o2::tpc::ClusterCountIndex))) {
939 throw std::runtime_error("cluster native output ptrs out of sync"); // sanity check
940 }
941 }
942
943 if (mConfig->configWorkflow.outputs.isSet(GPUDataTypes::InOutType::TPCMergedTracks)) {
944 LOG(info) << "found " << ptrs.nOutputTracksTPCO2 << " track(s)";
945 }
946
947 if (mSpecConfig.outputCompClustersRoot) {
950 }
951
952 if (mClusterOutputIds.size() > 0) {
953 o2::tpc::ClusterNativeAccess const& accessIndex = *ptrs.clustersNative;
954 if (mSpecConfig.sendClustersPerSector) {
955 // Clusters are shipped by sector, we are copying into per-sector buffers (anyway only for ROOT output)
956 for (uint32_t i = 0; i < NSectors; i++) {
957 if (mTPCSectorMask & (1ul << i)) {
959 clusterOutputSectorHeader.sectorBits = (1ul << i);
960 char* buffer = pc.outputs().make<char>({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", subspec, {clusterOutputSectorHeader}}, accessIndex.nClustersSector[i] * sizeof(*accessIndex.clustersLinear) + sizeof(o2::tpc::ClusterCountIndex)).data();
961 o2::tpc::ClusterCountIndex* outIndex = reinterpret_cast<o2::tpc::ClusterCountIndex*>(buffer);
962 memset(outIndex, 0, sizeof(*outIndex));
963 for (int32_t j = 0; j < o2::tpc::constants::MAXGLOBALPADROW; j++) {
964 outIndex->nClusters[i][j] = accessIndex.nClusters[i][j];
965 }
966 memcpy(buffer + sizeof(*outIndex), accessIndex.clusters[i][0], accessIndex.nClustersSector[i] * sizeof(*accessIndex.clustersLinear));
967 if (mSpecConfig.processMC && accessIndex.clustersMCTruth) {
968 MCLabelContainer cont;
969 for (uint32_t j = 0; j < accessIndex.nClustersSector[i]; j++) {
970 const auto& labels = accessIndex.clustersMCTruth->getLabels(accessIndex.clusterOffset[i][0] + j);
971 for (const auto& label : labels) {
972 cont.addElement(j, label);
973 }
974 }
975 ConstMCLabelContainer contflat;
976 cont.flatten_to(contflat);
977 pc.outputs().snapshot({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), subspec, {clusterOutputSectorHeader}}, contflat);
978 }
979 }
980 }
981 } else {
982 // Clusters are shipped as single message, fill ClusterCountIndex
983 DataHeader::SubSpecificationType subspec = NSectors;
984 o2::tpc::ClusterCountIndex* outIndex = reinterpret_cast<o2::tpc::ClusterCountIndex*>(outputBuffers[outputRegions.getIndex(outputRegions.clustersNative)].second);
985 static_assert(sizeof(o2::tpc::ClusterCountIndex) == sizeof(accessIndex.nClusters));
986 memcpy(outIndex, &accessIndex.nClusters[0][0], sizeof(o2::tpc::ClusterCountIndex));
987 if (mSpecConfig.processMC && (mSpecConfig.caClusterer || mSpecConfig.useFilteredOutputSpecs) && accessIndex.clustersMCTruth) {
988 pc.outputs().snapshot({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), subspec, {clusterOutputSectorHeader}}, clustersMCBuffer.first);
989 }
990 }
991 }
992 if (mSpecConfig.outputQA) {
993 TObjArray out;
994 bool sendQAOutput = !createEmptyOutput && outputRegions.qa.newQAHistsCreated;
995 auto getoutput = [sendQAOutput](auto ptr) { return sendQAOutput && ptr ? *ptr : std::decay_t<decltype(*ptr)>(); };
996 std::vector<TH1F> copy1 = getoutput(outputRegions.qa.hist1); // Internally, this will also be used as output, so we need a non-const copy
997 std::vector<TH2F> copy2 = getoutput(outputRegions.qa.hist2);
998 std::vector<TH1D> copy3 = getoutput(outputRegions.qa.hist3);
999 std::vector<TGraphAsymmErrors> copy4 = getoutput(outputRegions.qa.hist4);
1000 if (sendQAOutput) {
1001 mQA->postprocessExternal(copy1, copy2, copy3, copy4, out, mQATaskMask ? mQATaskMask : -1);
1002 }
1003 pc.outputs().snapshot({gDataOriginTPC, "TRACKINGQA", 0}, out);
1004 if (sendQAOutput) {
1005 mQA->cleanup();
1006 }
1007 }
1008 if (mSpecConfig.outputErrorQA) {
1009 pc.outputs().snapshot({gDataOriginGPU, "ERRORQA", 0}, mErrorQA);
1010 mErrorQA.clear(); // FIXME: This is a race condition once we run multi-threaded!
1011 }
1012 if (mSpecConfig.outputSharedClusterMap && !outputsCreated.contains("TPCOCCUPANCYMAP")) {
1014 }
1015 if (mSpecConfig.tpcTriggerHandling && !outputsCreated.contains("TRIGGERWORDS")) {
1017 }
1018 mTimer->Stop();
1019 LOG(info) << "GPU Reconstruction time for this TF " << mTimer->CpuTime() - cput << " s (cpu), " << mTimer->RealTime() - realt << " s (wall)";
1020}
1021
1022void GPURecoWorkflowSpec::doCalibUpdates(o2::framework::ProcessingContext& pc, calibObjectStruct& oldCalibObjects)
1023{
1024 GPUCalibObjectsConst newCalibObjects;
1025 GPUNewCalibValues newCalibValues;
1026 // check for updates of TPC calibration objects
1027 bool needCalibUpdate = false;
1028 if (mGRPGeomUpdated) {
1029 mGRPGeomUpdated = false;
1030 needCalibUpdate = true;
1031
1032 if (mSpecConfig.runITSTracking && !mITSGeometryCreated) {
1035 mITSGeometryCreated = true;
1036 }
1037
1038 if (mAutoSolenoidBz) {
1039 newCalibValues.newSolenoidField = true;
1040 newCalibValues.solenoidField = mConfig->configGRP.solenoidBzNominalGPU = GPUO2InterfaceUtils::getNominalGPUBz(*GRPGeomHelper::instance().getGRPMagField());
1041 // Propagator::Instance()->setBz(newCalibValues.solenoidField); // Take value from o2::Propagator::UpdateField from GRPGeomHelper
1042 LOG(info) << "Updating solenoid field " << newCalibValues.solenoidField;
1043 }
1044 if (mAutoContinuousMaxTimeBin) {
1045 newCalibValues.newContinuousMaxTimeBin = true;
1046 newCalibValues.continuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin = GPUO2InterfaceUtils::getTpcMaxTimeBinFromNHbf(mTFSettings->nHBFPerTF);
1047 LOG(info) << "Updating max time bin " << newCalibValues.continuousMaxTimeBin << " (" << mTFSettings->nHBFPerTF << " orbits)";
1048 }
1049
1050 if (!mPropagatorInstanceCreated) {
1051 newCalibObjects.o2Propagator = mConfig->configCalib.o2Propagator = Propagator::Instance();
1052 if (mConfig->configProcessing.o2PropagatorUseGPUField) {
1053 mGPUReco->UseGPUPolynomialFieldInPropagator(Propagator::Instance());
1054 }
1055 mPropagatorInstanceCreated = true;
1056 }
1057
1058 if (!mMatLUTCreated) {
1059 if (mConfParam->matLUTFile.size() == 0) {
1060 newCalibObjects.matLUT = GRPGeomHelper::instance().getMatLUT();
1061 LOG(info) << "Loaded material budget lookup table";
1062 }
1063 mMatLUTCreated = true;
1064 }
1065 if (mSpecConfig.readTRDtracklets && !mTRDGeometryCreated) {
1066 auto gm = o2::trd::Geometry::instance();
1067 gm->createPadPlaneArray();
1068 gm->createClusterMatrixArray();
1069 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>(*gm);
1070 newCalibObjects.trdGeometry = mConfig->configCalib.trdGeometry = mTRDGeometry.get();
1071 LOG(info) << "Loaded TRD geometry";
1072 mTRDGeometryCreated = true;
1073 }
1074 }
1075 needCalibUpdate = fetchCalibsCCDBTPC(pc, newCalibObjects, oldCalibObjects) || needCalibUpdate;
1076 if (mSpecConfig.runITSTracking) {
1077 needCalibUpdate = fetchCalibsCCDBITS(pc) || needCalibUpdate;
1078 }
1079 if (mTPCCutAtTimeBin != mConfig->configGRP.tpcCutTimeBin) {
1080 newCalibValues.newTPCTimeBinCut = true;
1081 newCalibValues.tpcTimeBinCut = mConfig->configGRP.tpcCutTimeBin = mTPCCutAtTimeBin;
1082 needCalibUpdate = true;
1083 }
1084 if (mSpecConfig.nnLoadFromCCDB) {
1085 auto dumpToFile = [](const char* buffer, std::size_t validSize, const std::string& path) {
1086 std::ofstream out(path, std::ios::binary | std::ios::trunc);
1087 if (!out.is_open()) {
1088 throw std::runtime_error("Failed to open output file: " + path);
1089 }
1090
1091 out.write(buffer, static_cast<std::streamsize>(validSize));
1092 if (!out) {
1093 throw std::runtime_error("Failed while writing data to: " + path);
1094 }
1095 };
1096 for (int i = 0; i < 3; i++) {
1097 newCalibObjects.nnClusterizerNetworks[i] = mConfig->configCalib.nnClusterizerNetworks[i];
1098 if (mSpecConfig.nnDumpToFile && newCalibObjects.nnClusterizerNetworks[i]) {
1099 std::string path = "tpc_nn_clusterizer_" + std::to_string(i) + ".onnx";
1100 dumpToFile(newCalibObjects.nnClusterizerNetworks[i]->getONNXModel(), newCalibObjects.nnClusterizerNetworks[i]->getONNXModelSize(), path);
1101 LOG(info) << "Dumped TPC clusterizer NN " << i << " to file " << path;
1102 }
1103 }
1104 }
1105 if (needCalibUpdate) {
1106 LOG(info) << "Updating GPUReconstruction calibration objects";
1107 mGPUReco->UpdateCalibration(newCalibObjects, newCalibValues);
1108 }
1109}
1110
1112{
1113 Options opts;
1114 if (mSpecConfig.enableDoublePipeline) {
1115 bool send = mSpecConfig.enableDoublePipeline == 2;
1116 char* o2jobid = getenv("O2JOBID");
1117 char* numaid = getenv("NUMAID");
1118 int32_t chanid = o2jobid ? atoi(o2jobid) : (numaid ? atoi(numaid) : 0);
1119 std::string chan = std::string("name=gpu-prepare-channel,type=") + (send ? "push" : "pull") + ",method=" + (send ? "connect" : "bind") + ",address=ipc://@gpu-prepare-channel-" + std::to_string(chanid) + "-{timeslice0},transport=shmem,rateLogging=0";
1120 opts.emplace_back(o2::framework::ConfigParamSpec{"channel-config", o2::framework::VariantType::String, chan, {"Out-of-band channel config"}});
1121 }
1122 if (mSpecConfig.enableDoublePipeline == 2) {
1123 return opts;
1124 }
1125 if (mSpecConfig.outputTracks) {
1127 }
1128 return opts;
1129}
1130
1132{
1133 Inputs inputs;
1134 if (mSpecConfig.zsDecoder) {
1135 // All ZS raw data is published with subspec 0 by the o2-raw-file-reader-workflow and DataDistribution
1136 // creates subspec fom CRU and endpoint id, we create one single input route subscribing to all TPC/RAWDATA
1137 inputs.emplace_back(InputSpec{"zsraw", ConcreteDataTypeMatcher{"TPC", "RAWDATA"}, Lifetime::Timeframe});
1138 if (mSpecConfig.askDISTSTF) {
1139 inputs.emplace_back("stdDist", "FLP", "DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
1140 }
1141 }
1142 if (mSpecConfig.enableDoublePipeline == 2) {
1143 if (!mSpecConfig.zsDecoder) {
1144 LOG(fatal) << "Double pipeline mode can only work with zsraw input";
1145 }
1146 return inputs;
1147 } else if (mSpecConfig.enableDoublePipeline == 1) {
1148 inputs.emplace_back("pipelineprepare", gDataOriginGPU, "PIPELINEPREPARE", 0, Lifetime::Timeframe);
1149 }
1150 if (mSpecConfig.enableDoublePipeline != 2 && (mSpecConfig.outputTracks || mSpecConfig.caClusterer)) {
1151 // calibration objects for TPC clusterization
1152 inputs.emplace_back("tpcgain", gDataOriginTPC, "PADGAINFULL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalPadGainFull)));
1153 inputs.emplace_back("tpcaltrosync", gDataOriginTPC, "ALTROSYNCSIGNAL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::AltroSyncSignal)));
1154 }
1155 if (mSpecConfig.enableDoublePipeline != 2 && mSpecConfig.outputTracks) {
1156 // calibration objects for TPC tracking
1157 const auto mapSources = mSpecConfig.tpcDeadMapSources;
1158 if (mapSources != 0) {
1159 tpc::SourcesDeadMap sources((mapSources > -1) ? static_cast<tpc::SourcesDeadMap>(mapSources) : tpc::SourcesDeadMap::All);
1161 inputs.emplace_back("tpcidcpadflags", gDataOriginTPC, "IDCPADFLAGS", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalIDCPadStatusMapA), {}, 1)); // time-dependent
1162 }
1164 inputs.emplace_back("tpcruninfo", gDataOriginTPC, "TPCRUNINFO", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::ConfigRunInfo)));
1165 }
1166 }
1167
1168 inputs.emplace_back("tpcgainresidual", gDataOriginTPC, "PADGAINRESIDUAL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalPadGainResidual), {}, 1)); // time-dependent
1169 if (mSpecConfig.tpcUseMCTimeGain) {
1170 inputs.emplace_back("tpctimegain", gDataOriginTPC, "TIMEGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTimeGainMC), {}, 1)); // time-dependent
1171 } else {
1172 inputs.emplace_back("tpctimegain", gDataOriginTPC, "TIMEGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTimeGain), {}, 1)); // time-dependent
1173 }
1174 inputs.emplace_back("tpctopologygain", gDataOriginTPC, "TOPOLOGYGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTopologyGain)));
1175 inputs.emplace_back("tpcthreshold", gDataOriginTPC, "PADTHRESHOLD", 0, Lifetime::Condition, ccdbParamSpec("TPC/Config/FEEPad"));
1177 Options optsDummy;
1178 o2::tpc::CorrectionMapsLoaderGloOpts gloOpts{mSpecConfig.lumiScaleType, mSpecConfig.lumiScaleMode, mSpecConfig.enableMShape, mSpecConfig.enableCTPLumi};
1179 mCalibObjects.mFastTransformHelper->requestCCDBInputs(inputs, optsDummy, gloOpts); // option filled here is lost
1180 }
1181 if (mSpecConfig.decompressTPC) {
1182 inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, mSpecConfig.decompressTPCFromROOT ? o2::header::DataDescription("COMPCLUSTERS") : o2::header::DataDescription("COMPCLUSTERSFLAT")}, Lifetime::Timeframe});
1183 } else if (mSpecConfig.caClusterer) {
1184 // We accept digits and MC labels also if we run on ZS Raw data, since they are needed for MC label propagation
1185 if ((!mSpecConfig.zsOnTheFly || mSpecConfig.processMC) && !mSpecConfig.zsDecoder) {
1186 inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITS"}, Lifetime::Timeframe});
1187 mPolicyData->emplace_back(o2::framework::InputSpec{"digits", o2::framework::ConcreteDataTypeMatcher{"TPC", "DIGITS"}});
1188 }
1189 } else if (mSpecConfig.runTPCTracking) {
1190 inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERNATIVE"}, Lifetime::Timeframe});
1191 mPolicyData->emplace_back(o2::framework::InputSpec{"clusters", o2::framework::ConcreteDataTypeMatcher{"TPC", "CLUSTERNATIVE"}});
1192 }
1193 if (mSpecConfig.processMC) {
1194 if (mSpecConfig.caClusterer) {
1195 if (!mSpecConfig.zsDecoder) {
1196 inputs.emplace_back(InputSpec{"mclblin", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITSMCTR"}, Lifetime::Timeframe});
1197 mPolicyData->emplace_back(o2::framework::InputSpec{"digitsmc", o2::framework::ConcreteDataTypeMatcher{"TPC", "DIGITSMCTR"}});
1198 }
1199 } else if (mSpecConfig.runTPCTracking) {
1200 inputs.emplace_back(InputSpec{"mclblin", ConcreteDataTypeMatcher{gDataOriginTPC, "CLNATIVEMCLBL"}, Lifetime::Timeframe});
1201 mPolicyData->emplace_back(o2::framework::InputSpec{"clustersmc", o2::framework::ConcreteDataTypeMatcher{"TPC", "CLNATIVEMCLBL"}});
1202 }
1203 }
1204
1205 if (mSpecConfig.zsOnTheFly) {
1206 inputs.emplace_back(InputSpec{"zsinput", ConcreteDataTypeMatcher{"TPC", "TPCZS"}, Lifetime::Timeframe});
1207 inputs.emplace_back(InputSpec{"zsinputsizes", ConcreteDataTypeMatcher{"TPC", "ZSSIZES"}, Lifetime::Timeframe});
1208 }
1209 if (mSpecConfig.readTRDtracklets) {
1210 inputs.emplace_back("trdctracklets", o2::header::gDataOriginTRD, "CTRACKLETS", 0, Lifetime::Timeframe);
1211 inputs.emplace_back("trdtracklets", o2::header::gDataOriginTRD, "TRACKLETS", 0, Lifetime::Timeframe);
1212 inputs.emplace_back("trdtriggerrec", o2::header::gDataOriginTRD, "TRKTRGRD", 0, Lifetime::Timeframe);
1213 inputs.emplace_back("trdtrigrecmask", o2::header::gDataOriginTRD, "TRIGRECMASK", 0, Lifetime::Timeframe);
1214 }
1215
1216 if (mSpecConfig.runITSTracking) {
1217 inputs.emplace_back("compClusters", "ITS", "COMPCLUSTERS", 0, Lifetime::Timeframe);
1218 inputs.emplace_back("patterns", "ITS", "PATTERNS", 0, Lifetime::Timeframe);
1219 inputs.emplace_back("ROframes", "ITS", "CLUSTERSROF", 0, Lifetime::Timeframe);
1220 if (mSpecConfig.itsTriggerType == 1) {
1221 inputs.emplace_back("phystrig", "ITS", "PHYSTRIG", 0, Lifetime::Timeframe);
1222 } else if (mSpecConfig.itsTriggerType == 2) {
1223 inputs.emplace_back("phystrig", "TRD", "TRKTRGRD", 0, Lifetime::Timeframe);
1224 }
1225 if (mSpecConfig.enableDoublePipeline != 2) {
1226 if (mSpecConfig.isITS3) {
1227 inputs.emplace_back("cldict", "IT3", "CLUSDICT", 0, Lifetime::Condition, ccdbParamSpec("IT3/Calib/ClusterDictionary"));
1228 inputs.emplace_back("alppar", "ITS", "ALPIDEPARAM", 0, Lifetime::Condition, ccdbParamSpec("ITS/Config/AlpideParam"));
1229 } else {
1230 inputs.emplace_back("itscldict", "ITS", "CLUSDICT", 0, Lifetime::Condition, ccdbParamSpec("ITS/Calib/ClusterDictionary"));
1231 inputs.emplace_back("itsalppar", "ITS", "ALPIDEPARAM", 0, Lifetime::Condition, ccdbParamSpec("ITS/Config/AlpideParam"));
1232 }
1233 if (mSpecConfig.itsOverrBeamEst) {
1234 inputs.emplace_back("meanvtx", "GLO", "MEANVERTEX", 0, Lifetime::Condition, ccdbParamSpec("GLO/Calib/MeanVertex", {}, 1));
1235 }
1236 }
1237 if (mSpecConfig.processMC) {
1238 inputs.emplace_back("itsmclabels", "ITS", "CLUSTERSMCTR", 0, Lifetime::Timeframe);
1239 inputs.emplace_back("ITSMC2ROframes", "ITS", "CLUSTERSMC2ROF", 0, Lifetime::Timeframe);
1240 }
1241 }
1242
1243 // NN clusterizer
1244 *mConfParam = mConfig->ReadConfigurableParam();
1245 if (mConfig->configProcessing.nn.nnLoadFromCCDB) {
1246
1247 LOG(info) << "(NN CLUS) Enabling fetching of TPC NN clusterizer from CCDB";
1248 mSpecConfig.nnLoadFromCCDB = true;
1249 mSpecConfig.nnDumpToFile = mConfig->configProcessing.nn.nnCCDBDumpToFile;
1250 GPUSettingsProcessingNNclusterizer& nnClusterizerSettings = mConfig->configProcessing.nn;
1251
1252 std::map<std::string, std::string> metadata;
1253 metadata["inputDType"] = nnClusterizerSettings.nnInferenceInputDType; // FP16 or FP32
1254 metadata["outputDType"] = nnClusterizerSettings.nnInferenceOutputDType; // FP16 or FP32
1255 metadata["nnCCDBWithMomentum"] = nnClusterizerSettings.nnCCDBWithMomentum; // 0, 1 -> Only for regression model
1256 metadata["nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBClassificationLayerType; // FC, CNN
1257 metadata["nnCCDBInteractionRate"] = nnClusterizerSettings.nnCCDBInteractionRate; // in kHz
1258 metadata["nnCCDBBeamType"] = nnClusterizerSettings.nnCCDBBeamType; // pp, pPb, PbPb
1259
1260 auto convert_map_to_metadata = [](const std::map<std::string, std::string>& inputMap, std::vector<o2::framework::CCDBMetadata>& outputMetadata) {
1261 for (const auto& [key, value] : inputMap) {
1262 if (value != "") {
1263 outputMetadata.push_back({key, value});
1264 }
1265 }
1266 };
1267
1268 mSpecConfig.nnEvalMode = o2::utils::Str::tokenize(nnClusterizerSettings.nnEvalMode, ':');
1269 std::vector<o2::framework::CCDBMetadata> ccdb_metadata;
1270
1271 if (mConfParam->printSettings) {
1272 auto printSettings = [](const std::map<std::string, std::string>& settings) {
1273 LOG(info) << "(NN CLUS) NN Clusterizer CCDB settings:";
1274 for (const auto& [key, value] : settings) {
1275 LOG(info) << " " << key << " : " << value;
1276 }
1277 };
1278 printSettings(metadata);
1279 }
1280
1281 if (mSpecConfig.nnEvalMode[0] == "c1") {
1282 metadata["nnCCDBEvalType"] = "classification_c1";
1283 convert_map_to_metadata(metadata, ccdb_metadata);
1284 inputs.emplace_back("nn_classification_c1", gDataOriginTPC, "NNCLUSTERIZER_C1", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1285 } else if (mSpecConfig.nnEvalMode[0] == "c2") {
1286 metadata["nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1287 metadata["nnCCDBEvalType"] = "classification_c2";
1288 convert_map_to_metadata(metadata, ccdb_metadata);
1289 inputs.emplace_back("nn_classification_c2", gDataOriginTPC, "NNCLUSTERIZER_C2", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1290 }
1291
1292 metadata["nnCCDBEvalType"] = "regression_c1";
1293 metadata["nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1294 convert_map_to_metadata(metadata, ccdb_metadata);
1295 inputs.emplace_back("nn_regression_c1", gDataOriginTPC, "NNCLUSTERIZER_R1", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1296
1297 if (mSpecConfig.nnEvalMode[1] == "r2") {
1298 metadata["nnCCDBEvalType"] = "regression_c2";
1299 convert_map_to_metadata(metadata, ccdb_metadata);
1300 inputs.emplace_back("nn_regression_c2", gDataOriginTPC, "NNCLUSTERIZER_R2", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1301 }
1302 }
1303
1304 return inputs;
1305};
1306
1308{
1309 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
1310 std::vector<OutputSpec> outputSpecs;
1311 if (mSpecConfig.enableDoublePipeline == 2) {
1312 outputSpecs.emplace_back(gDataOriginGPU, "PIPELINEPREPARE", 0, Lifetime::Timeframe);
1313 return outputSpecs;
1314 }
1315 if (mSpecConfig.outputTracks) {
1316 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSF" : (DataDescription) "TRACKS", 0, Lifetime::Timeframe);
1317 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSREFSF" : (DataDescription) "CLUSREFS", 0, Lifetime::Timeframe);
1318 }
1319 if (mSpecConfig.processMC && mSpecConfig.outputTracks) {
1320 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSMCLBLF" : (DataDescription) "TRACKSMCLBL", 0, Lifetime::Timeframe);
1321 }
1322 if (mSpecConfig.outputCompClustersRoot) {
1323 outputSpecs.emplace_back(gDataOriginTPC, "COMPCLUSTERS", 0, Lifetime::Timeframe);
1324 }
1325 if (mSpecConfig.outputCompClustersFlat) {
1326 outputSpecs.emplace_back(gDataOriginTPC, "COMPCLUSTERSFLAT", 0, Lifetime::Timeframe);
1327 }
1328 if (mSpecConfig.outputCAClusters) {
1329 for (auto const& sector : mTPCSectors) {
1330 mClusterOutputIds.emplace_back(sector);
1331 }
1332 if (mSpecConfig.sendClustersPerSector) {
1333 outputSpecs.emplace_back(gDataOriginTPC, "CLUSTERNATIVETMP", NSectors, Lifetime::Timeframe); // Dummy buffer the TPC tracker writes the inital linear clusters to
1334 for (const auto sector : mTPCSectors) {
1335 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", sector, Lifetime::Timeframe);
1336 }
1337 } else {
1338 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", NSectors, Lifetime::Timeframe);
1339 }
1340 if (mSpecConfig.processMC) {
1341 if (mSpecConfig.sendClustersPerSector) {
1342 for (const auto sector : mTPCSectors) {
1343 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), sector, Lifetime::Timeframe);
1344 }
1345 } else {
1346 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), NSectors, Lifetime::Timeframe);
1347 }
1348 }
1349 }
1350 if (mSpecConfig.outputSharedClusterMap) {
1351 outputSpecs.emplace_back(gDataOriginTPC, "CLSHAREDMAP", 0, Lifetime::Timeframe);
1352 outputSpecs.emplace_back(gDataOriginTPC, "TPCOCCUPANCYMAP", 0, Lifetime::Timeframe);
1353 }
1354 if (mSpecConfig.tpcTriggerHandling) {
1355 outputSpecs.emplace_back(gDataOriginTPC, "TRIGGERWORDS", 0, Lifetime::Timeframe);
1356 }
1357 if (mSpecConfig.outputQA) {
1358 outputSpecs.emplace_back(gDataOriginTPC, "TRACKINGQA", 0, Lifetime::Timeframe);
1359 }
1360 if (mSpecConfig.outputErrorQA) {
1361 outputSpecs.emplace_back(gDataOriginGPU, "ERRORQA", 0, Lifetime::Timeframe);
1362 }
1363
1364 if (mSpecConfig.runITSTracking) {
1365 outputSpecs.emplace_back(gDataOriginITS, "TRACKS", 0, Lifetime::Timeframe);
1366 outputSpecs.emplace_back(gDataOriginITS, "TRACKCLSID", 0, Lifetime::Timeframe);
1367 outputSpecs.emplace_back(gDataOriginITS, "ITSTrackROF", 0, Lifetime::Timeframe);
1368 outputSpecs.emplace_back(gDataOriginITS, "VERTICES", 0, Lifetime::Timeframe);
1369 outputSpecs.emplace_back(gDataOriginITS, "VERTICESROF", 0, Lifetime::Timeframe);
1370 outputSpecs.emplace_back(gDataOriginITS, "IRFRAMES", 0, Lifetime::Timeframe);
1371
1372 if (mSpecConfig.processMC) {
1373 outputSpecs.emplace_back(gDataOriginITS, "VERTICESMCTR", 0, Lifetime::Timeframe);
1374 outputSpecs.emplace_back(gDataOriginITS, "VERTICESMCPUR", 0, Lifetime::Timeframe);
1375 outputSpecs.emplace_back(gDataOriginITS, "TRACKSMCTR", 0, Lifetime::Timeframe);
1376 outputSpecs.emplace_back(gDataOriginITS, "ITSTrackMC2ROF", 0, Lifetime::Timeframe);
1377 }
1378 }
1379
1380 return outputSpecs;
1381};
1382
1384{
1385 ExitPipeline();
1386 mQA.reset(nullptr);
1387 mDisplayFrontend.reset(nullptr);
1388 mGPUReco.reset(nullptr);
1389}
1390
1391} // namespace o2::gpu
std::vector< std::string > labels
Simple interface to the CDB manager.
Definition of container class for dE/dx corrections.
void dumpToFile(std::string fileName, const CathodeSegmentation &seg, const std::vector< Point > &points)
Class of a TPC cluster in TPC-native coordinates (row, time)
Container to store compressed TPC cluster data.
A const (ready only) version of MCTruthContainer.
Helper class to access correction maps.
Helper class to access load maps from CCDB.
A parser and sequencer utility for raw pages within DPL input.
A raw page parser for DPL input.
Wrapper container for different reconstructed object types.
Definition of the TPC Digit.
Helper class for memory management of TPC Data Formats, external from the actual data type classes to...
Definition of class for writing debug information.
Definition of the GeometryManager class.
int32_t i
int32_t retVal
Helper for geometry and GRP related CCDB requests.
Definition of the GeometryTGeo class.
A helper class to iterate over all parts of all input routes.
Declarations for the wrapper for the set of cylindrical material layers.
Definition of the Names Generator class.
Class to serialize ONNX objects for ROOT snapshots of CCDB objects at runtime.
Utilities for parsing of data sequences.
uint32_t j
Definition RawData.h:0
Struct for input data required by TRD tracking workflow.
Type wrappers for enforcing a specific serialization method.
class to create TPC fast transformation
Definition of TPCFastTransform class.
Wrapper class for TPC CA Tracker algorithm.
TBranch * ptr
Configurable params for tracks ad hoc tuning.
Helper class to extract VDrift from different sources.
Helper class to obtain TPC clusters / digits / labels from DPL.
Definitions of TPC Zero Suppression Data Headers.
StringRef key
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static MatLayerCylSet * loadFromFile(const std::string &inpFName="matbud.root")
GPUd() value_type estimateLTFast(o2 static GPUd() float estimateLTIncrement(const o2 PropagatorImpl * Instance(bool uninitialized=false)
Definition Propagator.h:175
gsl::span< const TruthElement > getLabels(uint32_t dataindex) const
static mask_t getSourcesMask(const std::string_view srcList)
static constexpr std::string_view NONE
keyword for no sources
void addElement(uint32_t dataindex, TruthElement const &element, bool noElement=false)
size_t flatten_to(ContainerType &container) const
static constexpr ID TPC
Definition DetID.h:64
This utility handles transparently the DPL inputs and triggers a customizable action on sequences of ...
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
Definition InitContext.h:34
A helper class to iterate over all parts of all input routes.
int getPos(const char *name) const
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static GPUDisplayFrontendInterface * getFrontend(const char *type)
static uint32_t getTpcMaxTimeBinFromNHbf(uint32_t nHbf)
static float getNominalGPUBz(T &src)
o2::framework::Outputs outputs()
std::vector< framework::InputSpec > CompletionPolicyData
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
o2::framework::Inputs inputs()
void run(o2::framework::ProcessingContext &pc) final
void stop() final
This is invoked on stop.
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
GPURecoWorkflowSpec(CompletionPolicyData *policyData, Config const &specconfig, std::vector< int32_t > const &tpcsectors, uint64_t tpcSectorMask, std::shared_ptr< o2::base::GRPGeomRequest > &ggr, std::function< bool(o2::framework::DataProcessingHeader::StartTime)> **gPolicyOrder=nullptr)
o2::framework::Options options()
static void RunZSEncoderCreateMeta(const uint64_t *buffer, const uint32_t *sizes, void **ptrs, GPUTrackingInOutZS *out)
static GeometryTGeo * Instance()
void fillMatrixCache(int mask) override
ClusterNativeAccess::ConstMCLabelContainerViewWithBuffer ConstMCLabelContainerViewWithBuffer
static void addOptions(std::vector< o2::framework::ConfigParamSpec > &options)
static constexpr int MAXSECTOR
Definition Sector.h:44
static precheckModifiedData runPrecheck(o2::gpu::GPUTrackingInOutPointers *ptrs, o2::gpu::GPUO2InterfaceConfiguration *config)
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
static Geometry * instance()
Definition Geometry.h:33
GLint GLsizei count
Definition glcorearb.h:399
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
GLdouble GLdouble right
Definition glcorearb.h:4077
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint left
Definition glcorearb.h:1979
GLboolean * data
Definition glcorearb.h:298
GLintptr offset
Definition glcorearb.h:660
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLuint start
Definition glcorearb.h:469
GLint ref
Definition glcorearb.h:291
GLint GLuint mask
Definition glcorearb.h:291
GLsizei GLenum * sources
Definition glcorearb.h:2516
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
constexpr o2::header::DataOrigin gDataOriginTRD
Definition DataHeader.h:577
constexpr o2::header::DataOrigin gDataOriginITS
Definition DataHeader.h:570
constexpr o2::header::DataOrigin gDataOriginGPU
Definition DataHeader.h:593
constexpr int NSectors
Definition of a container to keep/associate and arbitrary number of labels associated to an index wit...
Defining PrimaryVertex explicitly as messageable.
o2::header::DataDescription DataDescription
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
O2 data header classes and API, v0.1.
Definition DetID.h:49
auto get(const std::byte *buffer, size_t=0)
Definition DataHeader.h:454
Descriptor< gSizeDataDescriptionString > DataDescription
Definition DataHeader.h:551
constexpr int MAXGLOBALPADROW
Definition Constants.h:34
@ ZS
final Zero Suppression (can be ILBZS, DLBZS)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
Definition CDBTypes.h:96
@ FEEConfig
use fee config
@ IDCPadStatus
use idc pad status map
@ CalIDCPadStatusMapA
Status map of the pads (dead etc., obtained from CalIDC0)
@ CalPadGainFull
Full pad gain calibration.
@ CalPadGainResidual
Residual pad gain calibration (e.g. from tracks)
@ CalTimeGain
Gain variation over time.
@ CalTimeGainMC
Gain variation over time for MC.
@ AltroSyncSignal
timing of Altro chip sync. signal
auto getRecoInputContainer(o2::framework::ProcessingContext &pc, o2::gpu::GPUTrackingInOutPointers *ptrs, const o2::globaltracking::RecoContainer *inputTracks, bool mc=false)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::string filename()
size_t inputTimesliceId
The time pipelining id of this particular device.
Definition DeviceSpec.h:68
void requestTracks(o2::dataformats::GlobalTrackID::mask_t src, bool mc)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
S< o2::trd::GeometryFlat >::type * trdGeometry
S< o2::base::PropagatorImpl< float > >::type * o2Propagator
S< o2::base::MatLayerCylSet >::type * matLUT
S< o2::tpc::ORTRootSerializer >::type * nnClusterizerNetworks[3]
const std::vector< TGraphAsymmErrors > * hist4
std::function< void *(size_t)> allocator
std::vector< std::string > nnEvalMode
std::array< const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > *, o2::tpc::constants::MAXSECTOR > v
const o2::tpc::Digit * tpcDigits[NSECTORS]
const GPUTPCDigitsMCInput * tpcDigitsMC
const o2::tpc::ClusterNativeAccess * clustersNative
const o2::tpc::CompressedClustersFlat * tpcCompressedClusters
const GPUSettingsTF * settingsTF
const GPUTrackingInOutZS * tpcZS
const o2::MCCompLabel * outputTracksTPCO2MC
const o2::tpc::ClusterNativeAccess * clustersNativeReduced
const o2::tpc::TrackTPC * outputTracksTPCO2
const GPUTrackingInOutDigits * tpcPackedDigits
GPUTrackingInOutZSSector sector[NSECTORS]
static constexpr uint32_t NSECTORS
static constexpr uint32_t NENDPOINTS
GPUOutputControl * asArray()
GPUOutputControl tpcTracksO2Labels
GPUOutputControl tpcTracksO2ClusRefs
size_t getIndex(const GPUOutputControl &v)
static constexpr size_t count()
GPUOutputControl sharedClusterMap
GPUOutputControl compressedClusters
uint32_t SubSpecificationType
Definition DataHeader.h:621
static constexpr int T2L
Definition Cartesian.h:55
static constexpr int T2GRot
Definition Cartesian.h:57
static constexpr int T2G
Definition Cartesian.h:56
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClustersSector[constants::MAXSECTOR]
const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > * clustersMCTruth
const ClusterNative * clusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int clusterOffset[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
const ClusterNative * clustersLinear
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"