Project
Loading...
Searching...
No Matches
GPUWorkflowSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
18#include "Headers/DataHeader.h"
19#include "Framework/WorkflowSpec.h" // o2::framework::mergeInputs
27#include "Framework/Logger.h"
50#include "TPCBase/RDHUtils.h"
52#include "GPUO2InterfaceQA.h"
53#include "GPUO2Interface.h"
54#include "GPUO2InterfaceUtils.h"
55#include "CalibdEdxContainer.h"
56#include "ORTRootSerializer.h"
57#include "GPUNewCalibValues.h"
58#include "TPCPadGainCalib.h"
59#include "TPCZSLinkMapping.h"
61#include "TPCBase/Sector.h"
62#include "TPCBase/Utils.h"
70#include "TRDBase/Geometry.h"
72#include "GPUTRDRecoParam.h"
78#include "GPUWorkflowInternal.h"
79#include "GPUDataTypesQA.h"
80// #include "Framework/ThreadPool.h"
81
82#include <TStopwatch.h>
83#include <TObjArray.h>
84#include <TH1F.h>
85#include <TH2F.h>
86#include <TH1D.h>
87#include <TGraphAsymmErrors.h>
88
89#include <filesystem>
90#include <memory>
91#include <vector>
92#include <iomanip>
93#include <stdexcept>
94#include <regex>
95#include <sys/types.h>
96#include <sys/stat.h>
97#include <fcntl.h>
98#include <chrono>
99#include <unordered_set>
100
101using namespace o2::framework;
102using namespace o2::header;
103using namespace o2::gpu;
104using namespace o2::base;
105using namespace o2::dataformats;
107
108namespace o2::gpu
109{
110
111GPURecoWorkflowSpec::GPURecoWorkflowSpec(GPURecoWorkflowSpec::CompletionPolicyData* policyData, Config const& specconfig, std::vector<int32_t> const& tpcsectors, uint64_t tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<bool(o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) : o2::framework::Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
112{
113 if (mSpecConfig.outputCAClusters && !mSpecConfig.caClusterer && !mSpecConfig.decompressTPC && !mSpecConfig.useFilteredOutputSpecs) {
114 throw std::runtime_error("inconsistent configuration: cluster output is only possible if CA clusterer or CompCluster decompression is activated");
115 }
116
117 mConfig.reset(new GPUO2InterfaceConfiguration);
118 mConfParam.reset(new GPUSettingsO2);
119 mTFSettings.reset(new GPUSettingsTF);
120 mTimer.reset(new TStopwatch);
121 mPipeline.reset(new GPURecoWorkflowSpec_PipelineInternals);
122
123 if (mSpecConfig.enableDoublePipeline == 1 && gPolicyOrder) {
124 *gPolicyOrder = &mPolicyOrder;
125 }
126}
127
129
131{
133 GPUO2InterfaceConfiguration& config = *mConfig.get();
134
135 // Create configuration object and fill settings
136 mConfig->configGRP.solenoidBzNominalGPU = 0;
137 mTFSettings->hasSimStartOrbit = 1;
138 auto& hbfu = o2::raw::HBFUtils::Instance();
139 mTFSettings->simStartOrbit = hbfu.getFirstIRofTF(o2::InteractionRecord(0, hbfu.orbitFirstSampled)).orbit;
140
141 *mConfParam = mConfig->ReadConfigurableParam();
142
143 if (mConfParam->display) {
144 mDisplayFrontend.reset(GPUDisplayFrontendInterface::getFrontend(mConfig->configDisplay.displayFrontend.c_str()));
145 mConfig->configProcessing.eventDisplay = mDisplayFrontend.get();
146 if (mConfig->configProcessing.eventDisplay != nullptr) {
147 LOG(info) << "Event display enabled";
148 } else {
149 throw std::runtime_error("GPU Event Display frontend could not be created!");
150 }
151 }
152 if (mSpecConfig.enableDoublePipeline) {
153 mConfig->configProcessing.doublePipeline = 1;
154 }
155
156 mAutoSolenoidBz = mConfParam->solenoidBzNominalGPU == -1e6f;
157 mAutoContinuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin < 0;
158 if (mAutoContinuousMaxTimeBin) {
159 mConfig->configGRP.grpContinuousMaxTimeBin = GPUO2InterfaceUtils::getTpcMaxTimeBinFromNHbf(mConfParam->overrideNHbfPerTF ? mConfParam->overrideNHbfPerTF : 256);
160 }
161 if (mConfig->configProcessing.deviceNum == -2) {
162 int32_t myId = ic.services().get<const o2::framework::DeviceSpec>().inputTimesliceId;
163 int32_t idMax = ic.services().get<const o2::framework::DeviceSpec>().maxInputTimeslices;
164 mConfig->configProcessing.deviceNum = myId;
165 LOG(info) << "GPU device number selected from pipeline id: " << myId << " / " << idMax;
166 }
167 if (mConfig->configProcessing.debugLevel >= 3 && mVerbosity == 0) {
168 mVerbosity = 1;
169 }
170 mConfig->configProcessing.runMC = mSpecConfig.processMC;
171 if (mSpecConfig.outputQA) {
172 if (!mSpecConfig.processMC && !mConfig->configQA.clusterRejectionHistograms) {
173 throw std::runtime_error("Need MC information to create QA plots");
174 }
175 if (!mSpecConfig.processMC) {
176 mConfig->configQA.noMC = true;
177 }
178 mConfig->configQA.shipToQC = true;
179 if (!mConfig->configProcessing.runQA) {
180 mConfig->configQA.enableLocalOutput = false;
181 mQATaskMask = (mSpecConfig.processMC ? gpudatatypes::gpuqa::tasksAllMC : gpudatatypes::gpuqa::tasksNone) | (mConfig->configQA.clusterRejectionHistograms ? gpudatatypes::gpuqa::taskClusterCounts : gpudatatypes::gpuqa::tasksNone);
182 mConfig->configProcessing.runQA = -mQATaskMask;
183 }
184 }
185 mConfig->configInterface.outputToExternalBuffers = true;
186 const bool runTracking = mSpecConfig.outputTracks || mSpecConfig.outputCompClustersRoot || mSpecConfig.outputCompClustersFlat;
187
188 // Configure the "GPU workflow" i.e. which steps we run on the GPU (or CPU)
189 if (runTracking) {
190 mConfig->configWorkflow.steps.set(gpudatatypes::RecoStep::TPCConversion,
193 mConfig->configWorkflow.outputs.set(gpudatatypes::InOutType::TPCMergedTracks);
194 }
195 GPUO2Interface::ApplySyncSettings(mConfig->configProcessing, mConfig->configReconstruction, mConfig->configWorkflow.steps, mConfParam->synchronousProcessing, runTracking ? mConfParam->rundEdx : -2);
196
197 if (mSpecConfig.outputCompClustersRoot || mSpecConfig.outputCompClustersFlat) {
198 mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCCompression, true);
199 mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCCompressedClusters, true);
200 }
201 mConfig->configWorkflow.inputs.set(gpudatatypes::InOutType::TPCClusters);
202 if (mSpecConfig.caClusterer) { // Override some settings if we have raw data as input
203 mConfig->configWorkflow.inputs.set(gpudatatypes::InOutType::TPCRaw);
204 mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCClusterFinding, true);
205 mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCClusters, true);
206 }
207 if (mSpecConfig.decompressTPC) {
208 mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCCompression, false);
209 mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCDecompression, true);
210 mConfig->configWorkflow.inputs.set(gpudatatypes::InOutType::TPCCompressedClusters);
211 mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCClusters, true);
212 mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCCompressedClusters, false);
213 if (mTPCSectorMask != 0xFFFFFFFFF) {
214 throw std::invalid_argument("Cannot run TPC decompression with a sector mask");
215 }
216 }
217 if (mSpecConfig.runTRDTracking) {
218 mConfig->configWorkflow.inputs.setBits(gpudatatypes::InOutType::TRDTracklets, true);
219 mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TRDTracking, true);
220 }
221 if (mSpecConfig.runITSTracking) {
222 mConfig->configWorkflow.inputs.setBits(gpudatatypes::InOutType::ITSClusters, true);
223 mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::ITSTracks, true);
224 mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::ITSTracking, true);
225 }
226 if (mSpecConfig.outputSharedClusterMap) {
227 mConfig->configProcessing.outputSharedClusterMap = true;
228 }
229 if (!mSpecConfig.outputTracks) {
230 mConfig->configProcessing.createO2Output = 0; // Disable O2 TPC track format output if no track output requested
231 }
232 mConfig->configProcessing.param.tpcTriggerHandling = mSpecConfig.tpcTriggerHandling;
233
234 if (mConfParam->transformationFile.size() || mConfParam->transformationSCFile.size()) {
235 LOG(fatal) << "Deprecated configurable param options GPU_global.transformationFile or transformationSCFile used\n"
236 << "Instead, link the corresponding file as <somedir>/TPC/Calib/CorrectionMap/snapshot.root and use it via\n"
237 << "--condition-remap file://<somdir>=TPC/Calib/CorrectionMap option";
238 }
239 /* if (config.configProcessing.doublePipeline && ic.services().get<ThreadPool>().poolSize != 2) {
240 throw std::runtime_error("double pipeline requires exactly 2 threads");
241 } */
242 if (config.configProcessing.doublePipeline && (mSpecConfig.readTRDtracklets || mSpecConfig.runITSTracking || !(mSpecConfig.zsOnTheFly || mSpecConfig.zsDecoder))) {
243 LOG(fatal) << "GPU two-threaded pipeline works only with TPC-only processing, and with ZS input";
244 }
245
246 if (mSpecConfig.enableDoublePipeline != 2) {
247 mGPUReco = std::make_unique<GPUO2Interface>();
248
249 // initialize TPC calib objects
250 initFunctionTPCCalib(ic);
251
252 // mConfig->configCalib.buffer = mCalibObjects.mBuffer; // TODO WRONG
253 if (mConfig->configCalib.fastTransform == nullptr) {
254 throw std::invalid_argument("GPU workflow: initialization of the TPC transformation failed");
255 }
256
257 if (mConfParam->matLUTFile.size()) {
258 LOGP(info, "Loading matlut file {}", mConfParam->matLUTFile.c_str());
259 mConfig->configCalib.matLUT = o2::base::MatLayerCylSet::loadFromFile(mConfParam->matLUTFile.c_str());
260 if (mConfig->configCalib.matLUT == nullptr) {
261 LOGF(fatal, "Error loading matlut file");
262 }
263 } else {
264 mConfig->configProcessing.lateO2MatLutProvisioningSize = 50 * 1024 * 1024;
265 }
266
267 if (mSpecConfig.readTRDtracklets) {
268 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>();
269 mConfig->configCalib.trdGeometry = mTRDGeometry.get();
270
271 mTRDRecoParam = std::make_unique<GPUTRDRecoParam>();
272 mConfig->configCalib.trdRecoParam = mTRDRecoParam.get();
273 }
274
275 mConfig->configProcessing.willProvideO2PropagatorLate = true;
276 mConfig->configProcessing.o2PropagatorUseGPUField = true;
277 if (mConfig->configReconstruction.tpc.trackReferenceX == 1000.f) {
278 mConfig->configReconstruction.tpc.trackReferenceX = 83.f;
279 }
280
281 if (mConfParam->printSettings && (mConfParam->printSettings > 1 || ic.services().get<const o2::framework::DeviceSpec>().inputTimesliceId == 0)) {
282 mConfig->configProcessing.printSettings = true;
283 if (mConfParam->printSettings > 1) {
284 mConfig->PrintParam();
285 }
286 }
287
288 // Configuration is prepared, initialize the tracker.
289 if (mGPUReco->Initialize(config) != 0) {
290 throw std::invalid_argument("GPU Reconstruction initialization failed");
291 }
292 if (mSpecConfig.outputQA) {
293 mQA = std::make_unique<GPUO2InterfaceQA>(mConfig.get());
294 }
295 if (mSpecConfig.outputErrorQA) {
296 mGPUReco->setErrorCodeOutput(&mErrorQA);
297 }
298
299 // initialize ITS
300 if (mSpecConfig.runITSTracking) {
301 initFunctionITS(ic);
302 }
303 }
304
305 if (mSpecConfig.enableDoublePipeline) {
306 initPipeline(ic);
307 if (mConfParam->dump >= 2) {
308 LOG(fatal) << "Cannot use dump-only mode with multi-threaded pipeline";
309 }
310 }
311
312 auto& callbacks = ic.services().get<CallbackService>();
313 callbacks.set<CallbackService::Id::RegionInfoCallback>([this](fair::mq::RegionInfo const& info) {
314 if (info.size == 0) {
315 return;
316 }
317 if (mSpecConfig.enableDoublePipeline) {
318 mRegionInfos.emplace_back(info);
319 }
320 if (mSpecConfig.enableDoublePipeline == 2) {
321 return;
322 }
323 if (mConfParam->registerSelectedSegmentIds != -1 && info.managed && info.id != (uint32_t)mConfParam->registerSelectedSegmentIds) {
324 return;
325 }
326 int32_t fd = 0;
327 if (mConfParam->mutexMemReg) {
328 mode_t mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
329 fd = open("/tmp/o2_gpu_memlock_mutex.lock", O_RDWR | O_CREAT | O_CLOEXEC, mask);
330 if (fd == -1) {
331 throw std::runtime_error("Error opening memlock mutex lock file");
332 }
333 fchmod(fd, mask);
334 if (lockf(fd, F_LOCK, 0)) {
335 throw std::runtime_error("Error locking memlock mutex file");
336 }
337 }
338 std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
339 if (mConfParam->benchmarkMemoryRegistration) {
340 start = std::chrono::high_resolution_clock::now();
341 }
342 if (mGPUReco->registerMemoryForGPU(info.ptr, info.size)) {
343 throw std::runtime_error("Error registering memory for GPU");
344 }
345 if (mConfParam->benchmarkMemoryRegistration) {
346 end = std::chrono::high_resolution_clock::now();
347 std::chrono::duration<double> elapsed_seconds = end - start;
348 LOG(info) << "Memory registration time (0x" << info.ptr << ", " << info.size << " bytes): " << elapsed_seconds.count() << " s";
349 }
350 if (mConfParam->mutexMemReg) {
351 if (lockf(fd, F_ULOCK, 0)) {
352 throw std::runtime_error("Error unlocking memlock mutex file");
353 }
354 close(fd);
355 }
356 });
357
358 mTimer->Stop();
359 mTimer->Reset();
360}
361
363{
364 LOGF(info, "GPU Reconstruction total timing: Cpu: %.3e Real: %.3e s in %d slots", mTimer->CpuTime(), mTimer->RealTime(), mTimer->Counter() - 1);
365 handlePipelineStop();
366}
367
369{
370 handlePipelineEndOfStream(ec);
371}
372
374{
375 if (mSpecConfig.enableDoublePipeline != 2) {
376 finaliseCCDBTPC(matcher, obj);
377 if (mSpecConfig.runITSTracking) {
378 finaliseCCDBITS(matcher, obj);
379 }
380 }
381 if (GRPGeomHelper::instance().finaliseCCDB(matcher, obj)) {
382 mGRPGeomUpdated = true;
383 return;
384 }
385}
386
387template <class D, class E, class F, class G, class H, class I, class J, class K>
388void GPURecoWorkflowSpec::processInputs(ProcessingContext& pc, D& tpcZSmeta, E& inputZS, F& tpcZS, G& tpcZSonTheFlySizes, bool& debugTFDump, H& compClustersDummy, I& compClustersFlatDummy, J& pCompClustersFlat, K& tmpEmptyCompClusters)
389{
390 if (mSpecConfig.enableDoublePipeline == 1) {
391 return;
392 }
393 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
394 constexpr static size_t NEndpoints = o2::gpu::GPUTrackingInOutZS::NENDPOINTS;
395
396 if (mSpecConfig.zsOnTheFly || mSpecConfig.zsDecoder) {
397 for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
398 for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
399 tpcZSmeta.Pointers[i][j].clear();
400 tpcZSmeta.Sizes[i][j].clear();
401 }
402 }
403 }
404 if (mSpecConfig.zsOnTheFly) {
405 tpcZSonTheFlySizes = {0};
406 // tpcZSonTheFlySizes: #zs pages per endpoint:
407 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "ZSSIZES"}, Lifetime::Timeframe}};
408 bool recv = false, recvsizes = false;
409 for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
410 if (recvsizes) {
411 throw std::runtime_error("Received multiple ZSSIZES data");
412 }
413 tpcZSonTheFlySizes = pc.inputs().get<std::array<uint32_t, NEndpoints * NSectors>>(ref);
414 recvsizes = true;
415 }
416 // zs pages
417 std::vector<InputSpec> filter2 = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "TPCZS"}, Lifetime::Timeframe}};
418 for (auto const& ref : InputRecordWalker(pc.inputs(), filter2)) {
419 if (recv) {
420 throw std::runtime_error("Received multiple TPCZS data");
421 }
422 inputZS = pc.inputs().get<gsl::span<o2::tpc::ZeroSuppressedContainer8kb>>(ref);
423 recv = true;
424 }
425 if (!recv || !recvsizes) {
426 throw std::runtime_error("TPC ZS on the fly data not received");
427 }
428
429 uint32_t offset = 0;
430 for (uint32_t i = 0; i < NSectors; i++) {
431 uint32_t pageSector = 0;
432 for (uint32_t j = 0; j < NEndpoints; j++) {
433 pageSector += tpcZSonTheFlySizes[i * NEndpoints + j];
434 offset += tpcZSonTheFlySizes[i * NEndpoints + j];
435 }
436 if (mVerbosity >= 1) {
437 LOG(info) << "GOT ZS on the fly pages FOR SECTOR " << i << " -> pages: " << pageSector;
438 }
439 }
440 }
441 if (mSpecConfig.zsDecoder) {
442 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
443 auto isSameRdh = [](const char* left, const char* right) -> bool {
444 return o2::raw::RDHUtils::getFEEID(left) == o2::raw::RDHUtils::getFEEID(right) && o2::raw::RDHUtils::getDetectorField(left) == o2::raw::RDHUtils::getDetectorField(right);
445 };
446 auto checkForZSData = [](const char* ptr, uint32_t subSpec) -> bool {
447 const auto rdhLink = o2::raw::RDHUtils::getLinkID(ptr);
448 const auto detField = o2::raw::RDHUtils::getDetectorField(ptr);
449 const auto feeID = o2::raw::RDHUtils::getFEEID(ptr);
450 const auto feeLinkID = o2::tpc::rdh_utils::getLink(feeID);
451 // This check is not what it is supposed to be, but some MC SYNTHETIC data was generated with rdhLinkId set to feeLinkId, so we add some extra logic so we can still decode it
452 return detField == o2::tpc::raw_data_types::ZS && ((feeLinkID == o2::tpc::rdh_utils::UserLogicLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == 0)) ||
453 (feeLinkID == o2::tpc::rdh_utils::ILBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::ILBZSLinkID || rdhLink == 0)) ||
454 (feeLinkID == o2::tpc::rdh_utils::DLBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::DLBZSLinkID || rdhLink == 0)));
455 };
456 auto insertPages = [&tpcZSmeta, checkForZSData](const char* ptr, size_t count, uint32_t subSpec) -> void {
457 if (checkForZSData(ptr, subSpec)) {
458 int32_t rawcru = o2::tpc::rdh_utils::getCRU(ptr);
459 int32_t rawendpoint = o2::tpc::rdh_utils::getEndPoint(ptr);
460 tpcZSmeta.Pointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(ptr);
461 tpcZSmeta.Sizes[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(count);
462 }
463 };
464 if (DPLRawPageSequencer(pc.inputs(), filter)(isSameRdh, insertPages, checkForZSData)) {
465 debugTFDump = true;
466 static uint32_t nErrors = 0;
467 nErrors++;
468 if (nErrors == 1 || (nErrors < 100 && nErrors % 10 == 0) || nErrors % 1000 == 0 || mNTFs % 1000 == 0) {
469 LOG(error) << "DPLRawPageSequencer failed to process TPC raw data - data most likely not padded correctly - Using slow page scan instead (this alarm is downscaled from now on, so far " << nErrors << " of " << mNTFs << " TFs affected)";
470 }
471 }
472
473 int32_t totalCount = 0;
474 for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
475 for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
476 tpcZSmeta.Pointers2[i][j] = tpcZSmeta.Pointers[i][j].data();
477 tpcZSmeta.Sizes2[i][j] = tpcZSmeta.Sizes[i][j].data();
478 tpcZS.sector[i].zsPtr[j] = tpcZSmeta.Pointers2[i][j];
479 tpcZS.sector[i].nZSPtr[j] = tpcZSmeta.Sizes2[i][j];
480 tpcZS.sector[i].count[j] = tpcZSmeta.Pointers[i][j].size();
481 totalCount += tpcZSmeta.Pointers[i][j].size();
482 }
483 }
484 } else if (mSpecConfig.decompressTPC) {
485 if (mSpecConfig.decompressTPCFromROOT) {
486 compClustersDummy = *pc.inputs().get<o2::tpc::CompressedClustersROOT*>("input");
487 compClustersFlatDummy.setForward(&compClustersDummy);
488 pCompClustersFlat = &compClustersFlatDummy;
489 } else {
490 pCompClustersFlat = pc.inputs().get<o2::tpc::CompressedClustersFlat*>("input").get();
491 }
492 if (pCompClustersFlat == nullptr) {
493 tmpEmptyCompClusters.reset(new char[sizeof(o2::tpc::CompressedClustersFlat)]);
494 memset(tmpEmptyCompClusters.get(), 0, sizeof(o2::tpc::CompressedClustersFlat));
495 pCompClustersFlat = (o2::tpc::CompressedClustersFlat*)tmpEmptyCompClusters.get();
496 }
497 } else if (!mSpecConfig.zsOnTheFly) {
498 if (mVerbosity) {
499 LOGF(info, "running tracking for sector(s) 0x%09x", mTPCSectorMask);
500 }
501 }
502}
503
504int32_t GPURecoWorkflowSpec::runMain(o2::framework::ProcessingContext* pc, GPUTrackingInOutPointers* ptrs, GPUInterfaceOutputs* outputRegions, int32_t threadIndex, GPUInterfaceInputUpdate* inputUpdateCallback)
505{
506 int32_t retVal = 0;
507 if (mConfParam->dump < 2) {
508 retVal = mGPUReco->RunTracking(ptrs, outputRegions, threadIndex, inputUpdateCallback);
509
510 if (retVal == 0 && mSpecConfig.runITSTracking) {
511 retVal = runITSTracking(*pc);
512 }
513 }
514
515 if (!mSpecConfig.enableDoublePipeline) { // TODO: Why is this needed for double-pipeline?
516 mGPUReco->Clear(false, threadIndex); // clean non-output memory used by GPU Reconstruction
517 }
518 return retVal;
519}
520
521void GPURecoWorkflowSpec::cleanOldCalibsTPCPtrs(calibObjectStruct& oldCalibObjects)
522{
523 if (mOldCalibObjects.size() > 0) {
524 mOldCalibObjects.pop();
525 }
526 mOldCalibObjects.emplace(std::move(oldCalibObjects));
527}
528
530{
531 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
532 constexpr static size_t NEndpoints = o2::gpu::GPUTrackingInOutZS::NENDPOINTS;
533
534 auto cput = mTimer->CpuTime();
535 auto realt = mTimer->RealTime();
536 mTimer->Start(false);
537 mNTFs++;
538
539 std::vector<gsl::span<const char>> inputs;
540
541 const o2::tpc::CompressedClustersFlat* pCompClustersFlat = nullptr;
542 size_t compClustersFlatDummyMemory[(sizeof(o2::tpc::CompressedClustersFlat) + sizeof(size_t) - 1) / sizeof(size_t)];
543 o2::tpc::CompressedClustersFlat& compClustersFlatDummy = reinterpret_cast<o2::tpc::CompressedClustersFlat&>(compClustersFlatDummyMemory);
544 o2::tpc::CompressedClusters compClustersDummy;
547 std::array<uint32_t, NEndpoints * NSectors> tpcZSonTheFlySizes;
548 gsl::span<const o2::tpc::ZeroSuppressedContainer8kb> inputZS;
549 std::unique_ptr<char[]> tmpEmptyCompClusters;
550
551 bool getWorkflowTPCInput_clusters = false, getWorkflowTPCInput_mc = false, getWorkflowTPCInput_digits = false;
552 bool debugTFDump = false;
553
554 if (mSpecConfig.processMC) {
555 getWorkflowTPCInput_mc = true;
556 }
557 if (!mSpecConfig.decompressTPC && !mSpecConfig.caClusterer) {
558 getWorkflowTPCInput_clusters = true;
559 }
560 if (!mSpecConfig.decompressTPC && mSpecConfig.caClusterer && ((!mSpecConfig.zsOnTheFly || mSpecConfig.processMC) && !mSpecConfig.zsDecoder)) {
561 getWorkflowTPCInput_digits = true;
562 }
563
564 // ------------------------------ Handle inputs ------------------------------
565
566 auto lockDecodeInput = std::make_unique<std::lock_guard<std::mutex>>(mPipeline->mutexDecodeInput);
567
569 if (mSpecConfig.enableDoublePipeline != 2) {
570 if (mSpecConfig.runITSTracking && pc.inputs().getPos("itsTGeo") >= 0) {
571 pc.inputs().get<o2::its::GeometryTGeo*>("itsTGeo");
572 }
573 if (GRPGeomHelper::instance().getGRPECS()->isDetReadOut(o2::detectors::DetID::TPC) && mConfParam->tpcTriggeredMode ^ !GRPGeomHelper::instance().getGRPECS()->isDetContinuousReadOut(o2::detectors::DetID::TPC)) {
574 LOG(fatal) << "configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
575 }
576 }
577
579 processInputs(pc, tpcZSmeta, inputZS, tpcZS, tpcZSonTheFlySizes, debugTFDump, compClustersDummy, compClustersFlatDummy, pCompClustersFlat, tmpEmptyCompClusters); // Process non-digit / non-cluster inputs
580 const auto& inputsClustersDigits = o2::tpc::getWorkflowTPCInput(pc, mVerbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, mTPCSectorMask, getWorkflowTPCInput_digits); // Process digit and cluster inputs
581
582 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
583 mTFSettings->tfStartOrbit = tinfo.firstTForbit;
584 mTFSettings->hasTfStartOrbit = 1;
585 mTFSettings->hasNHBFPerTF = 1;
586 mTFSettings->nHBFPerTF = mConfParam->overrideNHbfPerTF ? mConfParam->overrideNHbfPerTF : GRPGeomHelper::instance().getGRPECS()->getNHBFPerTF();
587 mTFSettings->hasRunStartOrbit = 0;
588 ptrs.settingsTF = mTFSettings.get();
589
590 if (mSpecConfig.enableDoublePipeline != 2) {
591 if (mVerbosity) {
592 LOG(info) << "TF firstTForbit " << mTFSettings->tfStartOrbit << " nHBF " << mTFSettings->nHBFPerTF << " runStartOrbit " << mTFSettings->runStartOrbit << " simStartOrbit " << mTFSettings->simStartOrbit;
593 }
594 if (mConfParam->checkFirstTfOrbit) {
595 static uint32_t lastFirstTFOrbit = -1;
596 static uint32_t lastTFCounter = -1;
597 if (lastFirstTFOrbit != -1 && lastTFCounter != -1) {
598 int32_t diffOrbit = tinfo.firstTForbit - lastFirstTFOrbit;
599 int32_t diffCounter = tinfo.tfCounter - lastTFCounter;
600 if (diffOrbit != diffCounter * mTFSettings->nHBFPerTF) {
601 LOG(error) << "Time frame has mismatching firstTfOrbit - Last orbit/counter: " << lastFirstTFOrbit << " " << lastTFCounter << " - Current: " << tinfo.firstTForbit << " " << tinfo.tfCounter;
602 }
603 }
604 lastFirstTFOrbit = tinfo.firstTForbit;
605 lastTFCounter = tinfo.tfCounter;
606 }
607 }
608
610 decltype(o2::trd::getRecoInputContainer(pc, &ptrs, &inputTracksTRD)) trdInputContainer;
611 if (mSpecConfig.readTRDtracklets) {
612 o2::globaltracking::DataRequest dataRequestTRD;
614 inputTracksTRD.collectData(pc, dataRequestTRD);
615 trdInputContainer = std::move(o2::trd::getRecoInputContainer(pc, &ptrs, &inputTracksTRD));
616 }
617
618 void* ptrEp[NSectors * NEndpoints] = {};
619 bool doInputDigits = false, doInputDigitsMC = false;
620 if (mSpecConfig.decompressTPC) {
621 ptrs.tpcCompressedClusters = pCompClustersFlat;
622 } else if (mSpecConfig.zsOnTheFly) {
623 const uint64_t* buffer = reinterpret_cast<const uint64_t*>(&inputZS[0]);
624 o2::gpu::GPUReconstructionConvert::RunZSEncoderCreateMeta(buffer, tpcZSonTheFlySizes.data(), *&ptrEp, &tpcZS);
625 ptrs.tpcZS = &tpcZS;
626 doInputDigits = doInputDigitsMC = mSpecConfig.processMC;
627 } else if (mSpecConfig.zsDecoder) {
628 ptrs.tpcZS = &tpcZS;
629 if (mSpecConfig.processMC) {
630 throw std::runtime_error("Cannot process MC information, none available");
631 }
632 } else if (mSpecConfig.caClusterer) {
633 doInputDigits = true;
634 doInputDigitsMC = mSpecConfig.processMC;
635 } else {
636 ptrs.clustersNative = &inputsClustersDigits->clusterIndex;
637 }
638
639 if (mTPCSectorMask != 0xFFFFFFFFF) {
640 // Clean out the unused sectors, such that if they were present by chance, they are not processed, and if the values are uninitialized, we should not crash
641 for (uint32_t i = 0; i < NSectors; i++) {
642 if (!(mTPCSectorMask & (1ul << i))) {
643 if (ptrs.tpcZS) {
644 for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
645 tpcZS.sector[i].zsPtr[j] = nullptr;
646 tpcZS.sector[i].nZSPtr[j] = nullptr;
647 tpcZS.sector[i].count[j] = 0;
648 }
649 }
650 }
651 }
652 }
653
654 GPUTrackingInOutDigits tpcDigitsMap;
655 GPUTPCDigitsMCInput tpcDigitsMapMC;
656 if (doInputDigits) {
657 ptrs.tpcPackedDigits = &tpcDigitsMap;
658 if (doInputDigitsMC) {
659 tpcDigitsMap.tpcDigitsMC = &tpcDigitsMapMC;
660 }
661 for (uint32_t i = 0; i < NSectors; i++) {
662 tpcDigitsMap.tpcDigits[i] = inputsClustersDigits->inputDigits[i].data();
663 tpcDigitsMap.nTPCDigits[i] = inputsClustersDigits->inputDigits[i].size();
664 if (doInputDigitsMC) {
665 tpcDigitsMapMC.v[i] = inputsClustersDigits->inputDigitsMCPtrs[i];
666 }
667 }
668 }
669
670 o2::tpc::TPCSectorHeader clusterOutputSectorHeader{0};
671 if (mClusterOutputIds.size() > 0) {
672 clusterOutputSectorHeader.sectorBits = mTPCSectorMask;
673 // subspecs [0, NSectors - 1] are used to identify sector data, we use NSectors to indicate the full TPC
674 clusterOutputSectorHeader.activeSectors = mTPCSectorMask;
675 }
676
677 // ------------------------------ Prepare stage for double-pipeline before normal output preparation ------------------------------
678
679 std::unique_ptr<GPURecoWorkflow_QueueObject> pipelineContext;
680 if (mSpecConfig.enableDoublePipeline) {
681 if (handlePipeline(pc, ptrs, tpcZSmeta, tpcZS, pipelineContext)) {
682 return;
683 }
684 }
685
686 // ------------------------------ Prepare outputs ------------------------------
687
688 GPUInterfaceOutputs outputRegions;
689 using outputDataType = char;
690 using outputBufferUninitializedVector = std::decay_t<decltype(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(Output{"", "", 0}))>;
691 using outputBufferType = std::pair<std::optional<std::reference_wrapper<outputBufferUninitializedVector>>, outputDataType*>;
692 std::vector<outputBufferType> outputBuffers(GPUInterfaceOutputs::count(), {std::nullopt, nullptr});
693 std::unordered_set<std::string> outputsCreated;
694
695 auto setOutputAllocator = [this, &outputBuffers, &outputRegions, &pc, &outputsCreated](const char* name, bool condition, GPUOutputControl& region, auto&& outputSpec, size_t offset = 0) {
696 if (condition) {
697 auto& buffer = outputBuffers[outputRegions.getIndex(region)];
698 if (mConfParam->allocateOutputOnTheFly) {
699 region.allocator = [this, name, &buffer, &pc, outputSpec = std::move(outputSpec), offset, &outputsCreated](size_t size) -> void* {
700 size += offset;
701 if (mVerbosity) {
702 LOG(info) << "ALLOCATING " << size << " bytes for " << name << ": " << std::get<DataOrigin>(outputSpec).template as<std::string>() << "/" << std::get<DataDescription>(outputSpec).template as<std::string>() << "/" << std::get<2>(outputSpec);
703 }
704 std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
705 if (mVerbosity) {
706 start = std::chrono::high_resolution_clock::now();
707 }
708 buffer.first.emplace(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(std::make_from_tuple<Output>(outputSpec), size));
709 outputsCreated.insert(name);
710 if (mVerbosity) {
711 end = std::chrono::high_resolution_clock::now();
712 std::chrono::duration<double> elapsed_seconds = end - start;
713 LOG(info) << "Allocation time for " << name << " (" << size << " bytes)"
714 << ": " << elapsed_seconds.count() << "s";
715 }
716 return (buffer.second = buffer.first->get().data()) + offset;
717 };
718 } else {
719 buffer.first.emplace(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(std::make_from_tuple<Output>(outputSpec), mConfParam->outputBufferSize));
720 region.ptrBase = (buffer.second = buffer.first->get().data()) + offset;
721 region.size = buffer.first->get().size() - offset;
722 outputsCreated.insert(name);
723 }
724 }
725 };
726
727 auto downSizeBuffer = [](outputBufferType& buffer, size_t size) {
728 if (!buffer.first) {
729 return;
730 }
731 if (buffer.first->get().size() < size) {
732 throw std::runtime_error("Invalid buffer size requested");
733 }
734 buffer.first->get().resize(size);
735 if (size && buffer.first->get().data() != buffer.second) {
736 throw std::runtime_error("Inconsistent buffer address after downsize");
737 }
738 };
739
740 /*auto downSizeBufferByName = [&outputBuffers, &outputRegions, &downSizeBuffer](GPUOutputControl& region, size_t size) {
741 auto& buffer = outputBuffers[outputRegions.getIndex(region)];
742 downSizeBuffer(buffer, size);
743 };*/
744
745 auto downSizeBufferToSpan = [&outputBuffers, &outputRegions, &downSizeBuffer](GPUOutputControl& region, auto span) {
746 auto& buffer = outputBuffers[outputRegions.getIndex(region)];
747 if (!buffer.first) {
748 return;
749 }
750 if (span.size() && buffer.second != (char*)span.data()) {
751 throw std::runtime_error("Buffer does not match span");
752 }
753 downSizeBuffer(buffer, span.size() * sizeof(*span.data()));
754 };
755
756 setOutputAllocator("COMPCLUSTERSFLAT", mSpecConfig.outputCompClustersFlat, outputRegions.compressedClusters, std::make_tuple(gDataOriginTPC, (DataDescription) "COMPCLUSTERSFLAT", 0));
757 setOutputAllocator("CLUSTERNATIVE", mClusterOutputIds.size() > 0, outputRegions.clustersNative, std::make_tuple(gDataOriginTPC, mSpecConfig.sendClustersPerSector ? (DataDescription) "CLUSTERNATIVETMP" : (mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE"), NSectors, clusterOutputSectorHeader), sizeof(o2::tpc::ClusterCountIndex));
758 setOutputAllocator("CLSHAREDMAP", mSpecConfig.outputSharedClusterMap, outputRegions.sharedClusterMap, std::make_tuple(gDataOriginTPC, (DataDescription) "CLSHAREDMAP", 0));
759 setOutputAllocator("TPCOCCUPANCYMAP", mSpecConfig.outputSharedClusterMap, outputRegions.tpcOccupancyMap, std::make_tuple(gDataOriginTPC, (DataDescription) "TPCOCCUPANCYMAP", 0));
760 setOutputAllocator("TRACKS", mSpecConfig.outputTracks, outputRegions.tpcTracksO2, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSF" : (DataDescription) "TRACKS", 0));
761 setOutputAllocator("CLUSREFS", mSpecConfig.outputTracks, outputRegions.tpcTracksO2ClusRefs, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSREFSF" : (DataDescription) "CLUSREFS", 0));
762 setOutputAllocator("TRACKSMCLBL", mSpecConfig.outputTracks && mSpecConfig.processMC, outputRegions.tpcTracksO2Labels, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSMCLBLF" : (DataDescription) "TRACKSMCLBL", 0));
763 setOutputAllocator("TRIGGERWORDS", mSpecConfig.caClusterer && mConfig->configProcessing.param.tpcTriggerHandling, outputRegions.tpcTriggerWords, std::make_tuple(gDataOriginTPC, (DataDescription) "TRIGGERWORDS", 0));
765 if (mSpecConfig.processMC && (mSpecConfig.caClusterer || mSpecConfig.useFilteredOutputSpecs)) {
766 outputRegions.clusterLabels.allocator = [&clustersMCBuffer](size_t size) -> void* { return &clustersMCBuffer; };
767 }
768
769 // ------------------------------ Actual processing ------------------------------
770
771 if ((int32_t)(ptrs.tpcZS != nullptr) + (int32_t)(ptrs.tpcPackedDigits != nullptr && (ptrs.tpcZS == nullptr || ptrs.tpcPackedDigits->tpcDigitsMC == nullptr)) + (int32_t)(ptrs.clustersNative != nullptr) + (int32_t)(ptrs.tpcCompressedClusters != nullptr) != 1) {
772 throw std::runtime_error("Invalid input for gpu tracking");
773 }
774
775 const auto& holdData = o2::tpc::TPCTrackingDigitsPreCheck::runPrecheck(&ptrs, mConfig.get());
776
777 calibObjectStruct oldCalibObjects;
778 doCalibUpdates(pc, oldCalibObjects);
779
780 lockDecodeInput.reset();
781
782 uint32_t threadIndex;
783 if (mConfParam->dump) {
784 if (mSpecConfig.enableDoublePipeline && pipelineContext->jobSubmitted) {
785 while (pipelineContext->jobThreadIndex == -1) {
786 }
787 threadIndex = pipelineContext->jobThreadIndex;
788 } else {
789 threadIndex = 0; // TODO: Not sure if this is safe, but it is not yet known which threadIndex will pick up the enqueued job
790 }
791
792 std::string dir = "";
793 if (mConfParam->dumpFolder != "") {
794 dir = std::regex_replace(mConfParam->dumpFolder, std::regex("\\[P\\]"), std::to_string(getpid()));
795 if (mNTFs == 1) {
796 mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
797 }
798 dir += "/";
799 }
800 if (mNTFs == 1) { // Must dump with first TF, since will enforce enqueued calib updates
801 mGPUReco->DumpSettings(threadIndex, dir.c_str());
802 }
803 if (tinfo.tfCounter >= mConfParam->dumpFirst && (mConfParam->dumpLast == -1 || tinfo.tfCounter <= mConfParam->dumpLast)) {
804 mGPUReco->DumpEvent(mNTFDumps, &ptrs, threadIndex, dir.c_str());
805 mNTFDumps++;
806 }
807 }
808 if (mNTFs == 1 && pc.services().get<const o2::framework::DeviceSpec>().inputTimesliceId == 0) { // TPC ConfigurableCarams are somewhat special, need to construct by hand
809 o2::conf::ConfigurableParam::write(o2::base::NameConf::getConfigOutputFileName(pc.services().get<const o2::framework::DeviceSpec>().name, "rec_tpc"), "GPU_rec_tpc,GPU_rec,GPU_proc_param,GPU_proc,GPU_global,trackTuneParams");
810 }
811
812 std::unique_ptr<GPUTrackingInOutPointers> ptrsDump;
813 if (mConfParam->dumpBadTFMode == 2) {
814 ptrsDump.reset(new GPUTrackingInOutPointers);
815 memcpy((void*)ptrsDump.get(), (const void*)&ptrs, sizeof(ptrs));
816 }
817
818 int32_t retVal = 0;
819 if (mSpecConfig.enableDoublePipeline) {
820 if (!pipelineContext->jobSubmitted) {
821 enqueuePipelinedJob(&ptrs, &outputRegions, pipelineContext.get(), true);
822 } else {
823 finalizeInputPipelinedJob(&ptrs, &outputRegions, pipelineContext.get());
824 }
825 std::unique_lock lk(pipelineContext->jobFinishedMutex);
826 pipelineContext->jobFinishedNotify.wait(lk, [context = pipelineContext.get()]() { return context->jobFinished; });
827 retVal = pipelineContext->jobReturnValue;
828 threadIndex = pipelineContext->jobThreadIndex;
829 } else {
830 // uint32_t threadIndex = pc.services().get<ThreadPool>().threadIndex;
831 threadIndex = mNextThreadIndex;
832 if (mConfig->configProcessing.doublePipeline) {
833 mNextThreadIndex = (mNextThreadIndex + 1) % 2;
834 }
835
836 retVal = runMain(&pc, &ptrs, &outputRegions, threadIndex);
837 }
838 if (retVal != 0) {
839 debugTFDump = true;
840 }
841 cleanOldCalibsTPCPtrs(oldCalibObjects);
842
843 o2::utils::DebugStreamer::instance()->flush(); // flushing debug output to file
844
845 if (debugTFDump && mNDebugDumps < mConfParam->dumpBadTFs) {
846 mNDebugDumps++;
847 if (mConfParam->dumpBadTFMode <= 1) {
848 std::string filename = std::string("tpc_dump_") + std::to_string(pc.services().get<const o2::framework::DeviceSpec>().inputTimesliceId) + "_" + std::to_string(mNDebugDumps) + ".dump";
849 FILE* fp = fopen(filename.c_str(), "w+b");
850 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
851 for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
852 auto data = pc.inputs().get<gsl::span<char>>(ref);
853 if (mConfParam->dumpBadTFMode == 1) {
854 uint64_t size = data.size();
855 fwrite(&size, 1, sizeof(size), fp);
856 }
857 fwrite(data.data(), 1, data.size(), fp);
858 }
859 fclose(fp);
860 } else if (mConfParam->dumpBadTFMode == 2) {
861 mGPUReco->DumpEvent(mNDebugDumps - 1, ptrsDump.get(), threadIndex);
862 }
863 }
864
865 if (mConfParam->dump == 2) {
866 return;
867 }
868
869 // ------------------------------ Varios postprocessing steps ------------------------------
870
871 if (mConfig->configProcessing.tpcWriteClustersAfterRejection) {
873 }
874 bool createEmptyOutput = false;
875 if (retVal != 0) {
876 if (retVal == 3 && mConfig->configProcessing.ignoreNonFatalGPUErrors) {
877 if (mConfig->configProcessing.throttleAlarms) {
878 LOG(warning) << "GPU Reconstruction aborted with non fatal error code, ignoring";
879 } else {
880 LOG(alarm) << "GPU Reconstruction aborted with non fatal error code, ignoring";
881 }
882 createEmptyOutput = !mConfParam->partialOutputForNonFatalErrors;
883 } else {
884 LOG(fatal) << "GPU Reconstruction aborted with error code " << retVal << " - errors are not ignored - terminating";
885 }
886 }
887
888 std::unique_ptr<o2::tpc::ClusterNativeAccess> tmpEmptyClNative;
889 if (createEmptyOutput) {
890 memset(&ptrs, 0, sizeof(ptrs));
891 for (uint32_t i = 0; i < outputRegions.count(); i++) {
892 if (outputBuffers[i].first) {
893 size_t toSize = 0;
894 if (i == outputRegions.getIndex(outputRegions.compressedClusters)) {
895 toSize = sizeof(*ptrs.tpcCompressedClusters);
896 } else if (i == outputRegions.getIndex(outputRegions.clustersNative)) {
897 toSize = sizeof(o2::tpc::ClusterCountIndex);
898 }
899 outputBuffers[i].first->get().resize(toSize);
900 outputBuffers[i].second = outputBuffers[i].first->get().data();
901 if (toSize) {
902 memset(outputBuffers[i].second, 0, toSize);
903 }
904 }
905 }
906 tmpEmptyClNative = std::make_unique<o2::tpc::ClusterNativeAccess>();
907 memset(tmpEmptyClNative.get(), 0, sizeof(*tmpEmptyClNative));
908 ptrs.clustersNative = tmpEmptyClNative.get();
909 if (mSpecConfig.processMC) {
910 MCLabelContainer cont;
911 cont.flatten_to(clustersMCBuffer.first);
912 clustersMCBuffer.second = clustersMCBuffer.first;
913 tmpEmptyClNative->clustersMCTruth = &clustersMCBuffer.second;
914 }
915 } else {
916 gsl::span<const o2::tpc::TrackTPC> spanOutputTracks = {ptrs.outputTracksTPCO2, ptrs.nOutputTracksTPCO2};
917 gsl::span<const uint32_t> spanOutputClusRefs = {ptrs.outputClusRefsTPCO2, ptrs.nOutputClusRefsTPCO2};
918 gsl::span<const o2::MCCompLabel> spanOutputTracksMCTruth = {ptrs.outputTracksTPCO2MC, ptrs.outputTracksTPCO2MC ? ptrs.nOutputTracksTPCO2 : 0};
919 if (!mConfParam->allocateOutputOnTheFly) {
920 for (uint32_t i = 0; i < outputRegions.count(); i++) {
921 if (outputRegions.asArray()[i].ptrBase) {
922 if (outputRegions.asArray()[i].size == 1) {
923 throw std::runtime_error("Preallocated buffer size exceeded");
924 }
925 outputRegions.asArray()[i].checkCurrent();
926 downSizeBuffer(outputBuffers[i], (char*)outputRegions.asArray()[i].ptrCurrent - (char*)outputBuffers[i].second);
927 }
928 }
929 }
930 downSizeBufferToSpan(outputRegions.tpcTracksO2, spanOutputTracks);
931 downSizeBufferToSpan(outputRegions.tpcTracksO2ClusRefs, spanOutputClusRefs);
932 downSizeBufferToSpan(outputRegions.tpcTracksO2Labels, spanOutputTracksMCTruth);
933
934 // if requested, tune TPC tracks
935 if (ptrs.nOutputTracksTPCO2) {
936 doTrackTuneTPC(ptrs, outputBuffers[outputRegions.getIndex(outputRegions.tpcTracksO2)].first->get().data());
937 }
938
939 if (mClusterOutputIds.size() > 0 && (void*)ptrs.clustersNative->clustersLinear != (void*)(outputBuffers[outputRegions.getIndex(outputRegions.clustersNative)].second + sizeof(o2::tpc::ClusterCountIndex))) {
940 throw std::runtime_error("cluster native output ptrs out of sync"); // sanity check
941 }
942 }
943
944 if (mConfig->configWorkflow.outputs.isSet(gpudatatypes::InOutType::TPCMergedTracks)) {
945 LOG(info) << "found " << ptrs.nOutputTracksTPCO2 << " track(s)";
946 }
947
948 if (mSpecConfig.outputCompClustersRoot) {
951 }
952
953 if (mClusterOutputIds.size() > 0) {
954 o2::tpc::ClusterNativeAccess const& accessIndex = *ptrs.clustersNative;
955 if (mSpecConfig.sendClustersPerSector) {
956 // Clusters are shipped by sector, we are copying into per-sector buffers (anyway only for ROOT output)
957 for (uint32_t i = 0; i < NSectors; i++) {
958 if (mTPCSectorMask & (1ul << i)) {
960 clusterOutputSectorHeader.sectorBits = (1ul << i);
961 char* buffer = pc.outputs().make<char>({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", subspec, {clusterOutputSectorHeader}}, accessIndex.nClustersSector[i] * sizeof(*accessIndex.clustersLinear) + sizeof(o2::tpc::ClusterCountIndex)).data();
962 o2::tpc::ClusterCountIndex* outIndex = reinterpret_cast<o2::tpc::ClusterCountIndex*>(buffer);
963 memset(outIndex, 0, sizeof(*outIndex));
964 for (int32_t j = 0; j < o2::tpc::constants::MAXGLOBALPADROW; j++) {
965 outIndex->nClusters[i][j] = accessIndex.nClusters[i][j];
966 }
967 memcpy(buffer + sizeof(*outIndex), accessIndex.clusters[i][0], accessIndex.nClustersSector[i] * sizeof(*accessIndex.clustersLinear));
968 if (mSpecConfig.processMC && accessIndex.clustersMCTruth) {
969 MCLabelContainer cont;
970 for (uint32_t j = 0; j < accessIndex.nClustersSector[i]; j++) {
971 const auto& labels = accessIndex.clustersMCTruth->getLabels(accessIndex.clusterOffset[i][0] + j);
972 for (const auto& label : labels) {
973 cont.addElement(j, label);
974 }
975 }
976 ConstMCLabelContainer contflat;
977 cont.flatten_to(contflat);
978 pc.outputs().snapshot({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), subspec, {clusterOutputSectorHeader}}, contflat);
979 }
980 }
981 }
982 } else {
983 // Clusters are shipped as single message, fill ClusterCountIndex
984 DataHeader::SubSpecificationType subspec = NSectors;
985 o2::tpc::ClusterCountIndex* outIndex = reinterpret_cast<o2::tpc::ClusterCountIndex*>(outputBuffers[outputRegions.getIndex(outputRegions.clustersNative)].second);
986 static_assert(sizeof(o2::tpc::ClusterCountIndex) == sizeof(accessIndex.nClusters));
987 memcpy(outIndex, &accessIndex.nClusters[0][0], sizeof(o2::tpc::ClusterCountIndex));
988 if (mSpecConfig.processMC && (mSpecConfig.caClusterer || mSpecConfig.useFilteredOutputSpecs) && accessIndex.clustersMCTruth) {
989 pc.outputs().snapshot({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), subspec, {clusterOutputSectorHeader}}, clustersMCBuffer.first);
990 }
991 }
992 }
993 if (mSpecConfig.outputQA) {
994 TObjArray out;
995 bool sendQAOutput = !createEmptyOutput && outputRegions.qa.newQAHistsCreated;
996 auto getoutput = [sendQAOutput](auto ptr) { return sendQAOutput && ptr ? *ptr : std::decay_t<decltype(*ptr)>(); };
997 std::vector<TH1F> copy1 = getoutput(outputRegions.qa.hist1); // Internally, this will also be used as output, so we need a non-const copy
998 std::vector<TH2F> copy2 = getoutput(outputRegions.qa.hist2);
999 std::vector<TH1D> copy3 = getoutput(outputRegions.qa.hist3);
1000 std::vector<TGraphAsymmErrors> copy4 = getoutput(outputRegions.qa.hist4);
1001 if (sendQAOutput) {
1002 mQA->postprocessExternal(copy1, copy2, copy3, copy4, out, mQATaskMask ? mQATaskMask : -1);
1003 }
1004 pc.outputs().snapshot({gDataOriginTPC, "TRACKINGQA", 0}, out);
1005 if (sendQAOutput) {
1006 mQA->cleanup();
1007 }
1008 }
1009 if (mSpecConfig.outputErrorQA) {
1010 pc.outputs().snapshot({gDataOriginGPU, "ERRORQA", 0}, mErrorQA);
1011 mErrorQA.clear(); // FIXME: This is a race condition once we run multi-threaded!
1012 }
1013 if (mSpecConfig.outputSharedClusterMap && !outputsCreated.contains("TPCOCCUPANCYMAP")) {
1015 }
1016 if (mSpecConfig.tpcTriggerHandling && !outputsCreated.contains("TRIGGERWORDS")) {
1018 }
1019 mTimer->Stop();
1020 LOG(info) << "GPU Reconstruction time for this TF " << mTimer->CpuTime() - cput << " s (cpu), " << mTimer->RealTime() - realt << " s (wall)";
1021}
1022
/// Collects the calibration updates for the current TF and forwards them to GPUReconstruction.
///
/// New calibration objects are gathered in a GPUCalibObjectsConst and new scalar values in a
/// GPUNewCalibValues; mGPUReco->UpdateCalibration() is called only if at least one source
/// reported a change (needCalibUpdate).
///
/// @param pc              DPL processing context, passed to fetchCalibsCCDBTPC and, when ITS
///                        tracking is enabled, to fetchCalibsCCDBITS.
/// @param oldCalibObjects passed to fetchCalibsCCDBTPC. Presumably it receives the objects being
///                        replaced so they can be released later; the caller hands it to
///                        cleanOldCalibsTPCPtrs after processing. TODO confirm.
/// @throws std::runtime_error if dumping an NN clusterizer network to file fails.
1023void GPURecoWorkflowSpec::doCalibUpdates(o2::framework::ProcessingContext& pc, calibObjectStruct& oldCalibObjects)
1024{
1025 GPUCalibObjectsConst newCalibObjects;
1026 GPUNewCalibValues newCalibValues;
1027 // check for updates of TPC calibration objects
1028 bool needCalibUpdate = false;
// GRP/geometry-driven updates: only when mGRPGeomUpdated is set. The flag is cleared here so the
// branch runs once per GRP/geometry update, and it always forces a calibration update.
1029 if (mGRPGeomUpdated) {
1030 mGRPGeomUpdated = false;
1031 needCalibUpdate = true;
1032
// One-time ITS geometry setup, guarded by mITSGeometryCreated.
1033 if (mSpecConfig.runITSTracking && !mITSGeometryCreated) {
1036 mITSGeometryCreated = true;
1037 }
1038
// Automatic solenoid field: the chained assignment also stores the value in
// mConfig->configGRP.solenoidBzNominalGPU.
1039 if (mAutoSolenoidBz) {
1040 newCalibValues.newSolenoidField = true;
1041 newCalibValues.solenoidField = mConfig->configGRP.solenoidBzNominalGPU = GPUO2InterfaceUtils::getNominalGPUBz(*GRPGeomHelper::instance().getGRPMagField());
1042 // Propagator::Instance()->setBz(newCalibValues.solenoidField); // Take value from o2::Propagator::UpdateField from GRPGeomHelper
1043 LOG(info) << "Updating solenoid field " << newCalibValues.solenoidField;
1044 }
// Automatic max time bin, derived from the number of HBFs per TF; also stored in
// mConfig->configGRP.grpContinuousMaxTimeBin.
1045 if (mAutoContinuousMaxTimeBin) {
1046 newCalibValues.newContinuousMaxTimeBin = true;
1047 newCalibValues.continuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin = GPUO2InterfaceUtils::getTpcMaxTimeBinFromNHbf(mTFSettings->nHBFPerTF);
1048 LOG(info) << "Updating max time bin " << newCalibValues.continuousMaxTimeBin << " (" << mTFSettings->nHBFPerTF << " orbits)";
1049 }
1050
// The propagator instance is registered only once (mPropagatorInstanceCreated). Optionally it is
// switched to the GPU polynomial field.
1051 if (!mPropagatorInstanceCreated) {
1052 newCalibObjects.o2Propagator = mConfig->configCalib.o2Propagator = Propagator::Instance();
1053 if (mConfig->configProcessing.o2PropagatorUseGPUField) {
1054 mGPUReco->UseGPUPolynomialFieldInPropagator(Propagator::Instance());
1055 }
1056 mPropagatorInstanceCreated = true;
1057 }
1058
// Material LUT: taken from GRPGeomHelper only when no matLUTFile is configured. mMatLUTCreated is
// set in either case, so this is evaluated only once.
1059 if (!mMatLUTCreated) {
1060 if (mConfParam->matLUTFile.size() == 0) {
1061 newCalibObjects.matLUT = GRPGeomHelper::instance().getMatLUT();
1062 LOG(info) << "Loaded material budget lookup table";
1063 }
1064 mMatLUTCreated = true;
1065 }
// TRD geometry (flattened copy owned by mTRDGeometry) and TRD reco params are created once, and
// only when TRD tracklets are read.
1066 if (mSpecConfig.readTRDtracklets) {
1067 if (!mTRDGeometryCreated) {
1068 auto gm = o2::trd::Geometry::instance();
1069 gm->createPadPlaneArray();
1070 gm->createClusterMatrixArray();
1071 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>(*gm);
1072 newCalibObjects.trdGeometry = mConfig->configCalib.trdGeometry = mTRDGeometry.get();
1073 LOG(info) << "Loaded TRD geometry";
1074 mTRDGeometryCreated = true;
1075 }
1076 if (!mTRDRecoParamCreated) {
1077 mTRDRecoParam = std::make_unique<GPUTRDRecoParam>();
1078 newCalibObjects.trdRecoParam = mConfig->configCalib.trdRecoParam = mTRDRecoParam.get();
1079 mTRDRecoParamCreated = true;
1080 }
1081 }
1082 }
// The fetch calls come before `|| needCalibUpdate`, so they are always executed (no short-circuit)
// even when an update is already pending.
1083 needCalibUpdate = fetchCalibsCCDBTPC(pc, newCalibObjects, oldCalibObjects) || needCalibUpdate;
1084 if (mSpecConfig.runITSTracking) {
1085 needCalibUpdate = fetchCalibsCCDBITS(pc) || needCalibUpdate;
1086 }
// Propagate a changed TPC time-bin cut into both the GRP config and the new calib values.
1087 if (mTPCCutAtTimeBin != mConfig->configGRP.tpcCutTimeBin) {
1088 newCalibValues.newTPCTimeBinCut = true;
1089 newCalibValues.tpcTimeBinCut = mConfig->configGRP.tpcCutTimeBin = mTPCCutAtTimeBin;
1090 needCalibUpdate = true;
1091 }
// NN clusterizer: copy the three network pointers currently held in mConfig->configCalib.
// NOTE(review): this block does not set needCalibUpdate itself, so the pointers reach
// GPUReconstruction only if another source triggered an update. Confirm this is intended.
// With nnDumpToFile, every non-null network is written to tpc_nn_clusterizer_<i>.onnx on every call.
1092 if (mSpecConfig.nnLoadFromCCDB) {
// Writes validSize bytes of buffer to path (truncating); throws std::runtime_error on open/write failure.
1093 auto dumpToFile = [](const char* buffer, std::size_t validSize, const std::string& path) {
1094 std::ofstream out(path, std::ios::binary | std::ios::trunc);
1095 if (!out.is_open()) {
1096 throw std::runtime_error("Failed to open output file: " + path);
1097 }
1098
1099 out.write(buffer, static_cast<std::streamsize>(validSize));
1100 if (!out) {
1101 throw std::runtime_error("Failed while writing data to: " + path);
1102 }
1103 };
1104 for (int i = 0; i < 3; i++) {
1105 newCalibObjects.nnClusterizerNetworks[i] = mConfig->configCalib.nnClusterizerNetworks[i];
1106 if (mSpecConfig.nnDumpToFile && newCalibObjects.nnClusterizerNetworks[i]) {
1107 std::string path = "tpc_nn_clusterizer_" + std::to_string(i) + ".onnx";
1108 dumpToFile(newCalibObjects.nnClusterizerNetworks[i]->getONNXModel(), newCalibObjects.nnClusterizerNetworks[i]->getONNXModelSize(), path);
1109 LOG(info) << "Dumped TPC clusterizer NN " << i << " to file " << path;
1110 }
1111 }
1112 }
// Push everything collected above to the reconstruction instance in a single call.
1113 if (needCalibUpdate) {
1114 LOG(info) << "Updating GPUReconstruction calibration objects";
1115 mGPUReco->UpdateCalibration(newCalibObjects, newCalibValues);
1116 }
1117}
1118
1120{
1121 Options opts;
1122 if (mSpecConfig.enableDoublePipeline) {
1123 bool send = mSpecConfig.enableDoublePipeline == 2;
1124 char* o2jobid = getenv("O2JOBID");
1125 char* numaid = getenv("NUMAID");
1126 int32_t chanid = o2jobid ? atoi(o2jobid) : (numaid ? atoi(numaid) : 0);
1127 std::string chan = std::string("name=gpu-prepare-channel,type=") + (send ? "push" : "pull") + ",method=" + (send ? "connect" : "bind") + ",address=ipc://@gpu-prepare-channel-" + std::to_string(chanid) + "-{timeslice0},transport=shmem,rateLogging=0";
1128 opts.emplace_back(o2::framework::ConfigParamSpec{"channel-config", o2::framework::VariantType::String, chan, {"Out-of-band channel config"}});
1129 }
1130 if (mSpecConfig.enableDoublePipeline == 2) {
1131 return opts;
1132 }
1133 return opts;
1134}
1135
1137{
// Builds the list of DPL inputs for this device from mSpecConfig.
// Side effects besides the returned list:
//  - appends matchers to *mPolicyData;
//  - overwrites *mConfParam with mConfig->ReadConfigurableParam();
//  - when NN clusterizer CCDB loading is enabled, sets mSpecConfig.nnLoadFromCCDB,
//    nnDumpToFile and nnEvalMode.
1138 Inputs inputs;
1139 if (mSpecConfig.zsDecoder) {
1140 // All ZS raw data is published with subspec 0 by the o2-raw-file-reader-workflow and DataDistribution
1141 // creates subspec from CRU and endpoint id, we create one single input route subscribing to all TPC/RAWDATA
1142 inputs.emplace_back(InputSpec{"zsraw", ConcreteDataTypeMatcher{"TPC", "RAWDATA"}, Lifetime::Timeframe});
1143 if (mSpecConfig.askDISTSTF) {
1144 inputs.emplace_back("stdDist", "FLP", "DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
1145 }
1146 }
// Double-pipeline mode 2 consumes only the raw ZS input (fatal otherwise) and returns early.
// Mode 1 additionally consumes the PIPELINEPREPARE message.
1147 if (mSpecConfig.enableDoublePipeline == 2) {
1148 if (!mSpecConfig.zsDecoder) {
1149 LOG(fatal) << "Double pipeline mode can only work with zsraw input";
1150 }
1151 return inputs;
1152 } else if (mSpecConfig.enableDoublePipeline == 1) {
1153 inputs.emplace_back("pipelineprepare", gDataOriginGPU, "PIPELINEPREPARE", 0, Lifetime::Timeframe);
1154 }
// Because of the early return above, every `enableDoublePipeline != 2` test below is always true.
1155 if (mSpecConfig.enableDoublePipeline != 2 && (mSpecConfig.outputTracks || mSpecConfig.caClusterer)) {
1156 // calibration objects for TPC clusterization
1157 inputs.emplace_back("tpcgain", gDataOriginTPC, "PADGAINFULL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalPadGainFull)));
1158 inputs.emplace_back("tpcaltrosync", gDataOriginTPC, "ALTROSYNCSIGNAL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::AltroSyncSignal)));
1159 }
1160 if (mSpecConfig.enableDoublePipeline != 2 && mSpecConfig.outputTracks) {
1161 // calibration objects for TPC tracking
// tpcDeadMapSources: 0 disables the dead-map inputs; a negative value selects
// tpc::SourcesDeadMap::All; any other value is cast to tpc::SourcesDeadMap.
1162 const auto mapSources = mSpecConfig.tpcDeadMapSources;
1163 if (mapSources != 0) {
1164 tpc::SourcesDeadMap sources((mapSources > -1) ? static_cast<tpc::SourcesDeadMap>(mapSources) : tpc::SourcesDeadMap::All);
1166 inputs.emplace_back("tpcidcpadflags", gDataOriginTPC, "IDCPADFLAGS", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalIDCPadStatusMapA), {}, 1)); // time-dependent
1167 }
1169 inputs.emplace_back("tpcruninfo", gDataOriginTPC, "TPCRUNINFO", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::ConfigRunInfo)));
1170 }
1171 }
1172
1173 inputs.emplace_back("tpcgainresidual", gDataOriginTPC, "PADGAINRESIDUAL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalPadGainResidual), {}, 1)); // time-dependent
// tpcUseMCTimeGain selects CalTimeGainMC instead of CalTimeGain; both are bound as "tpctimegain".
1174 if (mSpecConfig.tpcUseMCTimeGain) {
1175 inputs.emplace_back("tpctimegain", gDataOriginTPC, "TIMEGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTimeGainMC), {}, 1)); // time-dependent
1176 } else {
1177 inputs.emplace_back("tpctimegain", gDataOriginTPC, "TIMEGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTimeGain), {}, 1)); // time-dependent
1178 }
1179 inputs.emplace_back("tpctopologygain", gDataOriginTPC, "TOPOLOGYGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTopologyGain)));
1180 inputs.emplace_back("tpcthreshold", gDataOriginTPC, "PADTHRESHOLD", 0, Lifetime::Condition, ccdbParamSpec("TPC/Config/FEEPad"));
1182 inputs.emplace_back("corrMap", o2::header::gDataOriginTPC, "TPCCORRMAP", 0, Lifetime::Timeframe);
1183 if (mSpecConfig.enableCTPLumi) {
1184 inputs.emplace_back("lumiCTP", o2::header::gDataOriginCTP, "LUMICTP", 0, Lifetime::Timeframe);
1185 }
1186 }
// Primary TPC input, bound as "input": compressed clusters (ROOT or flat), digits for the
// clusterizer, or native clusters for tracking. Digit/cluster inputs are also registered in
// *mPolicyData (the completion-policy data passed to the constructor).
1187 if (mSpecConfig.decompressTPC) {
1188 inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, mSpecConfig.decompressTPCFromROOT ? o2::header::DataDescription("COMPCLUSTERS") : o2::header::DataDescription("COMPCLUSTERSFLAT")}, Lifetime::Timeframe});
1189 } else if (mSpecConfig.caClusterer) {
1190 // We accept digits and MC labels also if we run on ZS Raw data, since they are needed for MC label propagation
1191 if ((!mSpecConfig.zsOnTheFly || mSpecConfig.processMC) && !mSpecConfig.zsDecoder) {
1192 inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITS"}, Lifetime::Timeframe});
1193 mPolicyData->emplace_back(o2::framework::InputSpec{"digits", o2::framework::ConcreteDataTypeMatcher{"TPC", "DIGITS"}});
1194 }
1195 } else if (mSpecConfig.runTPCTracking) {
1196 inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERNATIVE"}, Lifetime::Timeframe});
1197 mPolicyData->emplace_back(o2::framework::InputSpec{"clusters", o2::framework::ConcreteDataTypeMatcher{"TPC", "CLUSTERNATIVE"}});
1198 }
// MC label inputs ("mclblin") that match the primary input chosen above.
1199 if (mSpecConfig.processMC) {
1200 if (mSpecConfig.caClusterer) {
1201 if (!mSpecConfig.zsDecoder) {
1202 inputs.emplace_back(InputSpec{"mclblin", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITSMCTR"}, Lifetime::Timeframe});
1203 mPolicyData->emplace_back(o2::framework::InputSpec{"digitsmc", o2::framework::ConcreteDataTypeMatcher{"TPC", "DIGITSMCTR"}});
1204 }
1205 } else if (mSpecConfig.runTPCTracking) {
1206 inputs.emplace_back(InputSpec{"mclblin", ConcreteDataTypeMatcher{gDataOriginTPC, "CLNATIVEMCLBL"}, Lifetime::Timeframe});
1207 mPolicyData->emplace_back(o2::framework::InputSpec{"clustersmc", o2::framework::ConcreteDataTypeMatcher{"TPC", "CLNATIVEMCLBL"}});
1208 }
1209 }
1210
// ZS pages and their sizes when ZS is produced on the fly.
1211 if (mSpecConfig.zsOnTheFly) {
1212 inputs.emplace_back(InputSpec{"zsinput", ConcreteDataTypeMatcher{"TPC", "TPCZS"}, Lifetime::Timeframe});
1213 inputs.emplace_back(InputSpec{"zsinputsizes", ConcreteDataTypeMatcher{"TPC", "ZSSIZES"}, Lifetime::Timeframe});
1214 }
1215 if (mSpecConfig.readTRDtracklets) {
1216 inputs.emplace_back("trdctracklets", o2::header::gDataOriginTRD, "CTRACKLETS", 0, Lifetime::Timeframe);
1217 inputs.emplace_back("trdtracklets", o2::header::gDataOriginTRD, "TRACKLETS", 0, Lifetime::Timeframe);
1218 inputs.emplace_back("trdtriggerrec", o2::header::gDataOriginTRD, "TRKTRGRD", 0, Lifetime::Timeframe);
1219 inputs.emplace_back("trdtrigrecmask", o2::header::gDataOriginTRD, "TRIGRECMASK", 0, Lifetime::Timeframe);
1220 }
1221
// ITS inputs. With itsStaggered there is one set of cluster inputs per subspec 0..6, otherwise a
// single set with subspec 0.
// NOTE(review): the same binding names are reused for every subspec; confirm this is intended.
1222 if (mSpecConfig.runITSTracking) {
1223 for (unsigned int iLay{0}; iLay < (mSpecConfig.itsStaggered ? 7 : 1); ++iLay) {
1224 inputs.emplace_back("compClusters", "ITS", "COMPCLUSTERS", iLay, Lifetime::Timeframe);
1225 inputs.emplace_back("patterns", "ITS", "PATTERNS", iLay, Lifetime::Timeframe);
1226 inputs.emplace_back("ROframes", "ITS", "CLUSTERSROF", iLay, Lifetime::Timeframe);
1227 if (mSpecConfig.processMC) {
1228 inputs.emplace_back("itsmclabels", "ITS", "CLUSTERSMCTR", iLay, Lifetime::Timeframe);
1229 }
1230 }
// itsTriggerType 1: ITS/PHYSTRIG; 2: TRD/TRKTRGRD. Both are bound as "phystrig".
1231 if (mSpecConfig.itsTriggerType == 1) {
1232 inputs.emplace_back("phystrig", "ITS", "PHYSTRIG", 0, Lifetime::Timeframe);
1233 } else if (mSpecConfig.itsTriggerType == 2) {
1234 inputs.emplace_back("phystrig", "TRD", "TRKTRGRD", 0, Lifetime::Timeframe);
1235 }
// Always true here: mode 2 returned early above.
1236 if (mSpecConfig.enableDoublePipeline != 2) {
1237 if (mSpecConfig.isITS3) {
1238 inputs.emplace_back("cldict", "IT3", "CLUSDICT", 0, Lifetime::Condition, ccdbParamSpec("IT3/Calib/ClusterDictionary"));
1239 inputs.emplace_back("alppar", "ITS", "ALPIDEPARAM", 0, Lifetime::Condition, ccdbParamSpec("ITS/Config/AlpideParam"));
1240 } else {
1241 inputs.emplace_back("itscldict", "ITS", "CLUSDICT", 0, Lifetime::Condition, ccdbParamSpec("ITS/Calib/ClusterDictionary"));
1242 inputs.emplace_back("itsalppar", "ITS", "ALPIDEPARAM", 0, Lifetime::Condition, ccdbParamSpec("ITS/Config/AlpideParam"));
1243 }
1244 if (mSpecConfig.itsOverrBeamEst) {
1245 inputs.emplace_back("meanvtx", "GLO", "MEANVERTEX", 0, Lifetime::Condition, ccdbParamSpec("GLO/Calib/MeanVertex", {}, 1));
1246 }
1247 }
1248 }
1249
1250 // NN clusterizer
// The configurable params are re-read here and then consulted below (printSettings).
1251 *mConfParam = mConfig->ReadConfigurableParam();
// When enabled, the classification and regression networks are requested from CCDB as condition inputs.
1252 if (mConfig->configProcessing.nn.nnLoadFromCCDB) {
1253
1254 LOG(info) << "(NN CLUS) Enabling fetching of TPC NN clusterizer from CCDB";
1255 mSpecConfig.nnLoadFromCCDB = true;
1256 mSpecConfig.nnDumpToFile = mConfig->configProcessing.nn.nnCCDBDumpToFile;
1257 GPUSettingsProcessingNNclusterizer& nnClusterizerSettings = mConfig->configProcessing.nn;
1258
// NOTE(review): if nnCCDBWithMomentum or nnCCDBInteractionRate are numeric settings rather than
// strings, std::string::operator= stores a single character instead of the decimal text.
// Verify their types (std::to_string may be needed).
1259 std::map<std::string, std::string> metadata;
1260 metadata["inputDType"] = nnClusterizerSettings.nnInferenceInputDType; // FP16 or FP32
1261 metadata["outputDType"] = nnClusterizerSettings.nnInferenceOutputDType; // FP16 or FP32
1262 metadata["nnCCDBWithMomentum"] = nnClusterizerSettings.nnCCDBWithMomentum; // 0, 1 -> Only for regression model
1263 metadata["nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBClassificationLayerType; // FC, CNN
1264 metadata["nnCCDBInteractionRate"] = nnClusterizerSettings.nnCCDBInteractionRate; // in kHz
1265 metadata["nnCCDBBeamType"] = nnClusterizerSettings.nnCCDBBeamType; // pp, pPb, PbPb
1266
// Appends every entry with a non-empty value to outputMetadata. It does NOT clear outputMetadata first.
1267 auto convert_map_to_metadata = [](const std::map<std::string, std::string>& inputMap, std::vector<o2::framework::CCDBMetadata>& outputMetadata) {
1268 for (const auto& [key, value] : inputMap) {
1269 if (value != "") {
1270 outputMetadata.push_back({key, value});
1271 }
1272 }
1273 };
1274
// nnEvalMode is ':'-separated. Token 0 selects the classification network (c1 or c2); token 1 == "r2"
// additionally requests the second regression network.
// NOTE(review): tokens 0 and 1 are indexed without a size check. Confirm the setting always has two fields.
1275 mSpecConfig.nnEvalMode = o2::utils::Str::tokenize(nnClusterizerSettings.nnEvalMode, ':');
// ccdb_metadata is never cleared between the requests below. Each later request therefore also
// carries the entries added for the earlier ones (same keys, older values).
// NOTE(review): confirm the CCDB fetcher resolves these duplicate keys as intended.
1276 std::vector<o2::framework::CCDBMetadata> ccdb_metadata;
1277
1278 if (mConfParam->printSettings) {
1279 auto printSettings = [](const std::map<std::string, std::string>& settings) {
1280 LOG(info) << "(NN CLUS) NN Clusterizer CCDB settings:";
1281 for (const auto& [key, value] : settings) {
1282 LOG(info) << " " << key << " : " << value;
1283 }
1284 };
1285 printSettings(metadata);
1286 }
1287
// Each request path is nnCCDBPath + "/" + the current nnCCDBEvalType.
// NOTE(review): the c2 classification request uses nnCCDBRegressionLayerType; confirm this is intentional.
1288 if (mSpecConfig.nnEvalMode[0] == "c1") {
1289 metadata["nnCCDBEvalType"] = "classification_c1";
1290 convert_map_to_metadata(metadata, ccdb_metadata);
1291 inputs.emplace_back("nn_classification_c1", gDataOriginTPC, "NNCLUSTERIZER_C1", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1292 } else if (mSpecConfig.nnEvalMode[0] == "c2") {
1293 metadata["nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1294 metadata["nnCCDBEvalType"] = "classification_c2";
1295 convert_map_to_metadata(metadata, ccdb_metadata);
1296 inputs.emplace_back("nn_classification_c2", gDataOriginTPC, "NNCLUSTERIZER_C2", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1297 }
1298
// The first regression network is always requested.
1299 metadata["nnCCDBEvalType"] = "regression_c1";
1300 metadata["nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1301 convert_map_to_metadata(metadata, ccdb_metadata);
1302 inputs.emplace_back("nn_regression_c1", gDataOriginTPC, "NNCLUSTERIZER_R1", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1303
1304 if (mSpecConfig.nnEvalMode[1] == "r2") {
1305 metadata["nnCCDBEvalType"] = "regression_c2";
1306 convert_map_to_metadata(metadata, ccdb_metadata);
1307 inputs.emplace_back("nn_regression_c2", gDataOriginTPC, "NNCLUSTERIZER_R2", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1308 }
1309 }
1310
1311 return inputs;
1312};
1313
1315{
1316 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
1317 std::vector<OutputSpec> outputSpecs;
1318 if (mSpecConfig.enableDoublePipeline == 2) {
1319 outputSpecs.emplace_back(gDataOriginGPU, "PIPELINEPREPARE", 0, Lifetime::Timeframe);
1320 return outputSpecs;
1321 }
1322 if (mSpecConfig.outputTracks) {
1323 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSF" : (DataDescription) "TRACKS", 0, Lifetime::Timeframe);
1324 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSREFSF" : (DataDescription) "CLUSREFS", 0, Lifetime::Timeframe);
1325 }
1326 if (mSpecConfig.processMC && mSpecConfig.outputTracks) {
1327 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSMCLBLF" : (DataDescription) "TRACKSMCLBL", 0, Lifetime::Timeframe);
1328 }
1329 if (mSpecConfig.outputCompClustersRoot) {
1330 outputSpecs.emplace_back(gDataOriginTPC, "COMPCLUSTERS", 0, Lifetime::Timeframe);
1331 }
1332 if (mSpecConfig.outputCompClustersFlat) {
1333 outputSpecs.emplace_back(gDataOriginTPC, "COMPCLUSTERSFLAT", 0, Lifetime::Timeframe);
1334 }
1335 if (mSpecConfig.outputCAClusters) {
1336 for (auto const& sector : mTPCSectors) {
1337 mClusterOutputIds.emplace_back(sector);
1338 }
1339 if (mSpecConfig.sendClustersPerSector) {
1340 outputSpecs.emplace_back(gDataOriginTPC, "CLUSTERNATIVETMP", NSectors, Lifetime::Timeframe); // Dummy buffer the TPC tracker writes the inital linear clusters to
1341 for (const auto sector : mTPCSectors) {
1342 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", sector, Lifetime::Timeframe);
1343 }
1344 } else {
1345 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", NSectors, Lifetime::Timeframe);
1346 }
1347 if (mSpecConfig.processMC) {
1348 if (mSpecConfig.sendClustersPerSector) {
1349 for (const auto sector : mTPCSectors) {
1350 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), sector, Lifetime::Timeframe);
1351 }
1352 } else {
1353 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), NSectors, Lifetime::Timeframe);
1354 }
1355 }
1356 }
1357 if (mSpecConfig.outputSharedClusterMap) {
1358 outputSpecs.emplace_back(gDataOriginTPC, "CLSHAREDMAP", 0, Lifetime::Timeframe);
1359 outputSpecs.emplace_back(gDataOriginTPC, "TPCOCCUPANCYMAP", 0, Lifetime::Timeframe);
1360 }
1361 if (mSpecConfig.tpcTriggerHandling) {
1362 outputSpecs.emplace_back(gDataOriginTPC, "TRIGGERWORDS", 0, Lifetime::Timeframe);
1363 }
1364 if (mSpecConfig.outputQA) {
1365 outputSpecs.emplace_back(gDataOriginTPC, "TRACKINGQA", 0, Lifetime::Timeframe);
1366 }
1367 if (mSpecConfig.outputErrorQA) {
1368 outputSpecs.emplace_back(gDataOriginGPU, "ERRORQA", 0, Lifetime::Timeframe);
1369 }
1370
1371 if (mSpecConfig.runITSTracking) {
1372 outputSpecs.emplace_back(gDataOriginITS, "TRACKS", 0, Lifetime::Timeframe);
1373 outputSpecs.emplace_back(gDataOriginITS, "TRACKCLSID", 0, Lifetime::Timeframe);
1374 outputSpecs.emplace_back(gDataOriginITS, "ITSTrackROF", 0, Lifetime::Timeframe);
1375 outputSpecs.emplace_back(gDataOriginITS, "VERTICES", 0, Lifetime::Timeframe);
1376 outputSpecs.emplace_back(gDataOriginITS, "VERTICESROF", 0, Lifetime::Timeframe);
1377 outputSpecs.emplace_back(gDataOriginITS, "IRFRAMES", 0, Lifetime::Timeframe);
1378
1379 if (mSpecConfig.processMC) {
1380 outputSpecs.emplace_back(gDataOriginITS, "VERTICESMCTR", 0, Lifetime::Timeframe);
1381 outputSpecs.emplace_back(gDataOriginITS, "VERTICESMCPUR", 0, Lifetime::Timeframe);
1382 outputSpecs.emplace_back(gDataOriginITS, "TRACKSMCTR", 0, Lifetime::Timeframe);
1383 }
1384 }
1385
1386 return outputSpecs;
1387};
1388
1390{
1391 ExitPipeline();
1392 mQA.reset(nullptr);
1393 mDisplayFrontend.reset(nullptr);
1394 mGPUReco.reset(nullptr);
1395}
1396
1397} // namespace o2::gpu
std::vector< std::string > labels
Simple interface to the CDB manager.
Definition of container class for dE/dx corrections.
void dumpToFile(std::string fileName, const CathodeSegmentation &seg, const std::vector< Point > &points)
Class of a TPC cluster in TPC-native coordinates (row, time)
Container to store compressed TPC cluster data.
A const (ready only) version of MCTruthContainer.
A parser and sequencer utility for raw pages within DPL input.
A raw page parser for DPL input.
Wrapper container for different reconstructed object types.
Definition of the TPC Digit.
Helper class for memory management of TPC Data Formats, external from the actual data type classes to...
Definition of class for writing debug informations.
Definition of the GeometryManager class.
int32_t i
int32_t retVal
Helper for geometry and GRP related CCDB requests.
Definition of the GeometryTGeo class.
A helper class to iteratate over all parts of all input routes.
Declarations for the wrapper for the set of cylindrical material layers.
Definition of the Names Generator class.
Class to serialize ONNX objects for ROOT snapshots of CCDB objects at runtime.
uint32_t j
Definition RawData.h:0
Struct for input data required by TRD tracking workflow.
Type wrappers for enfording a specific serialization method.
class to create TPC fast transformation
Wrapper class for TPC CA Tracker algorithm.
TBranch * ptr
Configurable params for tracks ad hoc tuning.
Helper class to extract VDrift from different sources.
Helper class to obtain TPC clusters / digits / labels from DPL.
Definitions of TPC Zero Suppression Data Headers.
StringRef key
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static MatLayerCylSet * loadFromFile(const std::string &inpFName="matbud.root")
static std::string getConfigOutputFileName(const std::string &procName, const std::string &confName="", bool json=true)
Definition NameConf.cxx:120
GPUd() value_type estimateLTFast(o2 static GPUd() float estimateLTIncrement(const o2 PropagatorImpl * Instance(bool uninitialized=false)
Definition Propagator.h:178
static void write(std::string const &filename, std::string const &keyOnly="")
gsl::span< const TruthElement > getLabels(uint32_t dataindex) const
A read-only version of MCTruthContainer allowing for storage optimisation.
static mask_t getSourcesMask(const std::string_view srcList)
static constexpr std::string_view NONE
keywork for no sources
void addElement(uint32_t dataindex, TruthElement const &element, bool noElement=false)
size_t flatten_to(ContainerType &container) const
static constexpr ID TPC
Definition DetID.h:64
This utility handles transparently the DPL inputs and triggers a customizable action on sequences of ...
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
Definition InitContext.h:34
A helper class to iteratate over all parts of all input routes.
int getPos(const char *name) const
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static GPUDisplayFrontendInterface * getFrontend(const char *type)
static uint32_t getTpcMaxTimeBinFromNHbf(uint32_t nHbf)
static float getNominalGPUBz(T &src)
static void ApplySyncSettings(GPUSettingsProcessing &proc, GPUSettingsRec &rec, gpudatatypes::RecoStepField &steps, bool syncMode, int32_t dEdxMode=-2)
o2::framework::Outputs outputs()
std::vector< framework::InputSpec > CompletionPolicyData
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
o2::framework::Inputs inputs()
void run(o2::framework::ProcessingContext &pc) final
void stop() final
This is invoked on stop.
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
GPURecoWorkflowSpec(CompletionPolicyData *policyData, Config const &specconfig, std::vector< int32_t > const &tpcsectors, uint64_t tpcSectorMask, std::shared_ptr< o2::base::GRPGeomRequest > &ggr, std::function< bool(o2::framework::DataProcessingHeader::StartTime)> **gPolicyOrder=nullptr)
o2::framework::Options options()
static void RunZSEncoderCreateMeta(const uint64_t *buffer, const uint32_t *sizes, void **ptrs, GPUTrackingInOutZS *out)
static GeometryTGeo * Instance()
void fillMatrixCache(int mask) override
ClusterNativeAccess::ConstMCLabelContainerViewWithBuffer ConstMCLabelContainerViewWithBuffer
static constexpr int MAXSECTOR
Definition Sector.h:44
static precheckModifiedData runPrecheck(o2::gpu::GPUTrackingInOutPointers *ptrs, o2::gpu::GPUO2InterfaceConfiguration *config)
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
static Geometry * instance()
Definition Geometry.h:33
GLint GLsizei count
Definition glcorearb.h:399
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
GLdouble GLdouble right
Definition glcorearb.h:4077
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint left
Definition glcorearb.h:1979
GLboolean * data
Definition glcorearb.h:298
GLintptr offset
Definition glcorearb.h:660
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLuint start
Definition glcorearb.h:469
GLint ref
Definition glcorearb.h:291
GLint GLuint mask
Definition glcorearb.h:291
GLsizei GLenum * sources
Definition glcorearb.h:2516
constexpr o2::header::DataOrigin gDataOriginCTP
Definition DataHeader.h:564
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
constexpr o2::header::DataOrigin gDataOriginTRD
Definition DataHeader.h:577
constexpr o2::header::DataOrigin gDataOriginITS
Definition DataHeader.h:570
constexpr o2::header::DataOrigin gDataOriginGPU
Definition DataHeader.h:593
constexpr int NSectors
Definition of a container to keep/associate and arbitrary number of labels associated to an index wit...
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
o2::header::DataDescription DataDescription
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
O2 data header classes and API, v0.1.
Definition DetID.h:49
auto get(const std::byte *buffer, size_t=0)
Definition DataHeader.h:454
Descriptor< gSizeDataDescriptionString > DataDescription
Definition DataHeader.h:551
constexpr int MAXGLOBALPADROW
Definition Constants.h:34
@ ZS
final Zero Suppression (can be ILBZS, DLBZS)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
Definition CDBTypes.h:98
@ FEEConfig
use fee config
@ IDCPadStatus
use idc pad status map
@ CalIDCPadStatusMapA
Status map of the pads (dead etc. obatined from CalIDC0)
@ CalPadGainFull
Full pad gain calibration.
@ CalPadGainResidual
ResidualpPad gain calibration (e.g. from tracks)
@ CalTimeGain
Gain variation over time.
@ CalTimeGainMC
Gain variation over time for MC.
@ AltroSyncSignal
timing of Altro chip sync. signal
auto getRecoInputContainer(o2::framework::ProcessingContext &pc, o2::gpu::GPUTrackingInOutPointers *ptrs, const o2::globaltracking::RecoContainer *inputTracks, bool mc=false)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::string filename()
std::string name
The name of the associated DataProcessorSpec.
Definition DeviceSpec.h:50
size_t inputTimesliceId
The time pipelining id of this particular device.
Definition DeviceSpec.h:68
void requestTracks(o2::dataformats::GlobalTrackID::mask_t src, bool mc)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
S< o2::trd::GeometryFlat >::type * trdGeometry
S< GPUTRDRecoParam >::type * trdRecoParam
S< o2::base::PropagatorImpl< float > >::type * o2Propagator
S< o2::base::MatLayerCylSet >::type * matLUT
S< o2::tpc::ORTRootSerializer >::type * nnClusterizerNetworks[3]
const std::vector< TGraphAsymmErrors > * hist4
std::function< void *(size_t)> allocator
std::vector< std::string > nnEvalMode
std::array< const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > *, o2::tpc::constants::MAXSECTOR > v
const o2::tpc::Digit * tpcDigits[NSECTORS]
const GPUTPCDigitsMCInput * tpcDigitsMC
const o2::tpc::ClusterNativeAccess * clustersNative
const o2::tpc::CompressedClustersFlat * tpcCompressedClusters
const GPUTrackingInOutZS * tpcZS
const o2::MCCompLabel * outputTracksTPCO2MC
const o2::tpc::ClusterNativeAccess * clustersNativeReduced
const o2::tpc::TrackTPC * outputTracksTPCO2
const GPUTrackingInOutDigits * tpcPackedDigits
GPUTrackingInOutZSSector sector[NSECTORS]
static constexpr uint32_t NSECTORS
static constexpr uint32_t NENDPOINTS
GPUOutputControl * asArray()
GPUOutputControl tpcTracksO2Labels
GPUOutputControl tpcTracksO2ClusRefs
size_t getIndex(const GPUOutputControl &v)
static constexpr size_t count()
GPUOutputControl sharedClusterMap
GPUOutputControl compressedClusters
uint32_t SubSpecificationType
Definition DataHeader.h:622
static constexpr int T2L
Definition Cartesian.h:55
static constexpr int T2GRot
Definition Cartesian.h:57
static constexpr int T2G
Definition Cartesian.h:56
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClustersSector[constants::MAXSECTOR]
const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > * clustersMCTruth
const ClusterNative * clusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int clusterOffset[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
const ClusterNative * clustersLinear
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"