Project
Loading...
Searching...
No Matches
GPUWorkflowSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
18#include "Headers/DataHeader.h"
19#include "Framework/WorkflowSpec.h" // o2::framework::mergeInputs
27#include "Framework/Logger.h"
42#include "TPCFastTransform.h"
51#include "TPCBase/RDHUtils.h"
53#include "GPUO2InterfaceQA.h"
54#include "GPUO2Interface.h"
55#include "GPUO2InterfaceUtils.h"
56#include "CalibdEdxContainer.h"
57#include "ORTRootSerializer.h"
58#include "GPUNewCalibValues.h"
59#include "TPCPadGainCalib.h"
60#include "TPCZSLinkMapping.h"
62#include "TPCBase/Sector.h"
63#include "TPCBase/Utils.h"
71#include "Algorithm/Parser.h"
74#include "TRDBase/Geometry.h"
76#include "GPUTRDRecoParam.h"
82#include "GPUWorkflowInternal.h"
83#include "GPUDataTypesQA.h"
84// #include "Framework/ThreadPool.h"
85
86#include <TStopwatch.h>
87#include <TObjArray.h>
88#include <TH1F.h>
89#include <TH2F.h>
90#include <TH1D.h>
91#include <TGraphAsymmErrors.h>
92
93#include <filesystem>
94#include <memory>
95#include <vector>
96#include <iomanip>
97#include <stdexcept>
98#include <regex>
99#include <sys/types.h>
100#include <sys/stat.h>
101#include <fcntl.h>
102#include <chrono>
103#include <unordered_set>
104
105using namespace o2::framework;
106using namespace o2::header;
107using namespace o2::gpu;
108using namespace o2::base;
109using namespace o2::dataformats;
111
112namespace o2::gpu
113{
114
115GPURecoWorkflowSpec::GPURecoWorkflowSpec(GPURecoWorkflowSpec::CompletionPolicyData* policyData, Config const& specconfig, std::vector<int32_t> const& tpcsectors, uint64_t tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<bool(o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) : o2::framework::Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
116{
117 if (mSpecConfig.outputCAClusters && !mSpecConfig.caClusterer && !mSpecConfig.decompressTPC && !mSpecConfig.useFilteredOutputSpecs) {
118 throw std::runtime_error("inconsistent configuration: cluster output is only possible if CA clusterer or CompCluster decompression is activated");
119 }
120
121 mConfig.reset(new GPUO2InterfaceConfiguration);
122 mConfParam.reset(new GPUSettingsO2);
123 mTFSettings.reset(new GPUSettingsTF);
124 mTimer.reset(new TStopwatch);
125 mPipeline.reset(new GPURecoWorkflowSpec_PipelineInternals);
126
127 if (mSpecConfig.enableDoublePipeline == 1 && gPolicyOrder) {
128 *gPolicyOrder = &mPolicyOrder;
129 }
130}
131
133
// NOTE(review): the signature line of this function was lost in the doxygen
// extraction. From the body (uses `ic.services()`, registers CallbackService
// callbacks) this is evidently the o2::framework::Task::init override, i.e.
//   void GPURecoWorkflowSpec::init(InitContext& ic)
// -- TODO confirm against the repository before relying on this.
{
  GPUO2InterfaceConfiguration& config = *mConfig.get();

  // Create configuration object and fill settings
  mConfig->configGRP.solenoidBzNominalGPU = 0;
  mTFSettings->hasSimStartOrbit = 1;
  auto& hbfu = o2::raw::HBFUtils::Instance();
  mTFSettings->simStartOrbit = hbfu.getFirstIRofTF(o2::InteractionRecord(0, hbfu.orbitFirstSampled)).orbit;

  // Merge configurable-param (command line / INI) settings into the configuration.
  *mConfParam = mConfig->ReadConfigurableParam();

  // Optional interactive event display; failure to create the frontend is fatal.
  if (mConfParam->display) {
    mDisplayFrontend.reset(GPUDisplayFrontendInterface::getFrontend(mConfig->configDisplay.displayFrontend.c_str()));
    mConfig->configProcessing.eventDisplay = mDisplayFrontend.get();
    if (mConfig->configProcessing.eventDisplay != nullptr) {
      LOG(info) << "Event display enabled";
    } else {
      throw std::runtime_error("GPU Event Display frontend could not be created!");
    }
  }
  if (mSpecConfig.enableDoublePipeline) {
    mConfig->configProcessing.doublePipeline = 1;
  }

  // -1e6f is the sentinel for "field not set, take it from GRP later" (consumed
  // where mAutoSolenoidBz is checked).
  mAutoSolenoidBz = mConfParam->solenoidBzNominalGPU == -1e6f;
  mAutoContinuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin < 0;
  if (mAutoContinuousMaxTimeBin) {
    // Preliminary value from the overridden (or default 256) HBF-per-TF count;
    // refreshed once the real GRP arrives.
    mConfig->configGRP.grpContinuousMaxTimeBin = GPUO2InterfaceUtils::getTpcMaxTimeBinFromNHbf(mConfParam->overrideNHbfPerTF ? mConfParam->overrideNHbfPerTF : 256);
  }
  if (mConfig->configProcessing.deviceNum == -2) {
    // deviceNum == -2: derive the GPU device from the DPL pipeline (timeslice) index.
    int32_t myId = ic.services().get<const o2::framework::DeviceSpec>().inputTimesliceId;
    int32_t idMax = ic.services().get<const o2::framework::DeviceSpec>().maxInputTimeslices;
    mConfig->configProcessing.deviceNum = myId;
    LOG(info) << "GPU device number selected from pipeline id: " << myId << " / " << idMax;
  }
  // High debug level implies at least minimal workflow verbosity.
  if (mConfig->configProcessing.debugLevel >= 3 && mVerbosity == 0) {
    mVerbosity = 1;
  }
  mConfig->configProcessing.runMC = mSpecConfig.processMC;
  if (mSpecConfig.outputQA) {
    if (!mSpecConfig.processMC && !mConfig->configQA.clusterRejectionHistograms) {
      throw std::runtime_error("Need MC information to create QA plots");
    }
    if (!mSpecConfig.processMC) {
      mConfig->configQA.noMC = true;
    }
    mConfig->configQA.shipToQC = true;
    if (!mConfig->configProcessing.runQA) {
      // QA requested only for QC shipping: suppress local output and encode the
      // task mask as a negative runQA value (convention consumed by the QA code).
      mConfig->configQA.enableLocalOutput = false;
      mQATaskMask = (mSpecConfig.processMC ? gpudatatypes::gpuqa::tasksAllMC : gpudatatypes::gpuqa::tasksNone) | (mConfig->configQA.clusterRejectionHistograms ? gpudatatypes::gpuqa::taskClusterCounts : gpudatatypes::gpuqa::tasksNone);
      mConfig->configProcessing.runQA = -mQATaskMask;
    }
  }
  // Outputs are written into DPL-owned buffers, not GPU-internal memory.
  mConfig->configInterface.outputToExternalBuffers = true;
  const bool runTracking = mSpecConfig.outputTracks || mSpecConfig.outputCompClustersRoot || mSpecConfig.outputCompClustersFlat;

  // Configure the "GPU workflow" i.e. which steps we run on the GPU (or CPU)
  if (runTracking) {
    mConfig->configWorkflow.steps.set(gpudatatypes::RecoStep::TPCConversion,
    // NOTE(review): the continuation of this call (the remaining RecoStep
    // arguments, doxygen lines 195-196) was lost in the extraction -- restore
    // from the repository; do not guess the step list here.
    mConfig->configWorkflow.outputs.set(gpudatatypes::InOutType::TPCMergedTracks);
  }
  // Apply the synchronous-processing presets; passes -2 for rundEdx when no
  // tracking output is requested.
  GPUO2Interface::ApplySyncSettings(mConfig->configProcessing, mConfig->configReconstruction, mConfig->configWorkflow.steps, mConfParam->synchronousProcessing, runTracking ? mConfParam->rundEdx : -2);

  if (mSpecConfig.outputCompClustersRoot || mSpecConfig.outputCompClustersFlat) {
    mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCCompression, true);
    mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCCompressedClusters, true);
  }
  mConfig->configWorkflow.inputs.set(gpudatatypes::InOutType::TPCClusters);
  if (mSpecConfig.caClusterer) { // Override some settings if we have raw data as input
    mConfig->configWorkflow.inputs.set(gpudatatypes::InOutType::TPCRaw);
    mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCClusterFinding, true);
    mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCClusters, true);
  }
  if (mSpecConfig.decompressTPC) {
    // Decompression mode: consume compressed clusters, produce native clusters,
    // and make sure the compression step itself is off.
    mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCCompression, false);
    mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCDecompression, true);
    mConfig->configWorkflow.inputs.set(gpudatatypes::InOutType::TPCCompressedClusters);
    mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCClusters, true);
    mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCCompressedClusters, false);
    if (mTPCSectorMask != 0xFFFFFFFFF) { // all 36 sector bits must be set
      throw std::invalid_argument("Cannot run TPC decompression with a sector mask");
    }
  }
  if (mSpecConfig.runTRDTracking) {
    mConfig->configWorkflow.inputs.setBits(gpudatatypes::InOutType::TRDTracklets, true);
    mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TRDTracking, true);
  }
  if (mSpecConfig.runITSTracking) {
    mConfig->configWorkflow.inputs.setBits(gpudatatypes::InOutType::ITSClusters, true);
    mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::ITSTracks, true);
    mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::ITSTracking, true);
  }
  if (mSpecConfig.outputSharedClusterMap) {
    mConfig->configProcessing.outputSharedClusterMap = true;
  }
  if (!mSpecConfig.outputTracks) {
    mConfig->configProcessing.createO2Output = 0; // Disable O2 TPC track format output if no track output requested
  }
  mConfig->configProcessing.param.tpcTriggerHandling = mSpecConfig.tpcTriggerHandling;

  if (mConfParam->transformationFile.size() || mConfParam->transformationSCFile.size()) {
    LOG(fatal) << "Deprecated configurable param options GPU_global.transformationFile or transformationSCFile used\n"
               << "Instead, link the corresponding file as <somedir>/TPC/Calib/CorrectionMap/snapshot.root and use it via\n"
               << "--condition-remap file://<somdir>=TPC/Calib/CorrectionMap option";
  }
  /* if (config.configProcessing.doublePipeline && ic.services().get<ThreadPool>().poolSize != 2) {
    throw std::runtime_error("double pipeline requires exactly 2 threads");
  } */
  // The two-threaded pipeline supports only TPC-only processing with ZS input.
  if (config.configProcessing.doublePipeline && (mSpecConfig.readTRDtracklets || mSpecConfig.runITSTracking || !(mSpecConfig.zsOnTheFly || mSpecConfig.zsDecoder))) {
    LOG(fatal) << "GPU two-threaded pipeline works only with TPC-only processing, and with ZS input";
  }

  // Everything below owns the actual reconstruction instance; it is skipped for
  // enableDoublePipeline == 2 (the instance that does not create mGPUReco).
  if (mSpecConfig.enableDoublePipeline != 2) {
    mGPUReco = std::make_unique<GPUO2Interface>();

    // initialize TPC calib objects
    initFunctionTPCCalib(ic);

    mConfig->configCalib.fastTransform = mCalibObjects.mFastTransformHelper->getCorrMap();
    mConfig->configCalib.fastTransformRef = mCalibObjects.mFastTransformHelper->getCorrMapRef();
    mConfig->configCalib.fastTransformMShape = mCalibObjects.mFastTransformHelper->getCorrMapMShape();
    mConfig->configCalib.fastTransformHelper = mCalibObjects.mFastTransformHelper.get();
    if (mConfig->configCalib.fastTransform == nullptr) {
      throw std::invalid_argument("GPU workflow: initialization of the TPC transformation failed");
    }

    if (mConfParam->matLUTFile.size()) {
      LOGP(info, "Loading matlut file {}", mConfParam->matLUTFile.c_str());
      mConfig->configCalib.matLUT = o2::base::MatLayerCylSet::loadFromFile(mConfParam->matLUTFile.c_str());
      if (mConfig->configCalib.matLUT == nullptr) {
        LOGF(fatal, "Error loading matlut file");
      }
    } else {
      // No LUT file given: reserve buffer space (50 MiB) for a material LUT
      // provided later at run time.
      mConfig->configProcessing.lateO2MatLutProvisioningSize = 50 * 1024 * 1024;
    }

    if (mSpecConfig.readTRDtracklets) {
      mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>();
      mConfig->configCalib.trdGeometry = mTRDGeometry.get();

      mTRDRecoParam = std::make_unique<GPUTRDRecoParam>();
      mConfig->configCalib.trdRecoParam = mTRDRecoParam.get();
    }

    mConfig->configProcessing.willProvideO2PropagatorLate = true;
    mConfig->configProcessing.o2PropagatorUseGPUField = true;

    // Print settings only on the first pipeline instance unless a higher
    // printSettings level explicitly requests it everywhere.
    if (mConfParam->printSettings && (mConfParam->printSettings > 1 || ic.services().get<const o2::framework::DeviceSpec>().inputTimesliceId == 0)) {
      mConfig->configProcessing.printSettings = true;
      if (mConfParam->printSettings > 1) {
        mConfig->PrintParam();
      }
    }

    // Configuration is prepared, initialize the tracker.
    if (mGPUReco->Initialize(config) != 0) {
      throw std::invalid_argument("GPU Reconstruction initialization failed");
    }
    if (mSpecConfig.outputQA) {
      mQA = std::make_unique<GPUO2InterfaceQA>(mConfig.get());
    }
    if (mSpecConfig.outputErrorQA) {
      mGPUReco->setErrorCodeOutput(&mErrorQA);
    }

    // initialize ITS
    if (mSpecConfig.runITSTracking) {
      initFunctionITS(ic);
    }
  }

  if (mSpecConfig.enableDoublePipeline) {
    initPipeline(ic);
    if (mConfParam->dump >= 2) {
      LOG(fatal) << "Cannot use dump-only mode with multi-threaded pipeline";
    }
  }

  // Register shared-memory regions with the GPU (registerMemoryForGPU) as they
  // are announced, so the device can access DPL message memory directly.
  auto& callbacks = ic.services().get<CallbackService>();
  callbacks.set<CallbackService::Id::RegionInfoCallback>([this](fair::mq::RegionInfo const& info) {
    if (info.size == 0) {
      return;
    }
    if (mSpecConfig.enableDoublePipeline) {
      mRegionInfos.emplace_back(info);
    }
    if (mSpecConfig.enableDoublePipeline == 2) {
      return; // this instance only records the region, no GPU registration
    }
    // Optionally restrict registration to one selected managed segment id.
    if (mConfParam->registerSelectedSegmentIds != -1 && info.managed && info.id != (uint32_t)mConfParam->registerSelectedSegmentIds) {
      return;
    }
    int32_t fd = 0;
    if (mConfParam->mutexMemReg) {
      // Serialize memory registration across processes with a world-writable file lock.
      mode_t mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
      fd = open("/tmp/o2_gpu_memlock_mutex.lock", O_RDWR | O_CREAT | O_CLOEXEC, mask);
      if (fd == -1) {
        throw std::runtime_error("Error opening memlock mutex lock file");
      }
      fchmod(fd, mask);
      if (lockf(fd, F_LOCK, 0)) {
        throw std::runtime_error("Error locking memlock mutex file");
      }
    }
    std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
    if (mConfParam->benchmarkMemoryRegistration) {
      start = std::chrono::high_resolution_clock::now();
    }
    if (mGPUReco->registerMemoryForGPU(info.ptr, info.size)) {
      throw std::runtime_error("Error registering memory for GPU");
    }
    if (mConfParam->benchmarkMemoryRegistration) {
      end = std::chrono::high_resolution_clock::now();
      std::chrono::duration<double> elapsed_seconds = end - start;
      LOG(info) << "Memory registration time (0x" << info.ptr << ", " << info.size << " bytes): " << elapsed_seconds.count() << " s";
    }
    if (mConfParam->mutexMemReg) {
      if (lockf(fd, F_ULOCK, 0)) {
        throw std::runtime_error("Error unlocking memlock mutex file");
      }
      close(fd);
    }
  });

  // The timer is (re)started per time frame in run(); reset so accounting starts clean.
  mTimer->Stop();
  mTimer->Reset();
}
365
367{
368 LOGF(info, "GPU Reconstruction total timing: Cpu: %.3e Real: %.3e s in %d slots", mTimer->CpuTime(), mTimer->RealTime(), mTimer->Counter() - 1);
369 handlePipelineStop();
370}
371
373{
374 handlePipelineEndOfStream(ec);
375}
376
378{
379 if (mSpecConfig.enableDoublePipeline != 2) {
380 finaliseCCDBTPC(matcher, obj);
381 if (mSpecConfig.runITSTracking) {
382 finaliseCCDBITS(matcher, obj);
383 }
384 }
385 if (GRPGeomHelper::instance().finaliseCCDB(matcher, obj)) {
386 mGRPGeomUpdated = true;
387 return;
388 }
389}
390
/// Gather the non-digit / non-cluster TPC inputs of one time frame from the DPL
/// input record. Depending on the spec configuration this fills exactly one of:
///  - on-the-fly ZS buffers (zsOnTheFly): inputZS + tpcZSonTheFlySizes,
///  - raw ZS pages (zsDecoder): the tpcZSmeta pointer/size tables and tpcZS,
///  - compressed clusters (decompressTPC): pCompClustersFlat (ROOT or flat input).
/// The containers are declared by the caller and passed by reference via deduced
/// template parameters. debugTFDump is set when the fast raw-page scan failed.
template <class D, class E, class F, class G, class H, class I, class J, class K>
void GPURecoWorkflowSpec::processInputs(ProcessingContext& pc, D& tpcZSmeta, E& inputZS, F& tpcZS, G& tpcZSonTheFlySizes, bool& debugTFDump, H& compClustersDummy, I& compClustersFlatDummy, J& pCompClustersFlat, K& tmpEmptyCompClusters)
{
  // In double-pipeline mode 1 input handling happens elsewhere (presumably in
  // the pipeline preparation path -- see handlePipeline); nothing to do here.
  if (mSpecConfig.enableDoublePipeline == 1) {
    return;
  }
  constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
  constexpr static size_t NEndpoints = o2::gpu::GPUTrackingInOutZS::NENDPOINTS;

  // Reset the per-sector / per-endpoint page tables before refilling them.
  if (mSpecConfig.zsOnTheFly || mSpecConfig.zsDecoder) {
    for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
      for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
        tpcZSmeta.Pointers[i][j].clear();
        tpcZSmeta.Sizes[i][j].clear();
      }
    }
  }
  if (mSpecConfig.zsOnTheFly) {
    tpcZSonTheFlySizes = {0};
    // tpcZSonTheFlySizes: #zs pages per endpoint:
    std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "ZSSIZES"}, Lifetime::Timeframe}};
    bool recv = false, recvsizes = false;
    // Exactly one ZSSIZES message is expected per TF; duplicates are an error.
    for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
      if (recvsizes) {
        throw std::runtime_error("Received multiple ZSSIZES data");
      }
      tpcZSonTheFlySizes = pc.inputs().get<std::array<uint32_t, NEndpoints * NSectors>>(ref);
      recvsizes = true;
    }
    // zs pages
    std::vector<InputSpec> filter2 = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "TPCZS"}, Lifetime::Timeframe}};
    for (auto const& ref : InputRecordWalker(pc.inputs(), filter2)) {
      if (recv) {
        throw std::runtime_error("Received multiple TPCZS data");
      }
      inputZS = pc.inputs().get<gsl::span<o2::tpc::ZeroSuppressedContainer8kb>>(ref);
      recv = true;
    }
    if (!recv || !recvsizes) {
      throw std::runtime_error("TPC ZS on the fly data not received");
    }

    // Per-sector page accounting, reported only in verbose mode.
    // NOTE(review): `offset` accumulates the grand total but is not used after
    // this loop -- only pageSector is reported.
    uint32_t offset = 0;
    for (uint32_t i = 0; i < NSectors; i++) {
      uint32_t pageSector = 0;
      for (uint32_t j = 0; j < NEndpoints; j++) {
        pageSector += tpcZSonTheFlySizes[i * NEndpoints + j];
        offset += tpcZSonTheFlySizes[i * NEndpoints + j];
      }
      if (mVerbosity >= 1) {
        LOG(info) << "GOT ZS on the fly pages FOR SECTOR " << i << " -> pages: " << pageSector;
      }
    }
  }
  if (mSpecConfig.zsDecoder) {
    std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
    // Two RDHs belong to the same page sequence if FEE id and detector field match.
    auto isSameRdh = [](const char* left, const char* right) -> bool {
      return o2::raw::RDHUtils::getFEEID(left) == o2::raw::RDHUtils::getFEEID(right) && o2::raw::RDHUtils::getDetectorField(left) == o2::raw::RDHUtils::getDetectorField(right);
    };
    // Accept only pages whose detector field says ZS; subSpec is unused here.
    auto checkForZSData = [](const char* ptr, uint32_t subSpec) -> bool {
      const auto rdhLink = o2::raw::RDHUtils::getLinkID(ptr);
      const auto detField = o2::raw::RDHUtils::getDetectorField(ptr);
      const auto feeID = o2::raw::RDHUtils::getFEEID(ptr);
      const auto feeLinkID = o2::tpc::rdh_utils::getLink(feeID);
      // This check is not what it is supposed to be, but some MC SYNTHETIC data was generated with rdhLinkId set to feeLinkId, so we add some extra logic so we can still decode it
      return detField == o2::tpc::raw_data_types::ZS && ((feeLinkID == o2::tpc::rdh_utils::UserLogicLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == 0)) ||
                                                         (feeLinkID == o2::tpc::rdh_utils::ILBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::ILBZSLinkID || rdhLink == 0)) ||
                                                         (feeLinkID == o2::tpc::rdh_utils::DLBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::DLBZSLinkID || rdhLink == 0)));
    };
    // File each accepted page run under its sector/endpoint slot; the index
    // arithmetic maps CRU + endpoint onto the NENDPOINTS slots per sector.
    auto insertPages = [&tpcZSmeta, checkForZSData](const char* ptr, size_t count, uint32_t subSpec) -> void {
      if (checkForZSData(ptr, subSpec)) {
        int32_t rawcru = o2::tpc::rdh_utils::getCRU(ptr);
        int32_t rawendpoint = o2::tpc::rdh_utils::getEndPoint(ptr);
        tpcZSmeta.Pointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(ptr);
        tpcZSmeta.Sizes[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(count);
      }
    };
    // Non-zero return: the fast sequencer failed and fell back to a slow scan;
    // remember it for a possible TF dump and emit a rate-limited alarm.
    if (DPLRawPageSequencer(pc.inputs(), filter)(isSameRdh, insertPages, checkForZSData)) {
      debugTFDump = true;
      static uint32_t nErrors = 0;
      nErrors++;
      if (nErrors == 1 || (nErrors < 100 && nErrors % 10 == 0) || nErrors % 1000 == 0 || mNTFs % 1000 == 0) {
        LOG(error) << "DPLRawPageSequencer failed to process TPC raw data - data most likely not padded correctly - Using slow page scan instead (this alarm is downscaled from now on, so far " << nErrors << " of " << mNTFs << " TFs affected)";
      }
    }

    // Publish the collected page tables through the raw-pointer view consumed
    // by the GPU interface. (totalCount is accumulated but not used afterwards.)
    int32_t totalCount = 0;
    for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
      for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
        tpcZSmeta.Pointers2[i][j] = tpcZSmeta.Pointers[i][j].data();
        tpcZSmeta.Sizes2[i][j] = tpcZSmeta.Sizes[i][j].data();
        tpcZS.sector[i].zsPtr[j] = tpcZSmeta.Pointers2[i][j];
        tpcZS.sector[i].nZSPtr[j] = tpcZSmeta.Sizes2[i][j];
        tpcZS.sector[i].count[j] = tpcZSmeta.Pointers[i][j].size();
        totalCount += tpcZSmeta.Pointers[i][j].size();
      }
    }
  } else if (mSpecConfig.decompressTPC) {
    if (mSpecConfig.decompressTPCFromROOT) {
      // ROOT input: copy into the dummy object and wrap it in the flat adapter.
      compClustersDummy = *pc.inputs().get<o2::tpc::CompressedClustersROOT*>("input");
      compClustersFlatDummy.setForward(&compClustersDummy);
      pCompClustersFlat = &compClustersFlatDummy;
    } else {
      pCompClustersFlat = pc.inputs().get<o2::tpc::CompressedClustersFlat*>("input").get();
    }
    if (pCompClustersFlat == nullptr) {
      // No payload received: substitute a zero-initialized empty object so the
      // downstream code can run on an empty TF.
      tmpEmptyCompClusters.reset(new char[sizeof(o2::tpc::CompressedClustersFlat)]);
      memset(tmpEmptyCompClusters.get(), 0, sizeof(o2::tpc::CompressedClustersFlat));
      pCompClustersFlat = (o2::tpc::CompressedClustersFlat*)tmpEmptyCompClusters.get();
    }
  } else if (!mSpecConfig.zsOnTheFly) {
    // Cluster/digit inputs are handled by getWorkflowTPCInput in the caller.
    if (mVerbosity) {
      LOGF(info, "running tracking for sector(s) 0x%09x", mTPCSectorMask);
    }
  }
}
507
508int32_t GPURecoWorkflowSpec::runMain(o2::framework::ProcessingContext* pc, GPUTrackingInOutPointers* ptrs, GPUInterfaceOutputs* outputRegions, int32_t threadIndex, GPUInterfaceInputUpdate* inputUpdateCallback)
509{
510 int32_t retVal = 0;
511 if (mConfParam->dump < 2) {
512 retVal = mGPUReco->RunTracking(ptrs, outputRegions, threadIndex, inputUpdateCallback);
513
514 if (retVal == 0 && mSpecConfig.runITSTracking) {
515 retVal = runITSTracking(*pc);
516 }
517 }
518
519 if (!mSpecConfig.enableDoublePipeline) { // TODO: Why is this needed for double-pipeline?
520 mGPUReco->Clear(false, threadIndex); // clean non-output memory used by GPU Reconstruction
521 }
522 return retVal;
523}
524
525void GPURecoWorkflowSpec::cleanOldCalibsTPCPtrs(calibObjectStruct& oldCalibObjects)
526{
527 if (mOldCalibObjects.size() > 0) {
528 mOldCalibObjects.pop();
529 }
530 mOldCalibObjects.emplace(std::move(oldCalibObjects));
531}
532
534{
535 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
536 constexpr static size_t NEndpoints = o2::gpu::GPUTrackingInOutZS::NENDPOINTS;
537
538 auto cput = mTimer->CpuTime();
539 auto realt = mTimer->RealTime();
540 mTimer->Start(false);
541 mNTFs++;
542
543 std::vector<gsl::span<const char>> inputs;
544
545 const o2::tpc::CompressedClustersFlat* pCompClustersFlat = nullptr;
546 size_t compClustersFlatDummyMemory[(sizeof(o2::tpc::CompressedClustersFlat) + sizeof(size_t) - 1) / sizeof(size_t)];
547 o2::tpc::CompressedClustersFlat& compClustersFlatDummy = reinterpret_cast<o2::tpc::CompressedClustersFlat&>(compClustersFlatDummyMemory);
548 o2::tpc::CompressedClusters compClustersDummy;
551 std::array<uint32_t, NEndpoints * NSectors> tpcZSonTheFlySizes;
552 gsl::span<const o2::tpc::ZeroSuppressedContainer8kb> inputZS;
553 std::unique_ptr<char[]> tmpEmptyCompClusters;
554
555 bool getWorkflowTPCInput_clusters = false, getWorkflowTPCInput_mc = false, getWorkflowTPCInput_digits = false;
556 bool debugTFDump = false;
557
558 if (mSpecConfig.processMC) {
559 getWorkflowTPCInput_mc = true;
560 }
561 if (!mSpecConfig.decompressTPC && !mSpecConfig.caClusterer) {
562 getWorkflowTPCInput_clusters = true;
563 }
564 if (!mSpecConfig.decompressTPC && mSpecConfig.caClusterer && ((!mSpecConfig.zsOnTheFly || mSpecConfig.processMC) && !mSpecConfig.zsDecoder)) {
565 getWorkflowTPCInput_digits = true;
566 }
567
568 // ------------------------------ Handle inputs ------------------------------
569
570 auto lockDecodeInput = std::make_unique<std::lock_guard<std::mutex>>(mPipeline->mutexDecodeInput);
571
573 if (mSpecConfig.enableDoublePipeline != 2) {
574 if (mSpecConfig.runITSTracking && pc.inputs().getPos("itsTGeo") >= 0) {
575 pc.inputs().get<o2::its::GeometryTGeo*>("itsTGeo");
576 }
577 if (GRPGeomHelper::instance().getGRPECS()->isDetReadOut(o2::detectors::DetID::TPC) && mConfParam->tpcTriggeredMode ^ !GRPGeomHelper::instance().getGRPECS()->isDetContinuousReadOut(o2::detectors::DetID::TPC)) {
578 LOG(fatal) << "configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
579 }
580 }
581
583 processInputs(pc, tpcZSmeta, inputZS, tpcZS, tpcZSonTheFlySizes, debugTFDump, compClustersDummy, compClustersFlatDummy, pCompClustersFlat, tmpEmptyCompClusters); // Process non-digit / non-cluster inputs
584 const auto& inputsClustersDigits = o2::tpc::getWorkflowTPCInput(pc, mVerbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, mTPCSectorMask, getWorkflowTPCInput_digits); // Process digit and cluster inputs
585
586 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
587 mTFSettings->tfStartOrbit = tinfo.firstTForbit;
588 mTFSettings->hasTfStartOrbit = 1;
589 mTFSettings->hasNHBFPerTF = 1;
590 mTFSettings->nHBFPerTF = mConfParam->overrideNHbfPerTF ? mConfParam->overrideNHbfPerTF : GRPGeomHelper::instance().getGRPECS()->getNHBFPerTF();
591 mTFSettings->hasRunStartOrbit = 0;
592 ptrs.settingsTF = mTFSettings.get();
593
594 if (mSpecConfig.enableDoublePipeline != 2) {
595 if (mVerbosity) {
596 LOG(info) << "TF firstTForbit " << mTFSettings->tfStartOrbit << " nHBF " << mTFSettings->nHBFPerTF << " runStartOrbit " << mTFSettings->runStartOrbit << " simStartOrbit " << mTFSettings->simStartOrbit;
597 }
598 if (mConfParam->checkFirstTfOrbit) {
599 static uint32_t lastFirstTFOrbit = -1;
600 static uint32_t lastTFCounter = -1;
601 if (lastFirstTFOrbit != -1 && lastTFCounter != -1) {
602 int32_t diffOrbit = tinfo.firstTForbit - lastFirstTFOrbit;
603 int32_t diffCounter = tinfo.tfCounter - lastTFCounter;
604 if (diffOrbit != diffCounter * mTFSettings->nHBFPerTF) {
605 LOG(error) << "Time frame has mismatching firstTfOrbit - Last orbit/counter: " << lastFirstTFOrbit << " " << lastTFCounter << " - Current: " << tinfo.firstTForbit << " " << tinfo.tfCounter;
606 }
607 }
608 lastFirstTFOrbit = tinfo.firstTForbit;
609 lastTFCounter = tinfo.tfCounter;
610 }
611 }
612
614 decltype(o2::trd::getRecoInputContainer(pc, &ptrs, &inputTracksTRD)) trdInputContainer;
615 if (mSpecConfig.readTRDtracklets) {
616 o2::globaltracking::DataRequest dataRequestTRD;
618 inputTracksTRD.collectData(pc, dataRequestTRD);
619 trdInputContainer = std::move(o2::trd::getRecoInputContainer(pc, &ptrs, &inputTracksTRD));
620 }
621
622 void* ptrEp[NSectors * NEndpoints] = {};
623 bool doInputDigits = false, doInputDigitsMC = false;
624 if (mSpecConfig.decompressTPC) {
625 ptrs.tpcCompressedClusters = pCompClustersFlat;
626 } else if (mSpecConfig.zsOnTheFly) {
627 const uint64_t* buffer = reinterpret_cast<const uint64_t*>(&inputZS[0]);
628 o2::gpu::GPUReconstructionConvert::RunZSEncoderCreateMeta(buffer, tpcZSonTheFlySizes.data(), *&ptrEp, &tpcZS);
629 ptrs.tpcZS = &tpcZS;
630 doInputDigits = doInputDigitsMC = mSpecConfig.processMC;
631 } else if (mSpecConfig.zsDecoder) {
632 ptrs.tpcZS = &tpcZS;
633 if (mSpecConfig.processMC) {
634 throw std::runtime_error("Cannot process MC information, none available");
635 }
636 } else if (mSpecConfig.caClusterer) {
637 doInputDigits = true;
638 doInputDigitsMC = mSpecConfig.processMC;
639 } else {
640 ptrs.clustersNative = &inputsClustersDigits->clusterIndex;
641 }
642
643 if (mTPCSectorMask != 0xFFFFFFFFF) {
644 // Clean out the unused sectors, such that if they were present by chance, they are not processed, and if the values are uninitialized, we should not crash
645 for (uint32_t i = 0; i < NSectors; i++) {
646 if (!(mTPCSectorMask & (1ul << i))) {
647 if (ptrs.tpcZS) {
648 for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
649 tpcZS.sector[i].zsPtr[j] = nullptr;
650 tpcZS.sector[i].nZSPtr[j] = nullptr;
651 tpcZS.sector[i].count[j] = 0;
652 }
653 }
654 }
655 }
656 }
657
658 GPUTrackingInOutDigits tpcDigitsMap;
659 GPUTPCDigitsMCInput tpcDigitsMapMC;
660 if (doInputDigits) {
661 ptrs.tpcPackedDigits = &tpcDigitsMap;
662 if (doInputDigitsMC) {
663 tpcDigitsMap.tpcDigitsMC = &tpcDigitsMapMC;
664 }
665 for (uint32_t i = 0; i < NSectors; i++) {
666 tpcDigitsMap.tpcDigits[i] = inputsClustersDigits->inputDigits[i].data();
667 tpcDigitsMap.nTPCDigits[i] = inputsClustersDigits->inputDigits[i].size();
668 if (doInputDigitsMC) {
669 tpcDigitsMapMC.v[i] = inputsClustersDigits->inputDigitsMCPtrs[i];
670 }
671 }
672 }
673
674 o2::tpc::TPCSectorHeader clusterOutputSectorHeader{0};
675 if (mClusterOutputIds.size() > 0) {
676 clusterOutputSectorHeader.sectorBits = mTPCSectorMask;
677 // subspecs [0, NSectors - 1] are used to identify sector data, we use NSectors to indicate the full TPC
678 clusterOutputSectorHeader.activeSectors = mTPCSectorMask;
679 }
680
681 // ------------------------------ Prepare stage for double-pipeline before normal output preparation ------------------------------
682
683 std::unique_ptr<GPURecoWorkflow_QueueObject> pipelineContext;
684 if (mSpecConfig.enableDoublePipeline) {
685 if (handlePipeline(pc, ptrs, tpcZSmeta, tpcZS, pipelineContext)) {
686 return;
687 }
688 }
689
690 // ------------------------------ Prepare outputs ------------------------------
691
692 GPUInterfaceOutputs outputRegions;
693 using outputDataType = char;
694 using outputBufferUninitializedVector = std::decay_t<decltype(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(Output{"", "", 0}))>;
695 using outputBufferType = std::pair<std::optional<std::reference_wrapper<outputBufferUninitializedVector>>, outputDataType*>;
696 std::vector<outputBufferType> outputBuffers(GPUInterfaceOutputs::count(), {std::nullopt, nullptr});
697 std::unordered_set<std::string> outputsCreated;
698
699 auto setOutputAllocator = [this, &outputBuffers, &outputRegions, &pc, &outputsCreated](const char* name, bool condition, GPUOutputControl& region, auto&& outputSpec, size_t offset = 0) {
700 if (condition) {
701 auto& buffer = outputBuffers[outputRegions.getIndex(region)];
702 if (mConfParam->allocateOutputOnTheFly) {
703 region.allocator = [this, name, &buffer, &pc, outputSpec = std::move(outputSpec), offset, &outputsCreated](size_t size) -> void* {
704 size += offset;
705 if (mVerbosity) {
706 LOG(info) << "ALLOCATING " << size << " bytes for " << name << ": " << std::get<DataOrigin>(outputSpec).template as<std::string>() << "/" << std::get<DataDescription>(outputSpec).template as<std::string>() << "/" << std::get<2>(outputSpec);
707 }
708 std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
709 if (mVerbosity) {
710 start = std::chrono::high_resolution_clock::now();
711 }
712 buffer.first.emplace(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(std::make_from_tuple<Output>(outputSpec), size));
713 outputsCreated.insert(name);
714 if (mVerbosity) {
715 end = std::chrono::high_resolution_clock::now();
716 std::chrono::duration<double> elapsed_seconds = end - start;
717 LOG(info) << "Allocation time for " << name << " (" << size << " bytes)"
718 << ": " << elapsed_seconds.count() << "s";
719 }
720 return (buffer.second = buffer.first->get().data()) + offset;
721 };
722 } else {
723 buffer.first.emplace(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(std::make_from_tuple<Output>(outputSpec), mConfParam->outputBufferSize));
724 region.ptrBase = (buffer.second = buffer.first->get().data()) + offset;
725 region.size = buffer.first->get().size() - offset;
726 outputsCreated.insert(name);
727 }
728 }
729 };
730
731 auto downSizeBuffer = [](outputBufferType& buffer, size_t size) {
732 if (!buffer.first) {
733 return;
734 }
735 if (buffer.first->get().size() < size) {
736 throw std::runtime_error("Invalid buffer size requested");
737 }
738 buffer.first->get().resize(size);
739 if (size && buffer.first->get().data() != buffer.second) {
740 throw std::runtime_error("Inconsistent buffer address after downsize");
741 }
742 };
743
744 /*auto downSizeBufferByName = [&outputBuffers, &outputRegions, &downSizeBuffer](GPUOutputControl& region, size_t size) {
745 auto& buffer = outputBuffers[outputRegions.getIndex(region)];
746 downSizeBuffer(buffer, size);
747 };*/
748
749 auto downSizeBufferToSpan = [&outputBuffers, &outputRegions, &downSizeBuffer](GPUOutputControl& region, auto span) {
750 auto& buffer = outputBuffers[outputRegions.getIndex(region)];
751 if (!buffer.first) {
752 return;
753 }
754 if (span.size() && buffer.second != (char*)span.data()) {
755 throw std::runtime_error("Buffer does not match span");
756 }
757 downSizeBuffer(buffer, span.size() * sizeof(*span.data()));
758 };
759
760 setOutputAllocator("COMPCLUSTERSFLAT", mSpecConfig.outputCompClustersFlat, outputRegions.compressedClusters, std::make_tuple(gDataOriginTPC, (DataDescription) "COMPCLUSTERSFLAT", 0));
761 setOutputAllocator("CLUSTERNATIVE", mClusterOutputIds.size() > 0, outputRegions.clustersNative, std::make_tuple(gDataOriginTPC, mSpecConfig.sendClustersPerSector ? (DataDescription) "CLUSTERNATIVETMP" : (mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE"), NSectors, clusterOutputSectorHeader), sizeof(o2::tpc::ClusterCountIndex));
762 setOutputAllocator("CLSHAREDMAP", mSpecConfig.outputSharedClusterMap, outputRegions.sharedClusterMap, std::make_tuple(gDataOriginTPC, (DataDescription) "CLSHAREDMAP", 0));
763 setOutputAllocator("TPCOCCUPANCYMAP", mSpecConfig.outputSharedClusterMap, outputRegions.tpcOccupancyMap, std::make_tuple(gDataOriginTPC, (DataDescription) "TPCOCCUPANCYMAP", 0));
764 setOutputAllocator("TRACKS", mSpecConfig.outputTracks, outputRegions.tpcTracksO2, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSF" : (DataDescription) "TRACKS", 0));
765 setOutputAllocator("CLUSREFS", mSpecConfig.outputTracks, outputRegions.tpcTracksO2ClusRefs, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSREFSF" : (DataDescription) "CLUSREFS", 0));
766 setOutputAllocator("TRACKSMCLBL", mSpecConfig.outputTracks && mSpecConfig.processMC, outputRegions.tpcTracksO2Labels, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSMCLBLF" : (DataDescription) "TRACKSMCLBL", 0));
767 setOutputAllocator("TRIGGERWORDS", mSpecConfig.caClusterer && mConfig->configProcessing.param.tpcTriggerHandling, outputRegions.tpcTriggerWords, std::make_tuple(gDataOriginTPC, (DataDescription) "TRIGGERWORDS", 0));
769 if (mSpecConfig.processMC && (mSpecConfig.caClusterer || mSpecConfig.useFilteredOutputSpecs)) {
770 outputRegions.clusterLabels.allocator = [&clustersMCBuffer](size_t size) -> void* { return &clustersMCBuffer; };
771 }
772
773 // ------------------------------ Actual processing ------------------------------
774
775 if ((int32_t)(ptrs.tpcZS != nullptr) + (int32_t)(ptrs.tpcPackedDigits != nullptr && (ptrs.tpcZS == nullptr || ptrs.tpcPackedDigits->tpcDigitsMC == nullptr)) + (int32_t)(ptrs.clustersNative != nullptr) + (int32_t)(ptrs.tpcCompressedClusters != nullptr) != 1) {
776 throw std::runtime_error("Invalid input for gpu tracking");
777 }
778
779 const auto& holdData = o2::tpc::TPCTrackingDigitsPreCheck::runPrecheck(&ptrs, mConfig.get());
780
781 calibObjectStruct oldCalibObjects;
782 doCalibUpdates(pc, oldCalibObjects);
783
784 lockDecodeInput.reset();
785
786 uint32_t threadIndex;
787 if (mConfParam->dump) {
788 if (mSpecConfig.enableDoublePipeline && pipelineContext->jobSubmitted) {
789 while (pipelineContext->jobThreadIndex == -1) {
790 }
791 threadIndex = pipelineContext->jobThreadIndex;
792 } else {
793 threadIndex = 0; // TODO: Not sure if this is safe, but it is not yet known which threadIndex will pick up the enqueued job
794 }
795
796 std::string dir = "";
797 if (mConfParam->dumpFolder != "") {
798 dir = std::regex_replace(mConfParam->dumpFolder, std::regex("\\[P\\]"), std::to_string(getpid()));
799 if (mNTFs == 1) {
800 mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
801 }
802 dir += "/";
803 }
804 if (mNTFs == 1) { // Must dump with first TF, since will enforce enqueued calib updates
805 mGPUReco->DumpSettings(threadIndex, dir.c_str());
806 }
807 if (tinfo.tfCounter >= mConfParam->dumpFirst && (mConfParam->dumpLast == -1 || tinfo.tfCounter <= mConfParam->dumpLast)) {
808 mGPUReco->DumpEvent(mNTFDumps, &ptrs, threadIndex, dir.c_str());
809 mNTFDumps++;
810 }
811 }
812 std::unique_ptr<GPUTrackingInOutPointers> ptrsDump;
813 if (mConfParam->dumpBadTFMode == 2) {
814 ptrsDump.reset(new GPUTrackingInOutPointers);
815 memcpy((void*)ptrsDump.get(), (const void*)&ptrs, sizeof(ptrs));
816 }
817
818 int32_t retVal = 0;
819 if (mSpecConfig.enableDoublePipeline) {
820 if (!pipelineContext->jobSubmitted) {
821 enqueuePipelinedJob(&ptrs, &outputRegions, pipelineContext.get(), true);
822 } else {
823 finalizeInputPipelinedJob(&ptrs, &outputRegions, pipelineContext.get());
824 }
825 std::unique_lock lk(pipelineContext->jobFinishedMutex);
826 pipelineContext->jobFinishedNotify.wait(lk, [context = pipelineContext.get()]() { return context->jobFinished; });
827 retVal = pipelineContext->jobReturnValue;
828 threadIndex = pipelineContext->jobThreadIndex;
829 } else {
830 // uint32_t threadIndex = pc.services().get<ThreadPool>().threadIndex;
831 threadIndex = mNextThreadIndex;
832 if (mConfig->configProcessing.doublePipeline) {
833 mNextThreadIndex = (mNextThreadIndex + 1) % 2;
834 }
835
836 retVal = runMain(&pc, &ptrs, &outputRegions, threadIndex);
837 }
838 if (retVal != 0) {
839 debugTFDump = true;
840 }
841 cleanOldCalibsTPCPtrs(oldCalibObjects);
842
843 o2::utils::DebugStreamer::instance()->flush(); // flushing debug output to file
844
845 if (debugTFDump && mNDebugDumps < mConfParam->dumpBadTFs) {
846 mNDebugDumps++;
847 if (mConfParam->dumpBadTFMode <= 1) {
848 std::string filename = std::string("tpc_dump_") + std::to_string(pc.services().get<const o2::framework::DeviceSpec>().inputTimesliceId) + "_" + std::to_string(mNDebugDumps) + ".dump";
849 FILE* fp = fopen(filename.c_str(), "w+b");
850 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
851 for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
852 auto data = pc.inputs().get<gsl::span<char>>(ref);
853 if (mConfParam->dumpBadTFMode == 1) {
854 uint64_t size = data.size();
855 fwrite(&size, 1, sizeof(size), fp);
856 }
857 fwrite(data.data(), 1, data.size(), fp);
858 }
859 fclose(fp);
860 } else if (mConfParam->dumpBadTFMode == 2) {
861 mGPUReco->DumpEvent(mNDebugDumps - 1, ptrsDump.get(), threadIndex);
862 }
863 }
864
865 if (mConfParam->dump == 2) {
866 return;
867 }
868
869 // ------------------------------ Varios postprocessing steps ------------------------------
870
871 if (mConfig->configProcessing.tpcWriteClustersAfterRejection) {
873 }
874 bool createEmptyOutput = false;
875 if (retVal != 0) {
876 if (retVal == 3 && mConfig->configProcessing.ignoreNonFatalGPUErrors) {
877 if (mConfig->configProcessing.throttleAlarms) {
878 LOG(warning) << "GPU Reconstruction aborted with non fatal error code, ignoring";
879 } else {
880 LOG(alarm) << "GPU Reconstruction aborted with non fatal error code, ignoring";
881 }
882 createEmptyOutput = !mConfParam->partialOutputForNonFatalErrors;
883 } else {
884 LOG(fatal) << "GPU Reconstruction aborted with error code " << retVal << " - errors are not ignored - terminating";
885 }
886 }
887
888 std::unique_ptr<o2::tpc::ClusterNativeAccess> tmpEmptyClNative;
889 if (createEmptyOutput) {
890 memset(&ptrs, 0, sizeof(ptrs));
891 for (uint32_t i = 0; i < outputRegions.count(); i++) {
892 if (outputBuffers[i].first) {
893 size_t toSize = 0;
894 if (i == outputRegions.getIndex(outputRegions.compressedClusters)) {
895 toSize = sizeof(*ptrs.tpcCompressedClusters);
896 } else if (i == outputRegions.getIndex(outputRegions.clustersNative)) {
897 toSize = sizeof(o2::tpc::ClusterCountIndex);
898 }
899 outputBuffers[i].first->get().resize(toSize);
900 outputBuffers[i].second = outputBuffers[i].first->get().data();
901 if (toSize) {
902 memset(outputBuffers[i].second, 0, toSize);
903 }
904 }
905 }
906 tmpEmptyClNative = std::make_unique<o2::tpc::ClusterNativeAccess>();
907 memset(tmpEmptyClNative.get(), 0, sizeof(*tmpEmptyClNative));
908 ptrs.clustersNative = tmpEmptyClNative.get();
909 if (mSpecConfig.processMC) {
910 MCLabelContainer cont;
911 cont.flatten_to(clustersMCBuffer.first);
912 clustersMCBuffer.second = clustersMCBuffer.first;
913 tmpEmptyClNative->clustersMCTruth = &clustersMCBuffer.second;
914 }
915 } else {
916 gsl::span<const o2::tpc::TrackTPC> spanOutputTracks = {ptrs.outputTracksTPCO2, ptrs.nOutputTracksTPCO2};
917 gsl::span<const uint32_t> spanOutputClusRefs = {ptrs.outputClusRefsTPCO2, ptrs.nOutputClusRefsTPCO2};
918 gsl::span<const o2::MCCompLabel> spanOutputTracksMCTruth = {ptrs.outputTracksTPCO2MC, ptrs.outputTracksTPCO2MC ? ptrs.nOutputTracksTPCO2 : 0};
919 if (!mConfParam->allocateOutputOnTheFly) {
920 for (uint32_t i = 0; i < outputRegions.count(); i++) {
921 if (outputRegions.asArray()[i].ptrBase) {
922 if (outputRegions.asArray()[i].size == 1) {
923 throw std::runtime_error("Preallocated buffer size exceeded");
924 }
925 outputRegions.asArray()[i].checkCurrent();
926 downSizeBuffer(outputBuffers[i], (char*)outputRegions.asArray()[i].ptrCurrent - (char*)outputBuffers[i].second);
927 }
928 }
929 }
930 downSizeBufferToSpan(outputRegions.tpcTracksO2, spanOutputTracks);
931 downSizeBufferToSpan(outputRegions.tpcTracksO2ClusRefs, spanOutputClusRefs);
932 downSizeBufferToSpan(outputRegions.tpcTracksO2Labels, spanOutputTracksMCTruth);
933
934 // if requested, tune TPC tracks
935 if (ptrs.nOutputTracksTPCO2) {
936 doTrackTuneTPC(ptrs, outputBuffers[outputRegions.getIndex(outputRegions.tpcTracksO2)].first->get().data());
937 }
938
939 if (mClusterOutputIds.size() > 0 && (void*)ptrs.clustersNative->clustersLinear != (void*)(outputBuffers[outputRegions.getIndex(outputRegions.clustersNative)].second + sizeof(o2::tpc::ClusterCountIndex))) {
940 throw std::runtime_error("cluster native output ptrs out of sync"); // sanity check
941 }
942 }
943
944 if (mConfig->configWorkflow.outputs.isSet(gpudatatypes::InOutType::TPCMergedTracks)) {
945 LOG(info) << "found " << ptrs.nOutputTracksTPCO2 << " track(s)";
946 }
947
948 if (mSpecConfig.outputCompClustersRoot) {
951 }
952
953 if (mClusterOutputIds.size() > 0) {
954 o2::tpc::ClusterNativeAccess const& accessIndex = *ptrs.clustersNative;
955 if (mSpecConfig.sendClustersPerSector) {
956 // Clusters are shipped by sector, we are copying into per-sector buffers (anyway only for ROOT output)
957 for (uint32_t i = 0; i < NSectors; i++) {
958 if (mTPCSectorMask & (1ul << i)) {
960 clusterOutputSectorHeader.sectorBits = (1ul << i);
961 char* buffer = pc.outputs().make<char>({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", subspec, {clusterOutputSectorHeader}}, accessIndex.nClustersSector[i] * sizeof(*accessIndex.clustersLinear) + sizeof(o2::tpc::ClusterCountIndex)).data();
962 o2::tpc::ClusterCountIndex* outIndex = reinterpret_cast<o2::tpc::ClusterCountIndex*>(buffer);
963 memset(outIndex, 0, sizeof(*outIndex));
964 for (int32_t j = 0; j < o2::tpc::constants::MAXGLOBALPADROW; j++) {
965 outIndex->nClusters[i][j] = accessIndex.nClusters[i][j];
966 }
967 memcpy(buffer + sizeof(*outIndex), accessIndex.clusters[i][0], accessIndex.nClustersSector[i] * sizeof(*accessIndex.clustersLinear));
968 if (mSpecConfig.processMC && accessIndex.clustersMCTruth) {
969 MCLabelContainer cont;
970 for (uint32_t j = 0; j < accessIndex.nClustersSector[i]; j++) {
971 const auto& labels = accessIndex.clustersMCTruth->getLabels(accessIndex.clusterOffset[i][0] + j);
972 for (const auto& label : labels) {
973 cont.addElement(j, label);
974 }
975 }
976 ConstMCLabelContainer contflat;
977 cont.flatten_to(contflat);
978 pc.outputs().snapshot({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), subspec, {clusterOutputSectorHeader}}, contflat);
979 }
980 }
981 }
982 } else {
983 // Clusters are shipped as single message, fill ClusterCountIndex
984 DataHeader::SubSpecificationType subspec = NSectors;
985 o2::tpc::ClusterCountIndex* outIndex = reinterpret_cast<o2::tpc::ClusterCountIndex*>(outputBuffers[outputRegions.getIndex(outputRegions.clustersNative)].second);
986 static_assert(sizeof(o2::tpc::ClusterCountIndex) == sizeof(accessIndex.nClusters));
987 memcpy(outIndex, &accessIndex.nClusters[0][0], sizeof(o2::tpc::ClusterCountIndex));
988 if (mSpecConfig.processMC && (mSpecConfig.caClusterer || mSpecConfig.useFilteredOutputSpecs) && accessIndex.clustersMCTruth) {
989 pc.outputs().snapshot({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), subspec, {clusterOutputSectorHeader}}, clustersMCBuffer.first);
990 }
991 }
992 }
993 if (mSpecConfig.outputQA) {
994 TObjArray out;
995 bool sendQAOutput = !createEmptyOutput && outputRegions.qa.newQAHistsCreated;
996 auto getoutput = [sendQAOutput](auto ptr) { return sendQAOutput && ptr ? *ptr : std::decay_t<decltype(*ptr)>(); };
997 std::vector<TH1F> copy1 = getoutput(outputRegions.qa.hist1); // Internally, this will also be used as output, so we need a non-const copy
998 std::vector<TH2F> copy2 = getoutput(outputRegions.qa.hist2);
999 std::vector<TH1D> copy3 = getoutput(outputRegions.qa.hist3);
1000 std::vector<TGraphAsymmErrors> copy4 = getoutput(outputRegions.qa.hist4);
1001 if (sendQAOutput) {
1002 mQA->postprocessExternal(copy1, copy2, copy3, copy4, out, mQATaskMask ? mQATaskMask : -1);
1003 }
1004 pc.outputs().snapshot({gDataOriginTPC, "TRACKINGQA", 0}, out);
1005 if (sendQAOutput) {
1006 mQA->cleanup();
1007 }
1008 }
1009 if (mSpecConfig.outputErrorQA) {
1010 pc.outputs().snapshot({gDataOriginGPU, "ERRORQA", 0}, mErrorQA);
1011 mErrorQA.clear(); // FIXME: This is a race condition once we run multi-threaded!
1012 }
1013 if (mSpecConfig.outputSharedClusterMap && !outputsCreated.contains("TPCOCCUPANCYMAP")) {
1015 }
1016 if (mSpecConfig.tpcTriggerHandling && !outputsCreated.contains("TRIGGERWORDS")) {
1018 }
1019 mTimer->Stop();
1020 LOG(info) << "GPU Reconstruction time for this TF " << mTimer->CpuTime() - cput << " s (cpu), " << mTimer->RealTime() - realt << " s (wall)";
1021}
1022
// Collects per-timeframe calibration updates (objects and scalar values) and, if anything
// changed, pushes them to the GPU reconstruction instance via UpdateCalibration().
// @param pc               processing context used to fetch CCDB inputs
// @param oldCalibObjects  out: previous calibration objects, filled by fetchCalibsCCDBTPC so
//                         the caller can later clean them up (see cleanOldCalibsTPCPtrs in run())
// NOTE(review): this is a doc-generator extract; original source lines 1034-1035 (inside the
// ITS-geometry branch below) are missing from this view.
1023 void GPURecoWorkflowSpec::doCalibUpdates(o2::framework::ProcessingContext& pc, calibObjectStruct& oldCalibObjects)
1024 {
1025 GPUCalibObjectsConst newCalibObjects;
1026 GPUNewCalibValues newCalibValues;
1027 // check for updates of TPC calibration objects
1028 bool needCalibUpdate = false;
// A GRP/geometry update triggers a full refresh of all derived calibration state below.
1029 if (mGRPGeomUpdated) {
1030 mGRPGeomUpdated = false;
1031 needCalibUpdate = true;
1032
// One-time ITS geometry creation (body partially elided in this extract).
1033 if (mSpecConfig.runITSTracking && !mITSGeometryCreated) {
1036 mITSGeometryCreated = true;
1037 }
1038
// Pick up the solenoid field from the GRP magnetic-field object when auto-detection is enabled.
1039 if (mAutoSolenoidBz) {
1040 newCalibValues.newSolenoidField = true;
1041 newCalibValues.solenoidField = mConfig->configGRP.solenoidBzNominalGPU = GPUO2InterfaceUtils::getNominalGPUBz(*GRPGeomHelper::instance().getGRPMagField());
1042 // Propagator::Instance()->setBz(newCalibValues.solenoidField); // Take value from o2::Propagator::UpdateField from GRPGeomHelper
1043 LOG(info) << "Updating solenoid field " << newCalibValues.solenoidField;
1044 }
// Derive the continuous-readout max time bin from the number of HBFs per TF.
1045 if (mAutoContinuousMaxTimeBin) {
1046 newCalibValues.newContinuousMaxTimeBin = true;
1047 newCalibValues.continuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin = GPUO2InterfaceUtils::getTpcMaxTimeBinFromNHbf(mTFSettings->nHBFPerTF);
1048 LOG(info) << "Updating max time bin " << newCalibValues.continuousMaxTimeBin << " (" << mTFSettings->nHBFPerTF << " orbits)";
1049 }
1050
// One-time wiring of the o2 Propagator into the GPU calibration objects; optionally switches
// the propagator to the GPU polynomial field approximation.
1051 if (!mPropagatorInstanceCreated) {
1052 newCalibObjects.o2Propagator = mConfig->configCalib.o2Propagator = Propagator::Instance();
1053 if (mConfig->configProcessing.o2PropagatorUseGPUField) {
1054 mGPUReco->UseGPUPolynomialFieldInPropagator(Propagator::Instance());
1055 }
1056 mPropagatorInstanceCreated = true;
1057 }
1058
// Material-budget LUT: taken from GRPGeomHelper only when no explicit file was configured.
1059 if (!mMatLUTCreated) {
1060 if (mConfParam->matLUTFile.size() == 0) {
1061 newCalibObjects.matLUT = GRPGeomHelper::instance().getMatLUT();
1062 LOG(info) << "Loaded material budget lookup table";
1063 }
1064 mMatLUTCreated = true;
1065 }
// One-time TRD geometry (flattened copy) and reco-param setup, needed for TRD tracklet matching.
1066 if (mSpecConfig.readTRDtracklets) {
1067 if (!mTRDGeometryCreated) {
1068 auto gm = o2::trd::Geometry::instance();
1069 gm->createPadPlaneArray();
1070 gm->createClusterMatrixArray();
1071 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>(*gm);
1072 newCalibObjects.trdGeometry = mConfig->configCalib.trdGeometry = mTRDGeometry.get();
1073 LOG(info) << "Loaded TRD geometry";
1074 mTRDGeometryCreated = true;
1075 }
1076 if (!mTRDRecoParamCreated) {
1077 mTRDRecoParam = std::make_unique<GPUTRDRecoParam>();
1078 newCalibObjects.trdRecoParam = mConfig->configCalib.trdRecoParam = mTRDRecoParam.get();
1079 mTRDRecoParamCreated = true;
1080 }
1081 }
1082 }
// CCDB fetches run unconditionally; order matters so the fetch is not short-circuited away.
1083 needCalibUpdate = fetchCalibsCCDBTPC(pc, newCalibObjects, oldCalibObjects) || needCalibUpdate;
1084 if (mSpecConfig.runITSTracking) {
1085 needCalibUpdate = fetchCalibsCCDBITS(pc) || needCalibUpdate;
1086 }
// Propagate an externally-changed TPC time-bin cut into the GRP config.
1087 if (mTPCCutAtTimeBin != mConfig->configGRP.tpcCutTimeBin) {
1088 newCalibValues.newTPCTimeBinCut = true;
1089 newCalibValues.tpcTimeBinCut = mConfig->configGRP.tpcCutTimeBin = mTPCCutAtTimeBin;
1090 needCalibUpdate = true;
1091 }
// Neural-network clusterizer models fetched from CCDB: copy the (up to 3) network handles into
// the new calibration set and optionally dump each ONNX blob to a local file for inspection.
1092 if (mSpecConfig.nnLoadFromCCDB) {
1093 auto dumpToFile = [](const char* buffer, std::size_t validSize, const std::string& path) {
1094 std::ofstream out(path, std::ios::binary | std::ios::trunc);
1095 if (!out.is_open()) {
1096 throw std::runtime_error("Failed to open output file: " + path);
1097 }
1098
1099 out.write(buffer, static_cast<std::streamsize>(validSize));
1100 if (!out) {
1101 throw std::runtime_error("Failed while writing data to: " + path);
1102 }
1103 };
1104 for (int i = 0; i < 3; i++) {
1105 newCalibObjects.nnClusterizerNetworks[i] = mConfig->configCalib.nnClusterizerNetworks[i];
1106 if (mSpecConfig.nnDumpToFile && newCalibObjects.nnClusterizerNetworks[i]) {
1107 std::string path = "tpc_nn_clusterizer_" + std::to_string(i) + ".onnx";
1108 dumpToFile(newCalibObjects.nnClusterizerNetworks[i]->getONNXModel(), newCalibObjects.nnClusterizerNetworks[i]->getONNXModelSize(), path);
1109 LOG(info) << "Dumped TPC clusterizer NN " << i << " to file " << path;
1110 }
1111 }
1112 }
// Single atomic hand-over of all accumulated changes to the GPU reconstruction.
1113 if (needCalibUpdate) {
1114 LOG(info) << "Updating GPUReconstruction calibration objects";
1115 mGPUReco->UpdateCalibration(newCalibObjects, newCalibValues);
1116 }
1117 }
1118
1120{
1121 Options opts;
1122 if (mSpecConfig.enableDoublePipeline) {
1123 bool send = mSpecConfig.enableDoublePipeline == 2;
1124 char* o2jobid = getenv("O2JOBID");
1125 char* numaid = getenv("NUMAID");
1126 int32_t chanid = o2jobid ? atoi(o2jobid) : (numaid ? atoi(numaid) : 0);
1127 std::string chan = std::string("name=gpu-prepare-channel,type=") + (send ? "push" : "pull") + ",method=" + (send ? "connect" : "bind") + ",address=ipc://@gpu-prepare-channel-" + std::to_string(chanid) + "-{timeslice0},transport=shmem,rateLogging=0";
1128 opts.emplace_back(o2::framework::ConfigParamSpec{"channel-config", o2::framework::VariantType::String, chan, {"Out-of-band channel config"}});
1129 }
1130 if (mSpecConfig.enableDoublePipeline == 2) {
1131 return opts;
1132 }
1133 if (mSpecConfig.outputTracks) {
1135 }
1136 return opts;
1137}
1138
1140{
1141 Inputs inputs;
1142 if (mSpecConfig.zsDecoder) {
1143 // All ZS raw data is published with subspec 0 by the o2-raw-file-reader-workflow and DataDistribution
1144 // creates subspec fom CRU and endpoint id, we create one single input route subscribing to all TPC/RAWDATA
1145 inputs.emplace_back(InputSpec{"zsraw", ConcreteDataTypeMatcher{"TPC", "RAWDATA"}, Lifetime::Timeframe});
1146 if (mSpecConfig.askDISTSTF) {
1147 inputs.emplace_back("stdDist", "FLP", "DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
1148 }
1149 }
1150 if (mSpecConfig.enableDoublePipeline == 2) {
1151 if (!mSpecConfig.zsDecoder) {
1152 LOG(fatal) << "Double pipeline mode can only work with zsraw input";
1153 }
1154 return inputs;
1155 } else if (mSpecConfig.enableDoublePipeline == 1) {
1156 inputs.emplace_back("pipelineprepare", gDataOriginGPU, "PIPELINEPREPARE", 0, Lifetime::Timeframe);
1157 }
1158 if (mSpecConfig.enableDoublePipeline != 2 && (mSpecConfig.outputTracks || mSpecConfig.caClusterer)) {
1159 // calibration objects for TPC clusterization
1160 inputs.emplace_back("tpcgain", gDataOriginTPC, "PADGAINFULL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalPadGainFull)));
1161 inputs.emplace_back("tpcaltrosync", gDataOriginTPC, "ALTROSYNCSIGNAL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::AltroSyncSignal)));
1162 }
1163 if (mSpecConfig.enableDoublePipeline != 2 && mSpecConfig.outputTracks) {
1164 // calibration objects for TPC tracking
1165 const auto mapSources = mSpecConfig.tpcDeadMapSources;
1166 if (mapSources != 0) {
1167 tpc::SourcesDeadMap sources((mapSources > -1) ? static_cast<tpc::SourcesDeadMap>(mapSources) : tpc::SourcesDeadMap::All);
1169 inputs.emplace_back("tpcidcpadflags", gDataOriginTPC, "IDCPADFLAGS", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalIDCPadStatusMapA), {}, 1)); // time-dependent
1170 }
1172 inputs.emplace_back("tpcruninfo", gDataOriginTPC, "TPCRUNINFO", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::ConfigRunInfo)));
1173 }
1174 }
1175
1176 inputs.emplace_back("tpcgainresidual", gDataOriginTPC, "PADGAINRESIDUAL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalPadGainResidual), {}, 1)); // time-dependent
1177 if (mSpecConfig.tpcUseMCTimeGain) {
1178 inputs.emplace_back("tpctimegain", gDataOriginTPC, "TIMEGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTimeGainMC), {}, 1)); // time-dependent
1179 } else {
1180 inputs.emplace_back("tpctimegain", gDataOriginTPC, "TIMEGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTimeGain), {}, 1)); // time-dependent
1181 }
1182 inputs.emplace_back("tpctopologygain", gDataOriginTPC, "TOPOLOGYGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTopologyGain)));
1183 inputs.emplace_back("tpcthreshold", gDataOriginTPC, "PADTHRESHOLD", 0, Lifetime::Condition, ccdbParamSpec("TPC/Config/FEEPad"));
1185 Options optsDummy;
1186 o2::tpc::CorrectionMapsLoaderGloOpts gloOpts{mSpecConfig.lumiScaleType, mSpecConfig.lumiScaleMode, mSpecConfig.enableMShape, mSpecConfig.enableCTPLumi};
1187 mCalibObjects.mFastTransformHelper->requestCCDBInputs(inputs, optsDummy, gloOpts); // option filled here is lost
1188 }
1189 if (mSpecConfig.decompressTPC) {
1190 inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, mSpecConfig.decompressTPCFromROOT ? o2::header::DataDescription("COMPCLUSTERS") : o2::header::DataDescription("COMPCLUSTERSFLAT")}, Lifetime::Timeframe});
1191 } else if (mSpecConfig.caClusterer) {
1192 // We accept digits and MC labels also if we run on ZS Raw data, since they are needed for MC label propagation
1193 if ((!mSpecConfig.zsOnTheFly || mSpecConfig.processMC) && !mSpecConfig.zsDecoder) {
1194 inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITS"}, Lifetime::Timeframe});
1195 mPolicyData->emplace_back(o2::framework::InputSpec{"digits", o2::framework::ConcreteDataTypeMatcher{"TPC", "DIGITS"}});
1196 }
1197 } else if (mSpecConfig.runTPCTracking) {
1198 inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERNATIVE"}, Lifetime::Timeframe});
1199 mPolicyData->emplace_back(o2::framework::InputSpec{"clusters", o2::framework::ConcreteDataTypeMatcher{"TPC", "CLUSTERNATIVE"}});
1200 }
1201 if (mSpecConfig.processMC) {
1202 if (mSpecConfig.caClusterer) {
1203 if (!mSpecConfig.zsDecoder) {
1204 inputs.emplace_back(InputSpec{"mclblin", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITSMCTR"}, Lifetime::Timeframe});
1205 mPolicyData->emplace_back(o2::framework::InputSpec{"digitsmc", o2::framework::ConcreteDataTypeMatcher{"TPC", "DIGITSMCTR"}});
1206 }
1207 } else if (mSpecConfig.runTPCTracking) {
1208 inputs.emplace_back(InputSpec{"mclblin", ConcreteDataTypeMatcher{gDataOriginTPC, "CLNATIVEMCLBL"}, Lifetime::Timeframe});
1209 mPolicyData->emplace_back(o2::framework::InputSpec{"clustersmc", o2::framework::ConcreteDataTypeMatcher{"TPC", "CLNATIVEMCLBL"}});
1210 }
1211 }
1212
1213 if (mSpecConfig.zsOnTheFly) {
1214 inputs.emplace_back(InputSpec{"zsinput", ConcreteDataTypeMatcher{"TPC", "TPCZS"}, Lifetime::Timeframe});
1215 inputs.emplace_back(InputSpec{"zsinputsizes", ConcreteDataTypeMatcher{"TPC", "ZSSIZES"}, Lifetime::Timeframe});
1216 }
1217 if (mSpecConfig.readTRDtracklets) {
1218 inputs.emplace_back("trdctracklets", o2::header::gDataOriginTRD, "CTRACKLETS", 0, Lifetime::Timeframe);
1219 inputs.emplace_back("trdtracklets", o2::header::gDataOriginTRD, "TRACKLETS", 0, Lifetime::Timeframe);
1220 inputs.emplace_back("trdtriggerrec", o2::header::gDataOriginTRD, "TRKTRGRD", 0, Lifetime::Timeframe);
1221 inputs.emplace_back("trdtrigrecmask", o2::header::gDataOriginTRD, "TRIGRECMASK", 0, Lifetime::Timeframe);
1222 }
1223
1224 if (mSpecConfig.runITSTracking) {
1225 inputs.emplace_back("compClusters", "ITS", "COMPCLUSTERS", 0, Lifetime::Timeframe);
1226 inputs.emplace_back("patterns", "ITS", "PATTERNS", 0, Lifetime::Timeframe);
1227 inputs.emplace_back("ROframes", "ITS", "CLUSTERSROF", 0, Lifetime::Timeframe);
1228 if (mSpecConfig.itsTriggerType == 1) {
1229 inputs.emplace_back("phystrig", "ITS", "PHYSTRIG", 0, Lifetime::Timeframe);
1230 } else if (mSpecConfig.itsTriggerType == 2) {
1231 inputs.emplace_back("phystrig", "TRD", "TRKTRGRD", 0, Lifetime::Timeframe);
1232 }
1233 if (mSpecConfig.enableDoublePipeline != 2) {
1234 if (mSpecConfig.isITS3) {
1235 inputs.emplace_back("cldict", "IT3", "CLUSDICT", 0, Lifetime::Condition, ccdbParamSpec("IT3/Calib/ClusterDictionary"));
1236 inputs.emplace_back("alppar", "ITS", "ALPIDEPARAM", 0, Lifetime::Condition, ccdbParamSpec("ITS/Config/AlpideParam"));
1237 } else {
1238 inputs.emplace_back("itscldict", "ITS", "CLUSDICT", 0, Lifetime::Condition, ccdbParamSpec("ITS/Calib/ClusterDictionary"));
1239 inputs.emplace_back("itsalppar", "ITS", "ALPIDEPARAM", 0, Lifetime::Condition, ccdbParamSpec("ITS/Config/AlpideParam"));
1240 }
1241 if (mSpecConfig.itsOverrBeamEst) {
1242 inputs.emplace_back("meanvtx", "GLO", "MEANVERTEX", 0, Lifetime::Condition, ccdbParamSpec("GLO/Calib/MeanVertex", {}, 1));
1243 }
1244 }
1245 if (mSpecConfig.processMC) {
1246 inputs.emplace_back("itsmclabels", "ITS", "CLUSTERSMCTR", 0, Lifetime::Timeframe);
1247 inputs.emplace_back("ITSMC2ROframes", "ITS", "CLUSTERSMC2ROF", 0, Lifetime::Timeframe);
1248 }
1249 }
1250
1251 // NN clusterizer
1252 *mConfParam = mConfig->ReadConfigurableParam();
1253 if (mConfig->configProcessing.nn.nnLoadFromCCDB) {
1254
1255 LOG(info) << "(NN CLUS) Enabling fetching of TPC NN clusterizer from CCDB";
1256 mSpecConfig.nnLoadFromCCDB = true;
1257 mSpecConfig.nnDumpToFile = mConfig->configProcessing.nn.nnCCDBDumpToFile;
1258 GPUSettingsProcessingNNclusterizer& nnClusterizerSettings = mConfig->configProcessing.nn;
1259
1260 std::map<std::string, std::string> metadata;
1261 metadata["inputDType"] = nnClusterizerSettings.nnInferenceInputDType; // FP16 or FP32
1262 metadata["outputDType"] = nnClusterizerSettings.nnInferenceOutputDType; // FP16 or FP32
1263 metadata["nnCCDBWithMomentum"] = nnClusterizerSettings.nnCCDBWithMomentum; // 0, 1 -> Only for regression model
1264 metadata["nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBClassificationLayerType; // FC, CNN
1265 metadata["nnCCDBInteractionRate"] = nnClusterizerSettings.nnCCDBInteractionRate; // in kHz
1266 metadata["nnCCDBBeamType"] = nnClusterizerSettings.nnCCDBBeamType; // pp, pPb, PbPb
1267
1268 auto convert_map_to_metadata = [](const std::map<std::string, std::string>& inputMap, std::vector<o2::framework::CCDBMetadata>& outputMetadata) {
1269 for (const auto& [key, value] : inputMap) {
1270 if (value != "") {
1271 outputMetadata.push_back({key, value});
1272 }
1273 }
1274 };
1275
1276 mSpecConfig.nnEvalMode = o2::utils::Str::tokenize(nnClusterizerSettings.nnEvalMode, ':');
1277 std::vector<o2::framework::CCDBMetadata> ccdb_metadata;
1278
1279 if (mConfParam->printSettings) {
1280 auto printSettings = [](const std::map<std::string, std::string>& settings) {
1281 LOG(info) << "(NN CLUS) NN Clusterizer CCDB settings:";
1282 for (const auto& [key, value] : settings) {
1283 LOG(info) << " " << key << " : " << value;
1284 }
1285 };
1286 printSettings(metadata);
1287 }
1288
1289 if (mSpecConfig.nnEvalMode[0] == "c1") {
1290 metadata["nnCCDBEvalType"] = "classification_c1";
1291 convert_map_to_metadata(metadata, ccdb_metadata);
1292 inputs.emplace_back("nn_classification_c1", gDataOriginTPC, "NNCLUSTERIZER_C1", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1293 } else if (mSpecConfig.nnEvalMode[0] == "c2") {
1294 metadata["nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1295 metadata["nnCCDBEvalType"] = "classification_c2";
1296 convert_map_to_metadata(metadata, ccdb_metadata);
1297 inputs.emplace_back("nn_classification_c2", gDataOriginTPC, "NNCLUSTERIZER_C2", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1298 }
1299
1300 metadata["nnCCDBEvalType"] = "regression_c1";
1301 metadata["nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1302 convert_map_to_metadata(metadata, ccdb_metadata);
1303 inputs.emplace_back("nn_regression_c1", gDataOriginTPC, "NNCLUSTERIZER_R1", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1304
1305 if (mSpecConfig.nnEvalMode[1] == "r2") {
1306 metadata["nnCCDBEvalType"] = "regression_c2";
1307 convert_map_to_metadata(metadata, ccdb_metadata);
1308 inputs.emplace_back("nn_regression_c2", gDataOriginTPC, "NNCLUSTERIZER_R2", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1309 }
1310 }
1311
1312 return inputs;
1313};
1314
1316{
1317 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
1318 std::vector<OutputSpec> outputSpecs;
1319 if (mSpecConfig.enableDoublePipeline == 2) {
1320 outputSpecs.emplace_back(gDataOriginGPU, "PIPELINEPREPARE", 0, Lifetime::Timeframe);
1321 return outputSpecs;
1322 }
1323 if (mSpecConfig.outputTracks) {
1324 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSF" : (DataDescription) "TRACKS", 0, Lifetime::Timeframe);
1325 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSREFSF" : (DataDescription) "CLUSREFS", 0, Lifetime::Timeframe);
1326 }
1327 if (mSpecConfig.processMC && mSpecConfig.outputTracks) {
1328 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSMCLBLF" : (DataDescription) "TRACKSMCLBL", 0, Lifetime::Timeframe);
1329 }
1330 if (mSpecConfig.outputCompClustersRoot) {
1331 outputSpecs.emplace_back(gDataOriginTPC, "COMPCLUSTERS", 0, Lifetime::Timeframe);
1332 }
1333 if (mSpecConfig.outputCompClustersFlat) {
1334 outputSpecs.emplace_back(gDataOriginTPC, "COMPCLUSTERSFLAT", 0, Lifetime::Timeframe);
1335 }
1336 if (mSpecConfig.outputCAClusters) {
1337 for (auto const& sector : mTPCSectors) {
1338 mClusterOutputIds.emplace_back(sector);
1339 }
1340 if (mSpecConfig.sendClustersPerSector) {
1341 outputSpecs.emplace_back(gDataOriginTPC, "CLUSTERNATIVETMP", NSectors, Lifetime::Timeframe); // Dummy buffer the TPC tracker writes the inital linear clusters to
1342 for (const auto sector : mTPCSectors) {
1343 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", sector, Lifetime::Timeframe);
1344 }
1345 } else {
1346 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", NSectors, Lifetime::Timeframe);
1347 }
1348 if (mSpecConfig.processMC) {
1349 if (mSpecConfig.sendClustersPerSector) {
1350 for (const auto sector : mTPCSectors) {
1351 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), sector, Lifetime::Timeframe);
1352 }
1353 } else {
1354 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), NSectors, Lifetime::Timeframe);
1355 }
1356 }
1357 }
1358 if (mSpecConfig.outputSharedClusterMap) {
1359 outputSpecs.emplace_back(gDataOriginTPC, "CLSHAREDMAP", 0, Lifetime::Timeframe);
1360 outputSpecs.emplace_back(gDataOriginTPC, "TPCOCCUPANCYMAP", 0, Lifetime::Timeframe);
1361 }
1362 if (mSpecConfig.tpcTriggerHandling) {
1363 outputSpecs.emplace_back(gDataOriginTPC, "TRIGGERWORDS", 0, Lifetime::Timeframe);
1364 }
1365 if (mSpecConfig.outputQA) {
1366 outputSpecs.emplace_back(gDataOriginTPC, "TRACKINGQA", 0, Lifetime::Timeframe);
1367 }
1368 if (mSpecConfig.outputErrorQA) {
1369 outputSpecs.emplace_back(gDataOriginGPU, "ERRORQA", 0, Lifetime::Timeframe);
1370 }
1371
1372 if (mSpecConfig.runITSTracking) {
1373 outputSpecs.emplace_back(gDataOriginITS, "TRACKS", 0, Lifetime::Timeframe);
1374 outputSpecs.emplace_back(gDataOriginITS, "TRACKCLSID", 0, Lifetime::Timeframe);
1375 outputSpecs.emplace_back(gDataOriginITS, "ITSTrackROF", 0, Lifetime::Timeframe);
1376 outputSpecs.emplace_back(gDataOriginITS, "VERTICES", 0, Lifetime::Timeframe);
1377 outputSpecs.emplace_back(gDataOriginITS, "VERTICESROF", 0, Lifetime::Timeframe);
1378 outputSpecs.emplace_back(gDataOriginITS, "IRFRAMES", 0, Lifetime::Timeframe);
1379
1380 if (mSpecConfig.processMC) {
1381 outputSpecs.emplace_back(gDataOriginITS, "VERTICESMCTR", 0, Lifetime::Timeframe);
1382 outputSpecs.emplace_back(gDataOriginITS, "VERTICESMCPUR", 0, Lifetime::Timeframe);
1383 outputSpecs.emplace_back(gDataOriginITS, "TRACKSMCTR", 0, Lifetime::Timeframe);
1384 outputSpecs.emplace_back(gDataOriginITS, "ITSTrackMC2ROF", 0, Lifetime::Timeframe);
1385 }
1386 }
1387
1388 return outputSpecs;
1389};
1390
1392{
1393 ExitPipeline();
1394 mQA.reset(nullptr);
1395 mDisplayFrontend.reset(nullptr);
1396 mGPUReco.reset(nullptr);
1397}
1398
1399} // namespace o2::gpu
std::vector< std::string > labels
Simple interface to the CDB manager.
Definition of container class for dE/dx corrections.
void dumpToFile(std::string fileName, const CathodeSegmentation &seg, const std::vector< Point > &points)
Class of a TPC cluster in TPC-native coordinates (row, time)
Container to store compressed TPC cluster data.
A const (read only) version of MCTruthContainer.
Helper class to access correction maps.
Helper class to access load maps from CCDB.
A parser and sequencer utility for raw pages within DPL input.
A raw page parser for DPL input.
Wrapper container for different reconstructed object types.
Definition of the TPC Digit.
Helper class for memory management of TPC Data Formats, external from the actual data type classes to...
Definition of class for writing debug information.
Definition of the GeometryManager class.
int32_t i
int32_t retVal
Helper for geometry and GRP related CCDB requests.
Definition of the GeometryTGeo class.
A helper class to iterate over all parts of all input routes.
Declarations for the wrapper for the set of cylindrical material layers.
Definition of the Names Generator class.
Class to serialize ONNX objects for ROOT snapshots of CCDB objects at runtime.
Utilities for parsing of data sequences.
uint32_t j
Definition RawData.h:0
Struct for input data required by TRD tracking workflow.
Type wrappers for enforcing a specific serialization method.
class to create TPC fast transformation
Definition of TPCFastTransform class.
Wrapper class for TPC CA Tracker algorithm.
TBranch * ptr
Configurable params for tracks ad hoc tuning.
Helper class to extract VDrift from different sources.
Helper class to obtain TPC clusters / digits / labels from DPL.
Definitions of TPC Zero Suppression Data Headers.
StringRef key
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static MatLayerCylSet * loadFromFile(const std::string &inpFName="matbud.root")
GPUd() value_type estimateLTFast(o2 static GPUd() float estimateLTIncrement(const o2 PropagatorImpl * Instance(bool uninitialized=false)
Definition Propagator.h:178
gsl::span< const TruthElement > getLabels(uint32_t dataindex) const
A read-only version of MCTruthContainer allowing for storage optimisation.
static mask_t getSourcesMask(const std::string_view srcList)
static constexpr std::string_view NONE
keyword for no sources
void addElement(uint32_t dataindex, TruthElement const &element, bool noElement=false)
size_t flatten_to(ContainerType &container) const
static constexpr ID TPC
Definition DetID.h:64
This utility handles transparently the DPL inputs and triggers a customizable action on sequences of ...
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
Definition InitContext.h:34
A helper class to iterate over all parts of all input routes.
int getPos(const char *name) const
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static GPUDisplayFrontendInterface * getFrontend(const char *type)
static uint32_t getTpcMaxTimeBinFromNHbf(uint32_t nHbf)
static float getNominalGPUBz(T &src)
static void ApplySyncSettings(GPUSettingsProcessing &proc, GPUSettingsRec &rec, gpudatatypes::RecoStepField &steps, bool syncMode, int32_t dEdxMode=-2)
o2::framework::Outputs outputs()
std::vector< framework::InputSpec > CompletionPolicyData
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
o2::framework::Inputs inputs()
void run(o2::framework::ProcessingContext &pc) final
void stop() final
This is invoked on stop.
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
GPURecoWorkflowSpec(CompletionPolicyData *policyData, Config const &specconfig, std::vector< int32_t > const &tpcsectors, uint64_t tpcSectorMask, std::shared_ptr< o2::base::GRPGeomRequest > &ggr, std::function< bool(o2::framework::DataProcessingHeader::StartTime)> **gPolicyOrder=nullptr)
o2::framework::Options options()
static void RunZSEncoderCreateMeta(const uint64_t *buffer, const uint32_t *sizes, void **ptrs, GPUTrackingInOutZS *out)
static GeometryTGeo * Instance()
void fillMatrixCache(int mask) override
ClusterNativeAccess::ConstMCLabelContainerViewWithBuffer ConstMCLabelContainerViewWithBuffer
static void addOptions(std::vector< o2::framework::ConfigParamSpec > &options)
static constexpr int MAXSECTOR
Definition Sector.h:44
static precheckModifiedData runPrecheck(o2::gpu::GPUTrackingInOutPointers *ptrs, o2::gpu::GPUO2InterfaceConfiguration *config)
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
static Geometry * instance()
Definition Geometry.h:33
GLint GLsizei count
Definition glcorearb.h:399
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
GLdouble GLdouble right
Definition glcorearb.h:4077
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint left
Definition glcorearb.h:1979
GLboolean * data
Definition glcorearb.h:298
GLintptr offset
Definition glcorearb.h:660
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLuint start
Definition glcorearb.h:469
GLint ref
Definition glcorearb.h:291
GLint GLuint mask
Definition glcorearb.h:291
GLsizei GLenum * sources
Definition glcorearb.h:2516
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
constexpr o2::header::DataOrigin gDataOriginTRD
Definition DataHeader.h:577
constexpr o2::header::DataOrigin gDataOriginITS
Definition DataHeader.h:570
constexpr o2::header::DataOrigin gDataOriginGPU
Definition DataHeader.h:593
constexpr int NSectors
Definition of a container to keep/associate and arbitrary number of labels associated to an index wit...
Defining PrimaryVertex explicitly as messageable.
o2::header::DataDescription DataDescription
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
O2 data header classes and API, v0.1.
Definition DetID.h:49
auto get(const std::byte *buffer, size_t=0)
Definition DataHeader.h:454
Descriptor< gSizeDataDescriptionString > DataDescription
Definition DataHeader.h:551
constexpr int MAXGLOBALPADROW
Definition Constants.h:34
@ ZS
final Zero Suppression (can be ILBZS, DLBZS)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
Definition CDBTypes.h:96
@ FEEConfig
use fee config
@ IDCPadStatus
use idc pad status map
@ CalIDCPadStatusMapA
Status map of the pads (dead etc. obtained from CalIDC0)
@ CalPadGainFull
Full pad gain calibration.
@ CalPadGainResidual
Residual pad gain calibration (e.g. from tracks)
@ CalTimeGain
Gain variation over time.
@ CalTimeGainMC
Gain variation over time for MC.
@ AltroSyncSignal
timing of Altro chip sync. signal
auto getRecoInputContainer(o2::framework::ProcessingContext &pc, o2::gpu::GPUTrackingInOutPointers *ptrs, const o2::globaltracking::RecoContainer *inputTracks, bool mc=false)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::string filename()
size_t inputTimesliceId
The time pipelining id of this particular device.
Definition DeviceSpec.h:68
void requestTracks(o2::dataformats::GlobalTrackID::mask_t src, bool mc)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
S< o2::trd::GeometryFlat >::type * trdGeometry
S< GPUTRDRecoParam >::type * trdRecoParam
S< o2::base::PropagatorImpl< float > >::type * o2Propagator
S< o2::base::MatLayerCylSet >::type * matLUT
S< o2::tpc::ORTRootSerializer >::type * nnClusterizerNetworks[3]
const std::vector< TGraphAsymmErrors > * hist4
std::function< void *(size_t)> allocator
std::vector< std::string > nnEvalMode
std::array< const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > *, o2::tpc::constants::MAXSECTOR > v
const o2::tpc::Digit * tpcDigits[NSECTORS]
const GPUTPCDigitsMCInput * tpcDigitsMC
const o2::tpc::ClusterNativeAccess * clustersNative
const o2::tpc::CompressedClustersFlat * tpcCompressedClusters
const GPUTrackingInOutZS * tpcZS
const o2::MCCompLabel * outputTracksTPCO2MC
const o2::tpc::ClusterNativeAccess * clustersNativeReduced
const o2::tpc::TrackTPC * outputTracksTPCO2
const GPUTrackingInOutDigits * tpcPackedDigits
GPUTrackingInOutZSSector sector[NSECTORS]
static constexpr uint32_t NSECTORS
static constexpr uint32_t NENDPOINTS
GPUOutputControl * asArray()
GPUOutputControl tpcTracksO2Labels
GPUOutputControl tpcTracksO2ClusRefs
size_t getIndex(const GPUOutputControl &v)
static constexpr size_t count()
GPUOutputControl sharedClusterMap
GPUOutputControl compressedClusters
uint32_t SubSpecificationType
Definition DataHeader.h:621
static constexpr int T2L
Definition Cartesian.h:55
static constexpr int T2GRot
Definition Cartesian.h:57
static constexpr int T2G
Definition Cartesian.h:56
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClustersSector[constants::MAXSECTOR]
const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > * clustersMCTruth
const ClusterNative * clusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int clusterOffset[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
const ClusterNative * clustersLinear
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"