Project
Loading...
Searching...
No Matches
GPUWorkflowSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
18#include "Headers/DataHeader.h"
19#include "Framework/WorkflowSpec.h" // o2::framework::mergeInputs
27#include "Framework/Logger.h"
50#include "TPCBase/RDHUtils.h"
52#include "GPUO2InterfaceQA.h"
53#include "GPUO2Interface.h"
54#include "GPUO2InterfaceUtils.h"
55#include "CalibdEdxContainer.h"
56#include "ORTRootSerializer.h"
57#include "GPUNewCalibValues.h"
58#include "TPCPadGainCalib.h"
59#include "TPCZSLinkMapping.h"
61#include "TPCBase/Sector.h"
62#include "TPCBase/Utils.h"
68#include "Algorithm/Parser.h"
71#include "TRDBase/Geometry.h"
73#include "GPUTRDRecoParam.h"
79#include "GPUWorkflowInternal.h"
80#include "GPUDataTypesQA.h"
81// #include "Framework/ThreadPool.h"
82
83#include <TStopwatch.h>
84#include <TObjArray.h>
85#include <TH1F.h>
86#include <TH2F.h>
87#include <TH1D.h>
88#include <TGraphAsymmErrors.h>
89
90#include <filesystem>
91#include <memory>
92#include <vector>
93#include <iomanip>
94#include <stdexcept>
95#include <regex>
96#include <sys/types.h>
97#include <sys/stat.h>
98#include <fcntl.h>
99#include <chrono>
100#include <unordered_set>
101
102using namespace o2::framework;
103using namespace o2::header;
104using namespace o2::gpu;
105using namespace o2::base;
106using namespace o2::dataformats;
108
109namespace o2::gpu
110{
111
112GPURecoWorkflowSpec::GPURecoWorkflowSpec(GPURecoWorkflowSpec::CompletionPolicyData* policyData, Config const& specconfig, std::vector<int32_t> const& tpcsectors, uint64_t tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<bool(o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) : o2::framework::Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
113{
114 if (mSpecConfig.outputCAClusters && !mSpecConfig.caClusterer && !mSpecConfig.decompressTPC && !mSpecConfig.useFilteredOutputSpecs) {
115 throw std::runtime_error("inconsistent configuration: cluster output is only possible if CA clusterer or CompCluster decompression is activated");
116 }
117
118 mConfig.reset(new GPUO2InterfaceConfiguration);
119 mConfParam.reset(new GPUSettingsO2);
120 mTFSettings.reset(new GPUSettingsTF);
121 mTimer.reset(new TStopwatch);
122 mPipeline.reset(new GPURecoWorkflowSpec_PipelineInternals);
123
124 if (mSpecConfig.enableDoublePipeline == 1 && gPolicyOrder) {
125 *gPolicyOrder = &mPolicyOrder;
126 }
127}
128
130
// NOTE(review): the declaration line of this function (original line 131) was
// dropped by the documentation extraction. From the "ic.services()" accesses
// and the one-time setup below, this is presumably the DPL Task
// init(InitContext& ic) override -- confirm against the upstream source.
132{
134 GPUO2InterfaceConfiguration& config = *mConfig.get();
135
136 // Create configuration object and fill settings
// The nominal solenoid field starts at 0 and is auto-filled later when the
// -1e6f sentinel is detected (see mAutoSolenoidBz below).
137 mConfig->configGRP.solenoidBzNominalGPU = 0;
138 mTFSettings->hasSimStartOrbit = 1;
139 auto& hbfu = o2::raw::HBFUtils::Instance();
140 mTFSettings->simStartOrbit = hbfu.getFirstIRofTF(o2::InteractionRecord(0, hbfu.orbitFirstSampled)).orbit;
141
142 *mConfParam = mConfig->ReadConfigurableParam();
143
// Optional event display; failure to create the requested frontend is fatal.
144 if (mConfParam->display) {
145 mDisplayFrontend.reset(GPUDisplayFrontendInterface::getFrontend(mConfig->configDisplay.displayFrontend.c_str()));
146 mConfig->configProcessing.eventDisplay = mDisplayFrontend.get();
147 if (mConfig->configProcessing.eventDisplay != nullptr) {
148 LOG(info) << "Event display enabled";
149 } else {
150 throw std::runtime_error("GPU Event Display frontend could not be created!");
151 }
152 }
153 if (mSpecConfig.enableDoublePipeline) {
154 mConfig->configProcessing.doublePipeline = 1;
155 }
156
// Remember which settings were left on their "auto" sentinel values so they
// can be filled in later (field: -1e6f, continuous max time bin: negative).
157 mAutoSolenoidBz = mConfParam->solenoidBzNominalGPU == -1e6f;
158 mAutoContinuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin < 0;
159 if (mAutoContinuousMaxTimeBin) {
// Falls back to 256 HBF per TF when no override is given -- TODO confirm
// that 256 is the intended provisional default.
160 mConfig->configGRP.grpContinuousMaxTimeBin = GPUO2InterfaceUtils::getTpcMaxTimeBinFromNHbf(mConfParam->overrideNHbfPerTF ? mConfParam->overrideNHbfPerTF : 256);
161 }
// deviceNum == -2 selects the GPU from the parallel-pipeline timeslice id.
162 if (mConfig->configProcessing.deviceNum == -2) {
163 int32_t myId = ic.services().get<const o2::framework::DeviceSpec>().inputTimesliceId;
164 int32_t idMax = ic.services().get<const o2::framework::DeviceSpec>().maxInputTimeslices;
165 mConfig->configProcessing.deviceNum = myId;
166 LOG(info) << "GPU device number selected from pipeline id: " << myId << " / " << idMax;
167 }
168 if (mConfig->configProcessing.debugLevel >= 3 && mVerbosity == 0) {
169 mVerbosity = 1;
170 }
171 mConfig->configProcessing.runMC = mSpecConfig.processMC;
// QA output: without MC only the cluster-rejection histograms are possible.
// A negative runQA value encodes the QA task mask to execute.
172 if (mSpecConfig.outputQA) {
173 if (!mSpecConfig.processMC && !mConfig->configQA.clusterRejectionHistograms) {
174 throw std::runtime_error("Need MC information to create QA plots");
175 }
176 if (!mSpecConfig.processMC) {
177 mConfig->configQA.noMC = true;
178 }
179 mConfig->configQA.shipToQC = true;
180 if (!mConfig->configProcessing.runQA) {
181 mConfig->configQA.enableLocalOutput = false;
182 mQATaskMask = (mSpecConfig.processMC ? gpudatatypes::gpuqa::tasksAllMC : gpudatatypes::gpuqa::tasksNone) | (mConfig->configQA.clusterRejectionHistograms ? gpudatatypes::gpuqa::taskClusterCounts : gpudatatypes::gpuqa::tasksNone);
183 mConfig->configProcessing.runQA = -mQATaskMask;
184 }
185 }
186 mConfig->configInterface.outputToExternalBuffers = true;
187 const bool runTracking = mSpecConfig.outputTracks || mSpecConfig.outputCompClustersRoot || mSpecConfig.outputCompClustersFlat;
188
189 // Configure the "GPU workflow" i.e. which steps we run on the GPU (or CPU)
190 if (runTracking) {
// NOTE(review): the continuation of this statement (original lines 192-193)
// was lost in the documentation extraction; the set(...) call is truncated.
191 mConfig->configWorkflow.steps.set(gpudatatypes::RecoStep::TPCConversion,
194 mConfig->configWorkflow.outputs.set(gpudatatypes::InOutType::TPCMergedTracks);
195 }
196 GPUO2Interface::ApplySyncSettings(mConfig->configProcessing, mConfig->configReconstruction, mConfig->configWorkflow.steps, mConfParam->synchronousProcessing, runTracking ? mConfParam->rundEdx : -2);
197
198 if (mSpecConfig.outputCompClustersRoot || mSpecConfig.outputCompClustersFlat) {
199 mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCCompression, true);
200 mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCCompressedClusters, true);
201 }
202 mConfig->configWorkflow.inputs.set(gpudatatypes::InOutType::TPCClusters);
203 if (mSpecConfig.caClusterer) { // Override some settings if we have raw data as input
204 mConfig->configWorkflow.inputs.set(gpudatatypes::InOutType::TPCRaw);
205 mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCClusterFinding, true);
206 mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCClusters, true);
207 }
// Decompression replaces compression: consume compressed clusters, emit
// native clusters. It requires all 36 TPC sectors (mask 0xFFFFFFFFF).
208 if (mSpecConfig.decompressTPC) {
209 mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCCompression, false);
210 mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCDecompression, true);
211 mConfig->configWorkflow.inputs.set(gpudatatypes::InOutType::TPCCompressedClusters);
212 mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCClusters, true);
213 mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCCompressedClusters, false);
214 if (mTPCSectorMask != 0xFFFFFFFFF) {
215 throw std::invalid_argument("Cannot run TPC decompression with a sector mask");
216 }
217 }
218 if (mSpecConfig.runTRDTracking) {
219 mConfig->configWorkflow.inputs.setBits(gpudatatypes::InOutType::TRDTracklets, true);
220 mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TRDTracking, true);
221 }
222 if (mSpecConfig.runITSTracking) {
223 mConfig->configWorkflow.inputs.setBits(gpudatatypes::InOutType::ITSClusters, true);
224 mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::ITSTracks, true);
225 mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::ITSTracking, true);
226 }
227 if (mSpecConfig.outputSharedClusterMap) {
228 mConfig->configProcessing.outputSharedClusterMap = true;
229 }
230 if (!mSpecConfig.outputTracks) {
231 mConfig->configProcessing.createO2Output = 0; // Disable O2 TPC track format output if no track output requested
232 }
233 mConfig->configProcessing.param.tpcTriggerHandling = mSpecConfig.tpcTriggerHandling;
234
235 if (mConfParam->transformationFile.size() || mConfParam->transformationSCFile.size()) {
236 LOG(fatal) << "Deprecated configurable param options GPU_global.transformationFile or transformationSCFile used\n"
237 << "Instead, link the corresponding file as <somedir>/TPC/Calib/CorrectionMap/snapshot.root and use it via\n"
238 << "--condition-remap file://<somdir>=TPC/Calib/CorrectionMap option";
239 }
240 /* if (config.configProcessing.doublePipeline && ic.services().get<ThreadPool>().poolSize != 2) {
241 throw std::runtime_error("double pipeline requires exactly 2 threads");
242 } */
// The two-threaded pipeline supports only the TPC-only, ZS-input workflow.
243 if (config.configProcessing.doublePipeline && (mSpecConfig.readTRDtracklets || mSpecConfig.runITSTracking || !(mSpecConfig.zsOnTheFly || mSpecConfig.zsDecoder))) {
244 LOG(fatal) << "GPU two-threaded pipeline works only with TPC-only processing, and with ZS input";
245 }
246
// In double-pipeline mode 2 this process does not own a reconstruction
// instance, so the whole initialization below is skipped -- presumably that
// mode delegates to a worker process; confirm against the pipeline code.
247 if (mSpecConfig.enableDoublePipeline != 2) {
248 mGPUReco = std::make_unique<GPUO2Interface>();
249
250 // initialize TPC calib objects
251 initFunctionTPCCalib(ic);
252
253 // mConfig->configCalib.buffer = mCalibObjects.mBuffer; // TODO WRONG
254 if (mConfig->configCalib.fastTransform == nullptr) {
255 throw std::invalid_argument("GPU workflow: initialization of the TPC transformation failed");
256 }
257
258 if (mConfParam->matLUTFile.size()) {
259 LOGP(info, "Loading matlut file {}", mConfParam->matLUTFile.c_str());
260 mConfig->configCalib.matLUT = o2::base::MatLayerCylSet::loadFromFile(mConfParam->matLUTFile.c_str());
261 if (mConfig->configCalib.matLUT == nullptr) {
262 LOGF(fatal, "Error loading matlut file");
263 }
264 } else {
// Without a LUT file the material LUT is provided late; reserve 50 MiB.
265 mConfig->configProcessing.lateO2MatLutProvisioningSize = 50 * 1024 * 1024;
266 }
267
268 if (mSpecConfig.readTRDtracklets) {
269 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>();
270 mConfig->configCalib.trdGeometry = mTRDGeometry.get();
271
272 mTRDRecoParam = std::make_unique<GPUTRDRecoParam>();
273 mConfig->configCalib.trdRecoParam = mTRDRecoParam.get();
274 }
275
276 mConfig->configProcessing.willProvideO2PropagatorLate = true;
277 mConfig->configProcessing.o2PropagatorUseGPUField = true;
// 1000.f appears to be the "unset" sentinel for the track reference radius,
// replaced by a default of 83.f -- TODO confirm the meaning of both values.
278 if (mConfig->configReconstruction.tpc.trackReferenceX == 1000.f) {
279 mConfig->configReconstruction.tpc.trackReferenceX = 83.f;
280 }
281
// Print settings only on timeslice 0 unless verbose printing (>1) is asked.
282 if (mConfParam->printSettings && (mConfParam->printSettings > 1 || ic.services().get<const o2::framework::DeviceSpec>().inputTimesliceId == 0)) {
283 mConfig->configProcessing.printSettings = true;
284 if (mConfParam->printSettings > 1) {
285 mConfig->PrintParam();
286 }
287 }
288
289 // Configuration is prepared, initialize the tracker.
290 if (mGPUReco->Initialize(config) != 0) {
291 throw std::invalid_argument("GPU Reconstruction initialization failed");
292 }
293 if (mSpecConfig.outputQA) {
294 mQA = std::make_unique<GPUO2InterfaceQA>(mConfig.get());
295 }
296 if (mSpecConfig.outputErrorQA) {
297 mGPUReco->setErrorCodeOutput(&mErrorQA);
298 }
299
300 // initialize ITS
301 if (mSpecConfig.runITSTracking) {
302 initFunctionITS(ic);
303 }
304 }
305
306 if (mSpecConfig.enableDoublePipeline) {
307 initPipeline(ic);
308 if (mConfParam->dump >= 2) {
309 LOG(fatal) << "Cannot use dump-only mode with multi-threaded pipeline";
310 }
311 }
312
// Register every shared-memory region with the GPU framework as it appears.
// An optional file-lock mutex serializes registration across processes, and
// registration can be timed for benchmarking.
313 auto& callbacks = ic.services().get<CallbackService>();
314 callbacks.set<CallbackService::Id::RegionInfoCallback>([this](fair::mq::RegionInfo const& info) {
315 if (info.size == 0) {
316 return;
317 }
318 if (mSpecConfig.enableDoublePipeline) {
319 mRegionInfos.emplace_back(info);
320 }
321 if (mSpecConfig.enableDoublePipeline == 2) {
322 return;
323 }
// Optionally restrict registration to a single selected managed segment.
324 if (mConfParam->registerSelectedSegmentIds != -1 && info.managed && info.id != (uint32_t)mConfParam->registerSelectedSegmentIds) {
325 return;
326 }
327 int32_t fd = 0;
328 if (mConfParam->mutexMemReg) {
329 mode_t mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
330 fd = open("/tmp/o2_gpu_memlock_mutex.lock", O_RDWR | O_CREAT | O_CLOEXEC, mask);
331 if (fd == -1) {
332 throw std::runtime_error("Error opening memlock mutex lock file");
333 }
// fchmod: open() applies the umask, so force the intended permissions.
334 fchmod(fd, mask);
335 if (lockf(fd, F_LOCK, 0)) {
336 throw std::runtime_error("Error locking memlock mutex file");
337 }
338 }
339 std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
340 if (mConfParam->benchmarkMemoryRegistration) {
341 start = std::chrono::high_resolution_clock::now();
342 }
343 if (mGPUReco->registerMemoryForGPU(info.ptr, info.size)) {
344 throw std::runtime_error("Error registering memory for GPU");
345 }
346 if (mConfParam->benchmarkMemoryRegistration) {
347 end = std::chrono::high_resolution_clock::now();
348 std::chrono::duration<double> elapsed_seconds = end - start;
349 LOG(info) << "Memory registration time (0x" << info.ptr << ", " << info.size << " bytes): " << elapsed_seconds.count() << " s";
350 }
351 if (mConfParam->mutexMemReg) {
352 if (lockf(fd, F_ULOCK, 0)) {
353 throw std::runtime_error("Error unlocking memlock mutex file");
354 }
355 close(fd);
356 }
357 });
358
// Zero the timer; processing code restarts it per timeframe (see run()).
359 mTimer->Stop();
360 mTimer->Reset();
361}
362
// NOTE(review): the declaration line (original 363) was lost in the
// documentation extraction; presumably the DPL Task stop() hook -- confirm
// against the upstream source.
364{
// The "-1" presumably discounts one extra timer cycle not corresponding to a
// processed slot (init() does a Stop()/Reset() on mTimer) -- TODO confirm.
365 LOGF(info, "GPU Reconstruction total timing: Cpu: %.3e Real: %.3e s in %d slots", mTimer->CpuTime(), mTimer->RealTime(), mTimer->Counter() - 1);
366 handlePipelineStop();
367}
368
// NOTE(review): the declaration line (original 369) was lost in the
// documentation extraction; presumably the DPL Task
// endOfStream(EndOfStreamContext& ec) override -- confirm upstream.
370{
// Only the multi-threaded pipeline needs explicit end-of-stream handling.
371 handlePipelineEndOfStream(ec);
372}
373
// NOTE(review): the declaration line (original 374) was lost in the
// documentation extraction; from the (matcher, obj) usage this is presumably
// the DPL finaliseCCDB(ConcreteDataMatcher&, void*) callback, invoked when a
// CCDB object arrives -- confirm upstream.
375{
// In double-pipeline mode 2 this process does not own the reconstruction
// objects, so the TPC/ITS calibrations are not finalised here.
376 if (mSpecConfig.enableDoublePipeline != 2) {
377 finaliseCCDBTPC(matcher, obj);
378 if (mSpecConfig.runITSTracking) {
379 finaliseCCDBITS(matcher, obj);
380 }
381 }
// Flag GRP/geometry updates so processing can refresh dependent settings on
// the next timeframe.
382 if (GRPGeomHelper::instance().finaliseCCDB(matcher, obj)) {
383 mGRPGeomUpdated = true;
384 return;
385 }
386}
387
// Gathers the non-digit / non-cluster TPC inputs of one timeframe into the
// GPU tracking input structures: software ("on the fly") ZS buffers, raw ZS
// pages, or compressed clusters, depending on the workflow configuration.
// Template parameters are deduced from the caller's concrete buffer types.
// NOTE(review): in double-pipeline mode 1 inputs appear to be handled
// elsewhere, hence the early return -- confirm against the pipeline code.
388template <class D, class E, class F, class G, class H, class I, class J, class K>
389void GPURecoWorkflowSpec::processInputs(ProcessingContext& pc, D& tpcZSmeta, E& inputZS, F& tpcZS, G& tpcZSonTheFlySizes, bool& debugTFDump, H& compClustersDummy, I& compClustersFlatDummy, J& pCompClustersFlat, K& tmpEmptyCompClusters)
390{
391 if (mSpecConfig.enableDoublePipeline == 1) {
392 return;
393 }
394 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
395 constexpr static size_t NEndpoints = o2::gpu::GPUTrackingInOutZS::NENDPOINTS;
396
// Reset the per-sector / per-endpoint ZS page tables before refilling them.
397 if (mSpecConfig.zsOnTheFly || mSpecConfig.zsDecoder) {
398 for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
399 for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
400 tpcZSmeta.Pointers[i][j].clear();
401 tpcZSmeta.Sizes[i][j].clear();
402 }
403 }
404 }
// Software ZS: exactly one ZSSIZES and one TPCZS message are expected;
// duplicates or missing messages are hard errors.
405 if (mSpecConfig.zsOnTheFly) {
406 tpcZSonTheFlySizes = {0};
407 // tpcZSonTheFlySizes: #zs pages per endpoint:
408 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "ZSSIZES"}, Lifetime::Timeframe}};
409 bool recv = false, recvsizes = false;
410 for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
411 if (recvsizes) {
412 throw std::runtime_error("Received multiple ZSSIZES data");
413 }
414 tpcZSonTheFlySizes = pc.inputs().get<std::array<uint32_t, NEndpoints * NSectors>>(ref);
415 recvsizes = true;
416 }
417 // zs pages
418 std::vector<InputSpec> filter2 = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "TPCZS"}, Lifetime::Timeframe}};
419 for (auto const& ref : InputRecordWalker(pc.inputs(), filter2)) {
420 if (recv) {
421 throw std::runtime_error("Received multiple TPCZS data");
422 }
423 inputZS = pc.inputs().get<gsl::span<o2::tpc::ZeroSuppressedContainer8kb>>(ref);
424 recv = true;
425 }
426 if (!recv || !recvsizes) {
427 throw std::runtime_error("TPC ZS on the fly data not received");
428 }
429
// Per-sector page counting, only used for the verbose log below.
430 uint32_t offset = 0;
431 for (uint32_t i = 0; i < NSectors; i++) {
432 uint32_t pageSector = 0;
433 for (uint32_t j = 0; j < NEndpoints; j++) {
434 pageSector += tpcZSonTheFlySizes[i * NEndpoints + j];
435 offset += tpcZSonTheFlySizes[i * NEndpoints + j];
436 }
437 if (mVerbosity >= 1) {
438 LOG(info) << "GOT ZS on the fly pages FOR SECTOR " << i << " -> pages: " << pageSector;
439 }
440 }
441 }
// Hardware ZS raw data: scan the raw pages and collect, per CRU endpoint,
// pointers and sizes of all pages that carry TPC ZS payload.
442 if (mSpecConfig.zsDecoder) {
443 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
444 auto isSameRdh = [](const char* left, const char* right) -> bool {
445 return o2::raw::RDHUtils::getFEEID(left) == o2::raw::RDHUtils::getFEEID(right) && o2::raw::RDHUtils::getDetectorField(left) == o2::raw::RDHUtils::getDetectorField(right);
446 };
447 auto checkForZSData = [](const char* ptr, uint32_t subSpec) -> bool {
448 const auto rdhLink = o2::raw::RDHUtils::getLinkID(ptr);
449 const auto detField = o2::raw::RDHUtils::getDetectorField(ptr);
450 const auto feeID = o2::raw::RDHUtils::getFEEID(ptr);
451 const auto feeLinkID = o2::tpc::rdh_utils::getLink(feeID);
452 // This check is not what it is supposed to be, but some MC SYNTHETIC data was generated with rdhLinkId set to feeLinkId, so we add some extra logic so we can still decode it
453 return detField == o2::tpc::raw_data_types::ZS && ((feeLinkID == o2::tpc::rdh_utils::UserLogicLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == 0)) ||
454 (feeLinkID == o2::tpc::rdh_utils::ILBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::ILBZSLinkID || rdhLink == 0)) ||
455 (feeLinkID == o2::tpc::rdh_utils::DLBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::DLBZSLinkID || rdhLink == 0)));
456 };
// Indexing: sector = cru / 10, endpoint slot = region (cru % 10) * 2 + ep.
457 auto insertPages = [&tpcZSmeta, checkForZSData](const char* ptr, size_t count, uint32_t subSpec) -> void {
458 if (checkForZSData(ptr, subSpec)) {
459 int32_t rawcru = o2::tpc::rdh_utils::getCRU(ptr);
460 int32_t rawendpoint = o2::tpc::rdh_utils::getEndPoint(ptr);
461 tpcZSmeta.Pointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(ptr);
462 tpcZSmeta.Sizes[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(count);
463 }
464 };
// A nonzero return means the fast page sequencer failed and the slow scan
// was used; the error log is downscaled to avoid flooding.
465 if (DPLRawPageSequencer(pc.inputs(), filter)(isSameRdh, insertPages, checkForZSData)) {
466 debugTFDump = true;
467 static uint32_t nErrors = 0;
468 nErrors++;
469 if (nErrors == 1 || (nErrors < 100 && nErrors % 10 == 0) || nErrors % 1000 == 0 || mNTFs % 1000 == 0) {
470 LOG(error) << "DPLRawPageSequencer failed to process TPC raw data - data most likely not padded correctly - Using slow page scan instead (this alarm is downscaled from now on, so far " << nErrors << " of " << mNTFs << " TFs affected)";
471 }
472 }
473
// Publish the collected page tables in the layout the GPU tracking expects.
474 int32_t totalCount = 0;
475 for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
476 for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
477 tpcZSmeta.Pointers2[i][j] = tpcZSmeta.Pointers[i][j].data();
478 tpcZSmeta.Sizes2[i][j] = tpcZSmeta.Sizes[i][j].data();
479 tpcZS.sector[i].zsPtr[j] = tpcZSmeta.Pointers2[i][j];
480 tpcZS.sector[i].nZSPtr[j] = tpcZSmeta.Sizes2[i][j];
481 tpcZS.sector[i].count[j] = tpcZSmeta.Pointers[i][j].size();
482 totalCount += tpcZSmeta.Pointers[i][j].size();
483 }
484 }
// Compressed-cluster input, as ROOT object or flat buffer; a missing flat
// buffer is replaced by a zero-initialized dummy so downstream code always
// sees a valid (empty) object.
485 } else if (mSpecConfig.decompressTPC) {
486 if (mSpecConfig.decompressTPCFromROOT) {
487 compClustersDummy = *pc.inputs().get<o2::tpc::CompressedClustersROOT*>("input");
488 compClustersFlatDummy.setForward(&compClustersDummy);
489 pCompClustersFlat = &compClustersFlatDummy;
490 } else {
491 pCompClustersFlat = pc.inputs().get<o2::tpc::CompressedClustersFlat*>("input").get();
492 }
493 if (pCompClustersFlat == nullptr) {
494 tmpEmptyCompClusters.reset(new char[sizeof(o2::tpc::CompressedClustersFlat)]);
495 memset(tmpEmptyCompClusters.get(), 0, sizeof(o2::tpc::CompressedClustersFlat));
496 pCompClustersFlat = (o2::tpc::CompressedClustersFlat*)tmpEmptyCompClusters.get();
497 }
498 } else if (!mSpecConfig.zsOnTheFly) {
499 if (mVerbosity) {
500 LOGF(info, "running tracking for sector(s) 0x%09x", mTPCSectorMask);
501 }
502 }
503}
504
505int32_t GPURecoWorkflowSpec::runMain(o2::framework::ProcessingContext* pc, GPUTrackingInOutPointers* ptrs, GPUInterfaceOutputs* outputRegions, int32_t threadIndex, GPUInterfaceInputUpdate* inputUpdateCallback)
506{
507 int32_t retVal = 0;
508 if (mConfParam->dump < 2) {
509 retVal = mGPUReco->RunTracking(ptrs, outputRegions, threadIndex, inputUpdateCallback);
510
511 if (retVal == 0 && mSpecConfig.runITSTracking) {
512 retVal = runITSTracking(*pc);
513 }
514 }
515
516 if (!mSpecConfig.enableDoublePipeline) { // TODO: Why is this needed for double-pipeline?
517 mGPUReco->Clear(false, threadIndex); // clean non-output memory used by GPU Reconstruction
518 }
519 return retVal;
520}
521
522void GPURecoWorkflowSpec::cleanOldCalibsTPCPtrs(calibObjectStruct& oldCalibObjects)
523{
524 if (mOldCalibObjects.size() > 0) {
525 mOldCalibObjects.pop();
526 }
527 mOldCalibObjects.emplace(std::move(oldCalibObjects));
528}
529
531{
532 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
533 constexpr static size_t NEndpoints = o2::gpu::GPUTrackingInOutZS::NENDPOINTS;
534
535 auto cput = mTimer->CpuTime();
536 auto realt = mTimer->RealTime();
537 mTimer->Start(false);
538 mNTFs++;
539
540 std::vector<gsl::span<const char>> inputs;
541
542 const o2::tpc::CompressedClustersFlat* pCompClustersFlat = nullptr;
543 size_t compClustersFlatDummyMemory[(sizeof(o2::tpc::CompressedClustersFlat) + sizeof(size_t) - 1) / sizeof(size_t)];
544 o2::tpc::CompressedClustersFlat& compClustersFlatDummy = reinterpret_cast<o2::tpc::CompressedClustersFlat&>(compClustersFlatDummyMemory);
545 o2::tpc::CompressedClusters compClustersDummy;
548 std::array<uint32_t, NEndpoints * NSectors> tpcZSonTheFlySizes;
549 gsl::span<const o2::tpc::ZeroSuppressedContainer8kb> inputZS;
550 std::unique_ptr<char[]> tmpEmptyCompClusters;
551
552 bool getWorkflowTPCInput_clusters = false, getWorkflowTPCInput_mc = false, getWorkflowTPCInput_digits = false;
553 bool debugTFDump = false;
554
555 if (mSpecConfig.processMC) {
556 getWorkflowTPCInput_mc = true;
557 }
558 if (!mSpecConfig.decompressTPC && !mSpecConfig.caClusterer) {
559 getWorkflowTPCInput_clusters = true;
560 }
561 if (!mSpecConfig.decompressTPC && mSpecConfig.caClusterer && ((!mSpecConfig.zsOnTheFly || mSpecConfig.processMC) && !mSpecConfig.zsDecoder)) {
562 getWorkflowTPCInput_digits = true;
563 }
564
565 // ------------------------------ Handle inputs ------------------------------
566
567 auto lockDecodeInput = std::make_unique<std::lock_guard<std::mutex>>(mPipeline->mutexDecodeInput);
568
570 if (mSpecConfig.enableDoublePipeline != 2) {
571 if (mSpecConfig.runITSTracking && pc.inputs().getPos("itsTGeo") >= 0) {
572 pc.inputs().get<o2::its::GeometryTGeo*>("itsTGeo");
573 }
574 if (GRPGeomHelper::instance().getGRPECS()->isDetReadOut(o2::detectors::DetID::TPC) && mConfParam->tpcTriggeredMode ^ !GRPGeomHelper::instance().getGRPECS()->isDetContinuousReadOut(o2::detectors::DetID::TPC)) {
575 LOG(fatal) << "configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
576 }
577 }
578
580 processInputs(pc, tpcZSmeta, inputZS, tpcZS, tpcZSonTheFlySizes, debugTFDump, compClustersDummy, compClustersFlatDummy, pCompClustersFlat, tmpEmptyCompClusters); // Process non-digit / non-cluster inputs
581 const auto& inputsClustersDigits = o2::tpc::getWorkflowTPCInput(pc, mVerbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, mTPCSectorMask, getWorkflowTPCInput_digits); // Process digit and cluster inputs
582
583 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
584 mTFSettings->tfStartOrbit = tinfo.firstTForbit;
585 mTFSettings->hasTfStartOrbit = 1;
586 mTFSettings->hasNHBFPerTF = 1;
587 mTFSettings->nHBFPerTF = mConfParam->overrideNHbfPerTF ? mConfParam->overrideNHbfPerTF : GRPGeomHelper::instance().getGRPECS()->getNHBFPerTF();
588 mTFSettings->hasRunStartOrbit = 0;
589 ptrs.settingsTF = mTFSettings.get();
590
591 if (mSpecConfig.enableDoublePipeline != 2) {
592 if (mVerbosity) {
593 LOG(info) << "TF firstTForbit " << mTFSettings->tfStartOrbit << " nHBF " << mTFSettings->nHBFPerTF << " runStartOrbit " << mTFSettings->runStartOrbit << " simStartOrbit " << mTFSettings->simStartOrbit;
594 }
595 if (mConfParam->checkFirstTfOrbit) {
596 static uint32_t lastFirstTFOrbit = -1;
597 static uint32_t lastTFCounter = -1;
598 if (lastFirstTFOrbit != -1 && lastTFCounter != -1) {
599 int32_t diffOrbit = tinfo.firstTForbit - lastFirstTFOrbit;
600 int32_t diffCounter = tinfo.tfCounter - lastTFCounter;
601 if (diffOrbit != diffCounter * mTFSettings->nHBFPerTF) {
602 LOG(error) << "Time frame has mismatching firstTfOrbit - Last orbit/counter: " << lastFirstTFOrbit << " " << lastTFCounter << " - Current: " << tinfo.firstTForbit << " " << tinfo.tfCounter;
603 }
604 }
605 lastFirstTFOrbit = tinfo.firstTForbit;
606 lastTFCounter = tinfo.tfCounter;
607 }
608 }
609
611 decltype(o2::trd::getRecoInputContainer(pc, &ptrs, &inputTracksTRD)) trdInputContainer;
612 if (mSpecConfig.readTRDtracklets) {
613 o2::globaltracking::DataRequest dataRequestTRD;
615 inputTracksTRD.collectData(pc, dataRequestTRD);
616 trdInputContainer = std::move(o2::trd::getRecoInputContainer(pc, &ptrs, &inputTracksTRD));
617 }
618
619 void* ptrEp[NSectors * NEndpoints] = {};
620 bool doInputDigits = false, doInputDigitsMC = false;
621 if (mSpecConfig.decompressTPC) {
622 ptrs.tpcCompressedClusters = pCompClustersFlat;
623 } else if (mSpecConfig.zsOnTheFly) {
624 const uint64_t* buffer = reinterpret_cast<const uint64_t*>(&inputZS[0]);
625 o2::gpu::GPUReconstructionConvert::RunZSEncoderCreateMeta(buffer, tpcZSonTheFlySizes.data(), *&ptrEp, &tpcZS);
626 ptrs.tpcZS = &tpcZS;
627 doInputDigits = doInputDigitsMC = mSpecConfig.processMC;
628 } else if (mSpecConfig.zsDecoder) {
629 ptrs.tpcZS = &tpcZS;
630 if (mSpecConfig.processMC) {
631 throw std::runtime_error("Cannot process MC information, none available");
632 }
633 } else if (mSpecConfig.caClusterer) {
634 doInputDigits = true;
635 doInputDigitsMC = mSpecConfig.processMC;
636 } else {
637 ptrs.clustersNative = &inputsClustersDigits->clusterIndex;
638 }
639
640 if (mTPCSectorMask != 0xFFFFFFFFF) {
641 // Clean out the unused sectors, such that if they were present by chance, they are not processed, and if the values are uninitialized, we should not crash
642 for (uint32_t i = 0; i < NSectors; i++) {
643 if (!(mTPCSectorMask & (1ul << i))) {
644 if (ptrs.tpcZS) {
645 for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
646 tpcZS.sector[i].zsPtr[j] = nullptr;
647 tpcZS.sector[i].nZSPtr[j] = nullptr;
648 tpcZS.sector[i].count[j] = 0;
649 }
650 }
651 }
652 }
653 }
654
655 GPUTrackingInOutDigits tpcDigitsMap;
656 GPUTPCDigitsMCInput tpcDigitsMapMC;
657 if (doInputDigits) {
658 ptrs.tpcPackedDigits = &tpcDigitsMap;
659 if (doInputDigitsMC) {
660 tpcDigitsMap.tpcDigitsMC = &tpcDigitsMapMC;
661 }
662 for (uint32_t i = 0; i < NSectors; i++) {
663 tpcDigitsMap.tpcDigits[i] = inputsClustersDigits->inputDigits[i].data();
664 tpcDigitsMap.nTPCDigits[i] = inputsClustersDigits->inputDigits[i].size();
665 if (doInputDigitsMC) {
666 tpcDigitsMapMC.v[i] = inputsClustersDigits->inputDigitsMCPtrs[i];
667 }
668 }
669 }
670
671 o2::tpc::TPCSectorHeader clusterOutputSectorHeader{0};
672 if (mClusterOutputIds.size() > 0) {
673 clusterOutputSectorHeader.sectorBits = mTPCSectorMask;
674 // subspecs [0, NSectors - 1] are used to identify sector data, we use NSectors to indicate the full TPC
675 clusterOutputSectorHeader.activeSectors = mTPCSectorMask;
676 }
677
678 // ------------------------------ Prepare stage for double-pipeline before normal output preparation ------------------------------
679
680 std::unique_ptr<GPURecoWorkflow_QueueObject> pipelineContext;
681 if (mSpecConfig.enableDoublePipeline) {
682 if (handlePipeline(pc, ptrs, tpcZSmeta, tpcZS, pipelineContext)) {
683 return;
684 }
685 }
686
687 // ------------------------------ Prepare outputs ------------------------------
688
689 GPUInterfaceOutputs outputRegions;
690 using outputDataType = char;
691 using outputBufferUninitializedVector = std::decay_t<decltype(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(Output{"", "", 0}))>;
692 using outputBufferType = std::pair<std::optional<std::reference_wrapper<outputBufferUninitializedVector>>, outputDataType*>;
693 std::vector<outputBufferType> outputBuffers(GPUInterfaceOutputs::count(), {std::nullopt, nullptr});
694 std::unordered_set<std::string> outputsCreated;
695
696 auto setOutputAllocator = [this, &outputBuffers, &outputRegions, &pc, &outputsCreated](const char* name, bool condition, GPUOutputControl& region, auto&& outputSpec, size_t offset = 0) {
697 if (condition) {
698 auto& buffer = outputBuffers[outputRegions.getIndex(region)];
699 if (mConfParam->allocateOutputOnTheFly) {
700 region.allocator = [this, name, &buffer, &pc, outputSpec = std::move(outputSpec), offset, &outputsCreated](size_t size) -> void* {
701 size += offset;
702 if (mVerbosity) {
703 LOG(info) << "ALLOCATING " << size << " bytes for " << name << ": " << std::get<DataOrigin>(outputSpec).template as<std::string>() << "/" << std::get<DataDescription>(outputSpec).template as<std::string>() << "/" << std::get<2>(outputSpec);
704 }
705 std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
706 if (mVerbosity) {
707 start = std::chrono::high_resolution_clock::now();
708 }
709 buffer.first.emplace(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(std::make_from_tuple<Output>(outputSpec), size));
710 outputsCreated.insert(name);
711 if (mVerbosity) {
712 end = std::chrono::high_resolution_clock::now();
713 std::chrono::duration<double> elapsed_seconds = end - start;
714 LOG(info) << "Allocation time for " << name << " (" << size << " bytes)"
715 << ": " << elapsed_seconds.count() << "s";
716 }
717 return (buffer.second = buffer.first->get().data()) + offset;
718 };
719 } else {
720 buffer.first.emplace(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(std::make_from_tuple<Output>(outputSpec), mConfParam->outputBufferSize));
721 region.ptrBase = (buffer.second = buffer.first->get().data()) + offset;
722 region.size = buffer.first->get().size() - offset;
723 outputsCreated.insert(name);
724 }
725 }
726 };
727
728 auto downSizeBuffer = [](outputBufferType& buffer, size_t size) {
729 if (!buffer.first) {
730 return;
731 }
732 if (buffer.first->get().size() < size) {
733 throw std::runtime_error("Invalid buffer size requested");
734 }
735 buffer.first->get().resize(size);
736 if (size && buffer.first->get().data() != buffer.second) {
737 throw std::runtime_error("Inconsistent buffer address after downsize");
738 }
739 };
740
741 /*auto downSizeBufferByName = [&outputBuffers, &outputRegions, &downSizeBuffer](GPUOutputControl& region, size_t size) {
742 auto& buffer = outputBuffers[outputRegions.getIndex(region)];
743 downSizeBuffer(buffer, size);
744 };*/
745
746 auto downSizeBufferToSpan = [&outputBuffers, &outputRegions, &downSizeBuffer](GPUOutputControl& region, auto span) {
747 auto& buffer = outputBuffers[outputRegions.getIndex(region)];
748 if (!buffer.first) {
749 return;
750 }
751 if (span.size() && buffer.second != (char*)span.data()) {
752 throw std::runtime_error("Buffer does not match span");
753 }
754 downSizeBuffer(buffer, span.size() * sizeof(*span.data()));
755 };
756
757 setOutputAllocator("COMPCLUSTERSFLAT", mSpecConfig.outputCompClustersFlat, outputRegions.compressedClusters, std::make_tuple(gDataOriginTPC, (DataDescription) "COMPCLUSTERSFLAT", 0));
758 setOutputAllocator("CLUSTERNATIVE", mClusterOutputIds.size() > 0, outputRegions.clustersNative, std::make_tuple(gDataOriginTPC, mSpecConfig.sendClustersPerSector ? (DataDescription) "CLUSTERNATIVETMP" : (mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE"), NSectors, clusterOutputSectorHeader), sizeof(o2::tpc::ClusterCountIndex));
759 setOutputAllocator("CLSHAREDMAP", mSpecConfig.outputSharedClusterMap, outputRegions.sharedClusterMap, std::make_tuple(gDataOriginTPC, (DataDescription) "CLSHAREDMAP", 0));
760 setOutputAllocator("TPCOCCUPANCYMAP", mSpecConfig.outputSharedClusterMap, outputRegions.tpcOccupancyMap, std::make_tuple(gDataOriginTPC, (DataDescription) "TPCOCCUPANCYMAP", 0));
761 setOutputAllocator("TRACKS", mSpecConfig.outputTracks, outputRegions.tpcTracksO2, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSF" : (DataDescription) "TRACKS", 0));
762 setOutputAllocator("CLUSREFS", mSpecConfig.outputTracks, outputRegions.tpcTracksO2ClusRefs, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSREFSF" : (DataDescription) "CLUSREFS", 0));
763 setOutputAllocator("TRACKSMCLBL", mSpecConfig.outputTracks && mSpecConfig.processMC, outputRegions.tpcTracksO2Labels, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSMCLBLF" : (DataDescription) "TRACKSMCLBL", 0));
764 setOutputAllocator("TRIGGERWORDS", mSpecConfig.caClusterer && mConfig->configProcessing.param.tpcTriggerHandling, outputRegions.tpcTriggerWords, std::make_tuple(gDataOriginTPC, (DataDescription) "TRIGGERWORDS", 0));
766 if (mSpecConfig.processMC && (mSpecConfig.caClusterer || mSpecConfig.useFilteredOutputSpecs)) {
767 outputRegions.clusterLabels.allocator = [&clustersMCBuffer](size_t size) -> void* { return &clustersMCBuffer; };
768 }
769
770 // ------------------------------ Actual processing ------------------------------
771
772 if ((int32_t)(ptrs.tpcZS != nullptr) + (int32_t)(ptrs.tpcPackedDigits != nullptr && (ptrs.tpcZS == nullptr || ptrs.tpcPackedDigits->tpcDigitsMC == nullptr)) + (int32_t)(ptrs.clustersNative != nullptr) + (int32_t)(ptrs.tpcCompressedClusters != nullptr) != 1) {
773 throw std::runtime_error("Invalid input for gpu tracking");
774 }
775
776 const auto& holdData = o2::tpc::TPCTrackingDigitsPreCheck::runPrecheck(&ptrs, mConfig.get());
777
778 calibObjectStruct oldCalibObjects;
779 doCalibUpdates(pc, oldCalibObjects);
780
781 lockDecodeInput.reset();
782
783 uint32_t threadIndex;
784 if (mConfParam->dump) {
785 if (mSpecConfig.enableDoublePipeline && pipelineContext->jobSubmitted) {
786 while (pipelineContext->jobThreadIndex == -1) {
787 }
788 threadIndex = pipelineContext->jobThreadIndex;
789 } else {
790 threadIndex = 0; // TODO: Not sure if this is safe, but it is not yet known which threadIndex will pick up the enqueued job
791 }
792
793 std::string dir = "";
794 if (mConfParam->dumpFolder != "") {
795 dir = std::regex_replace(mConfParam->dumpFolder, std::regex("\\[P\\]"), std::to_string(getpid()));
796 if (mNTFs == 1) {
797 mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
798 }
799 dir += "/";
800 }
801 if (mNTFs == 1) { // Must dump with first TF, since will enforce enqueued calib updates
802 mGPUReco->DumpSettings(threadIndex, dir.c_str());
803 }
804 if (tinfo.tfCounter >= mConfParam->dumpFirst && (mConfParam->dumpLast == -1 || tinfo.tfCounter <= mConfParam->dumpLast)) {
805 mGPUReco->DumpEvent(mNTFDumps, &ptrs, threadIndex, dir.c_str());
806 mNTFDumps++;
807 }
808 }
809 if (mNTFs == 1 && pc.services().get<const o2::framework::DeviceSpec>().inputTimesliceId == 0) { // TPC ConfigurableCarams are somewhat special, need to construct by hand
810 o2::conf::ConfigurableParam::write(o2::base::NameConf::getConfigOutputFileName(pc.services().get<const o2::framework::DeviceSpec>().name, "rec_tpc"), "GPU_rec_tpc,GPU_rec,GPU_proc_param,GPU_proc,GPU_global,trackTuneParams");
811 }
812
813 std::unique_ptr<GPUTrackingInOutPointers> ptrsDump;
814 if (mConfParam->dumpBadTFMode == 2) {
815 ptrsDump.reset(new GPUTrackingInOutPointers);
816 memcpy((void*)ptrsDump.get(), (const void*)&ptrs, sizeof(ptrs));
817 }
818
819 int32_t retVal = 0;
820 if (mSpecConfig.enableDoublePipeline) {
821 if (!pipelineContext->jobSubmitted) {
822 enqueuePipelinedJob(&ptrs, &outputRegions, pipelineContext.get(), true);
823 } else {
824 finalizeInputPipelinedJob(&ptrs, &outputRegions, pipelineContext.get());
825 }
826 std::unique_lock lk(pipelineContext->jobFinishedMutex);
827 pipelineContext->jobFinishedNotify.wait(lk, [context = pipelineContext.get()]() { return context->jobFinished; });
828 retVal = pipelineContext->jobReturnValue;
829 threadIndex = pipelineContext->jobThreadIndex;
830 } else {
831 // uint32_t threadIndex = pc.services().get<ThreadPool>().threadIndex;
832 threadIndex = mNextThreadIndex;
833 if (mConfig->configProcessing.doublePipeline) {
834 mNextThreadIndex = (mNextThreadIndex + 1) % 2;
835 }
836
837 retVal = runMain(&pc, &ptrs, &outputRegions, threadIndex);
838 }
839 if (retVal != 0) {
840 debugTFDump = true;
841 }
842 cleanOldCalibsTPCPtrs(oldCalibObjects);
843
844 o2::utils::DebugStreamer::instance()->flush(); // flushing debug output to file
845
846 if (debugTFDump && mNDebugDumps < mConfParam->dumpBadTFs) {
847 mNDebugDumps++;
848 if (mConfParam->dumpBadTFMode <= 1) {
849 std::string filename = std::string("tpc_dump_") + std::to_string(pc.services().get<const o2::framework::DeviceSpec>().inputTimesliceId) + "_" + std::to_string(mNDebugDumps) + ".dump";
850 FILE* fp = fopen(filename.c_str(), "w+b");
851 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
852 for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
853 auto data = pc.inputs().get<gsl::span<char>>(ref);
854 if (mConfParam->dumpBadTFMode == 1) {
855 uint64_t size = data.size();
856 fwrite(&size, 1, sizeof(size), fp);
857 }
858 fwrite(data.data(), 1, data.size(), fp);
859 }
860 fclose(fp);
861 } else if (mConfParam->dumpBadTFMode == 2) {
862 mGPUReco->DumpEvent(mNDebugDumps - 1, ptrsDump.get(), threadIndex);
863 }
864 }
865
866 if (mConfParam->dump == 2) {
867 return;
868 }
869
870 // ------------------------------ Varios postprocessing steps ------------------------------
871
872 if (mConfig->configProcessing.tpcWriteClustersAfterRejection) {
874 }
875 bool createEmptyOutput = false;
876 if (retVal != 0) {
877 if (retVal == 3 && mConfig->configProcessing.ignoreNonFatalGPUErrors) {
878 if (mConfig->configProcessing.throttleAlarms) {
879 LOG(warning) << "GPU Reconstruction aborted with non fatal error code, ignoring";
880 } else {
881 LOG(alarm) << "GPU Reconstruction aborted with non fatal error code, ignoring";
882 }
883 createEmptyOutput = !mConfParam->partialOutputForNonFatalErrors;
884 } else {
885 LOG(fatal) << "GPU Reconstruction aborted with error code " << retVal << " - errors are not ignored - terminating";
886 }
887 }
888
889 std::unique_ptr<o2::tpc::ClusterNativeAccess> tmpEmptyClNative;
890 if (createEmptyOutput) {
891 memset(&ptrs, 0, sizeof(ptrs));
892 for (uint32_t i = 0; i < outputRegions.count(); i++) {
893 if (outputBuffers[i].first) {
894 size_t toSize = 0;
895 if (i == outputRegions.getIndex(outputRegions.compressedClusters)) {
896 toSize = sizeof(*ptrs.tpcCompressedClusters);
897 } else if (i == outputRegions.getIndex(outputRegions.clustersNative)) {
898 toSize = sizeof(o2::tpc::ClusterCountIndex);
899 }
900 outputBuffers[i].first->get().resize(toSize);
901 outputBuffers[i].second = outputBuffers[i].first->get().data();
902 if (toSize) {
903 memset(outputBuffers[i].second, 0, toSize);
904 }
905 }
906 }
907 tmpEmptyClNative = std::make_unique<o2::tpc::ClusterNativeAccess>();
908 memset(tmpEmptyClNative.get(), 0, sizeof(*tmpEmptyClNative));
909 ptrs.clustersNative = tmpEmptyClNative.get();
910 if (mSpecConfig.processMC) {
911 MCLabelContainer cont;
912 cont.flatten_to(clustersMCBuffer.first);
913 clustersMCBuffer.second = clustersMCBuffer.first;
914 tmpEmptyClNative->clustersMCTruth = &clustersMCBuffer.second;
915 }
916 } else {
917 gsl::span<const o2::tpc::TrackTPC> spanOutputTracks = {ptrs.outputTracksTPCO2, ptrs.nOutputTracksTPCO2};
918 gsl::span<const uint32_t> spanOutputClusRefs = {ptrs.outputClusRefsTPCO2, ptrs.nOutputClusRefsTPCO2};
919 gsl::span<const o2::MCCompLabel> spanOutputTracksMCTruth = {ptrs.outputTracksTPCO2MC, ptrs.outputTracksTPCO2MC ? ptrs.nOutputTracksTPCO2 : 0};
920 if (!mConfParam->allocateOutputOnTheFly) {
921 for (uint32_t i = 0; i < outputRegions.count(); i++) {
922 if (outputRegions.asArray()[i].ptrBase) {
923 if (outputRegions.asArray()[i].size == 1) {
924 throw std::runtime_error("Preallocated buffer size exceeded");
925 }
926 outputRegions.asArray()[i].checkCurrent();
927 downSizeBuffer(outputBuffers[i], (char*)outputRegions.asArray()[i].ptrCurrent - (char*)outputBuffers[i].second);
928 }
929 }
930 }
931 downSizeBufferToSpan(outputRegions.tpcTracksO2, spanOutputTracks);
932 downSizeBufferToSpan(outputRegions.tpcTracksO2ClusRefs, spanOutputClusRefs);
933 downSizeBufferToSpan(outputRegions.tpcTracksO2Labels, spanOutputTracksMCTruth);
934
935 // if requested, tune TPC tracks
936 if (ptrs.nOutputTracksTPCO2) {
937 doTrackTuneTPC(ptrs, outputBuffers[outputRegions.getIndex(outputRegions.tpcTracksO2)].first->get().data());
938 }
939
940 if (mClusterOutputIds.size() > 0 && (void*)ptrs.clustersNative->clustersLinear != (void*)(outputBuffers[outputRegions.getIndex(outputRegions.clustersNative)].second + sizeof(o2::tpc::ClusterCountIndex))) {
941 throw std::runtime_error("cluster native output ptrs out of sync"); // sanity check
942 }
943 }
944
945 if (mConfig->configWorkflow.outputs.isSet(gpudatatypes::InOutType::TPCMergedTracks)) {
946 LOG(info) << "found " << ptrs.nOutputTracksTPCO2 << " track(s)";
947 }
948
949 if (mSpecConfig.outputCompClustersRoot) {
952 }
953
954 if (mClusterOutputIds.size() > 0) {
955 o2::tpc::ClusterNativeAccess const& accessIndex = *ptrs.clustersNative;
956 if (mSpecConfig.sendClustersPerSector) {
957 // Clusters are shipped by sector, we are copying into per-sector buffers (anyway only for ROOT output)
958 for (uint32_t i = 0; i < NSectors; i++) {
959 if (mTPCSectorMask & (1ul << i)) {
961 clusterOutputSectorHeader.sectorBits = (1ul << i);
962 char* buffer = pc.outputs().make<char>({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", subspec, {clusterOutputSectorHeader}}, accessIndex.nClustersSector[i] * sizeof(*accessIndex.clustersLinear) + sizeof(o2::tpc::ClusterCountIndex)).data();
963 o2::tpc::ClusterCountIndex* outIndex = reinterpret_cast<o2::tpc::ClusterCountIndex*>(buffer);
964 memset(outIndex, 0, sizeof(*outIndex));
965 for (int32_t j = 0; j < o2::tpc::constants::MAXGLOBALPADROW; j++) {
966 outIndex->nClusters[i][j] = accessIndex.nClusters[i][j];
967 }
968 memcpy(buffer + sizeof(*outIndex), accessIndex.clusters[i][0], accessIndex.nClustersSector[i] * sizeof(*accessIndex.clustersLinear));
969 if (mSpecConfig.processMC && accessIndex.clustersMCTruth) {
970 MCLabelContainer cont;
971 for (uint32_t j = 0; j < accessIndex.nClustersSector[i]; j++) {
972 const auto& labels = accessIndex.clustersMCTruth->getLabels(accessIndex.clusterOffset[i][0] + j);
973 for (const auto& label : labels) {
974 cont.addElement(j, label);
975 }
976 }
977 ConstMCLabelContainer contflat;
978 cont.flatten_to(contflat);
979 pc.outputs().snapshot({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), subspec, {clusterOutputSectorHeader}}, contflat);
980 }
981 }
982 }
983 } else {
984 // Clusters are shipped as single message, fill ClusterCountIndex
985 DataHeader::SubSpecificationType subspec = NSectors;
986 o2::tpc::ClusterCountIndex* outIndex = reinterpret_cast<o2::tpc::ClusterCountIndex*>(outputBuffers[outputRegions.getIndex(outputRegions.clustersNative)].second);
987 static_assert(sizeof(o2::tpc::ClusterCountIndex) == sizeof(accessIndex.nClusters));
988 memcpy(outIndex, &accessIndex.nClusters[0][0], sizeof(o2::tpc::ClusterCountIndex));
989 if (mSpecConfig.processMC && (mSpecConfig.caClusterer || mSpecConfig.useFilteredOutputSpecs) && accessIndex.clustersMCTruth) {
990 pc.outputs().snapshot({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), subspec, {clusterOutputSectorHeader}}, clustersMCBuffer.first);
991 }
992 }
993 }
994 if (mSpecConfig.outputQA) {
995 TObjArray out;
996 bool sendQAOutput = !createEmptyOutput && outputRegions.qa.newQAHistsCreated;
997 auto getoutput = [sendQAOutput](auto ptr) { return sendQAOutput && ptr ? *ptr : std::decay_t<decltype(*ptr)>(); };
998 std::vector<TH1F> copy1 = getoutput(outputRegions.qa.hist1); // Internally, this will also be used as output, so we need a non-const copy
999 std::vector<TH2F> copy2 = getoutput(outputRegions.qa.hist2);
1000 std::vector<TH1D> copy3 = getoutput(outputRegions.qa.hist3);
1001 std::vector<TGraphAsymmErrors> copy4 = getoutput(outputRegions.qa.hist4);
1002 if (sendQAOutput) {
1003 mQA->postprocessExternal(copy1, copy2, copy3, copy4, out, mQATaskMask ? mQATaskMask : -1);
1004 }
1005 pc.outputs().snapshot({gDataOriginTPC, "TRACKINGQA", 0}, out);
1006 if (sendQAOutput) {
1007 mQA->cleanup();
1008 }
1009 }
1010 if (mSpecConfig.outputErrorQA) {
1011 pc.outputs().snapshot({gDataOriginGPU, "ERRORQA", 0}, mErrorQA);
1012 mErrorQA.clear(); // FIXME: This is a race condition once we run multi-threaded!
1013 }
1014 if (mSpecConfig.outputSharedClusterMap && !outputsCreated.contains("TPCOCCUPANCYMAP")) {
1016 }
1017 if (mSpecConfig.tpcTriggerHandling && !outputsCreated.contains("TRIGGERWORDS")) {
1019 }
1020 mTimer->Stop();
1021 LOG(info) << "GPU Reconstruction time for this TF " << mTimer->CpuTime() - cput << " s (cpu), " << mTimer->RealTime() - realt << " s (wall)";
1022}
1023
// Gathers all pending calibration updates for the current timeframe and applies
// them to GPUReconstruction in a single UpdateCalibration() call at the end.
// - pc: processing context used by the fetchCalibsCCDB* helpers to pull objects.
// - oldCalibObjects: receives the previously active TPC calibration objects so
//   the caller can release them after the update (see cleanOldCalibsTPCPtrs in run()).
1024void GPURecoWorkflowSpec::doCalibUpdates(o2::framework::ProcessingContext& pc, calibObjectStruct& oldCalibObjects)
1025{
// Staging areas: only fields explicitly set below are considered changed.
1026 GPUCalibObjectsConst newCalibObjects;
1027 GPUNewCalibValues newCalibValues;
1028 // check for updates of TPC calibration objects
1029 bool needCalibUpdate = false;
// GRP / geometry driven updates (flag set elsewhere when GRPGeomHelper reports a change).
1030 if (mGRPGeomUpdated) {
1031 mGRPGeomUpdated = false;
1032 needCalibUpdate = true;
1033
1034 if (mSpecConfig.runITSTracking && !mITSGeometryCreated) {
1037 mITSGeometryCreated = true;
1038 }
1039
// Derive the nominal solenoid field for the GPU from the GRP magnetic field object.
1040 if (mAutoSolenoidBz) {
1041 newCalibValues.newSolenoidField = true;
1042 newCalibValues.solenoidField = mConfig->configGRP.solenoidBzNominalGPU = GPUO2InterfaceUtils::getNominalGPUBz(*GRPGeomHelper::instance().getGRPMagField());
1043 // Propagator::Instance()->setBz(newCalibValues.solenoidField); // Take value from o2::Propagator::UpdateField from GRPGeomHelper
1044 LOG(info) << "Updating solenoid field " << newCalibValues.solenoidField;
1045 }
// Derive the TPC max time bin from the number of HBFs per TF (continuous readout).
1046 if (mAutoContinuousMaxTimeBin) {
1047 newCalibValues.newContinuousMaxTimeBin = true;
1048 newCalibValues.continuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin = GPUO2InterfaceUtils::getTpcMaxTimeBinFromNHbf(mTFSettings->nHBFPerTF);
1049 LOG(info) << "Updating max time bin " << newCalibValues.continuousMaxTimeBin << " (" << mTFSettings->nHBFPerTF << " orbits)";
1050 }
1051
// One-time setup: hand the O2 propagator to the GPU side (optionally with the GPU polynomial field).
1052 if (!mPropagatorInstanceCreated) {
1053 newCalibObjects.o2Propagator = mConfig->configCalib.o2Propagator = Propagator::Instance();
1054 if (mConfig->configProcessing.o2PropagatorUseGPUField) {
1055 mGPUReco->UseGPUPolynomialFieldInPropagator(Propagator::Instance());
1056 }
1057 mPropagatorInstanceCreated = true;
1058 }
1059
// One-time setup: material budget LUT, unless an explicit file was configured.
1060 if (!mMatLUTCreated) {
1061 if (mConfParam->matLUTFile.size() == 0) {
1062 newCalibObjects.matLUT = GRPGeomHelper::instance().getMatLUT();
1063 LOG(info) << "Loaded material budget lookup table";
1064 }
1065 mMatLUTCreated = true;
1066 }
// One-time setup: flat TRD geometry and TRD reco params for the GPU TRD tracker.
1067 if (mSpecConfig.readTRDtracklets) {
1068 if (!mTRDGeometryCreated) {
1069 auto gm = o2::trd::Geometry::instance();
1070 gm->createPadPlaneArray();
1071 gm->createClusterMatrixArray();
1072 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>(*gm);
1073 newCalibObjects.trdGeometry = mConfig->configCalib.trdGeometry = mTRDGeometry.get();
1074 LOG(info) << "Loaded TRD geometry";
1075 mTRDGeometryCreated = true;
1076 }
1077 if (!mTRDRecoParamCreated) {
1078 mTRDRecoParam = std::make_unique<GPUTRDRecoParam>();
1079 newCalibObjects.trdRecoParam = mConfig->configCalib.trdRecoParam = mTRDRecoParam.get();
1080 mTRDRecoParamCreated = true;
1081 }
1082 }
1083 }
// CCDB-driven updates; note the "fetch || needCalibUpdate" order keeps the fetch unconditional.
1084 needCalibUpdate = fetchCalibsCCDBTPC(pc, newCalibObjects, oldCalibObjects) || needCalibUpdate;
1085 if (mSpecConfig.runITSTracking) {
1086 needCalibUpdate = fetchCalibsCCDBITS(pc) || needCalibUpdate;
1087 }
// Propagate an externally requested TPC time-bin cut change.
1088 if (mTPCCutAtTimeBin != mConfig->configGRP.tpcCutTimeBin) {
1089 newCalibValues.newTPCTimeBinCut = true;
1090 newCalibValues.tpcTimeBinCut = mConfig->configGRP.tpcCutTimeBin = mTPCCutAtTimeBin;
1091 needCalibUpdate = true;
1092 }
// NN clusterizer networks fetched from CCDB; optionally dump the ONNX blobs to disk.
1093 if (mSpecConfig.nnLoadFromCCDB) {
1094 auto dumpToFile = [](const char* buffer, std::size_t validSize, const std::string& path) {
1095 std::ofstream out(path, std::ios::binary | std::ios::trunc);
1096 if (!out.is_open()) {
1097 throw std::runtime_error("Failed to open output file: " + path);
1098 }
1099
1100 out.write(buffer, static_cast<std::streamsize>(validSize));
1101 if (!out) {
1102 throw std::runtime_error("Failed while writing data to: " + path);
1103 }
1104 };
// NOTE(review): the fixed bound of 3 presumably matches the number of configured
// clusterizer networks (classification + regression stages) — confirm against
// the size of configCalib.nnClusterizerNetworks.
1105 for (int i = 0; i < 3; i++) {
1106 newCalibObjects.nnClusterizerNetworks[i] = mConfig->configCalib.nnClusterizerNetworks[i];
1107 if (mSpecConfig.nnDumpToFile && newCalibObjects.nnClusterizerNetworks[i]) {
1108 std::string path = "tpc_nn_clusterizer_" + std::to_string(i) + ".onnx";
1109 dumpToFile(newCalibObjects.nnClusterizerNetworks[i]->getONNXModel(), newCalibObjects.nnClusterizerNetworks[i]->getONNXModelSize(), path);
1110 LOG(info) << "Dumped TPC clusterizer NN " << i << " to file " << path;
1111 }
1112 }
1113 }
// Apply everything in one shot so the reconstruction sees a consistent calibration set.
1114 if (needCalibUpdate) {
1115 LOG(info) << "Updating GPUReconstruction calibration objects";
1116 mGPUReco->UpdateCalibration(newCalibObjects, newCalibValues);
1117 }
1118}
1119
1121{
1122 Options opts;
1123 if (mSpecConfig.enableDoublePipeline) {
1124 bool send = mSpecConfig.enableDoublePipeline == 2;
1125 char* o2jobid = getenv("O2JOBID");
1126 char* numaid = getenv("NUMAID");
1127 int32_t chanid = o2jobid ? atoi(o2jobid) : (numaid ? atoi(numaid) : 0);
1128 std::string chan = std::string("name=gpu-prepare-channel,type=") + (send ? "push" : "pull") + ",method=" + (send ? "connect" : "bind") + ",address=ipc://@gpu-prepare-channel-" + std::to_string(chanid) + "-{timeslice0},transport=shmem,rateLogging=0";
1129 opts.emplace_back(o2::framework::ConfigParamSpec{"channel-config", o2::framework::VariantType::String, chan, {"Out-of-band channel config"}});
1130 }
1131 if (mSpecConfig.enableDoublePipeline == 2) {
1132 return opts;
1133 }
1134 return opts;
1135}
1136
// Assembles the list of DPL inputs for this device according to the workflow
// configuration: TPC raw / digit / cluster data, CCDB calibration objects,
// TRD tracklets, ITS clusters and (if enabled) the TPC NN clusterizer networks.
// NOTE(review): this method has side effects — it mutates mSpecConfig, mConfParam
// (via ReadConfigurableParam) and mPolicyData while building the input list.
1138{
1139 Inputs inputs;
// TPC zero-suppressed raw data: one wildcard route over all TPC/RAWDATA subspecs.
1140 if (mSpecConfig.zsDecoder) {
1141 // All ZS raw data is published with subspec 0 by the o2-raw-file-reader-workflow and DataDistribution
1142 // creates subspec from CRU and endpoint id, we create one single input route subscribing to all TPC/RAWDATA
1143 inputs.emplace_back(InputSpec{"zsraw", ConcreteDataTypeMatcher{"TPC", "RAWDATA"}, Lifetime::Timeframe});
1144 if (mSpecConfig.askDISTSTF) {
1145 inputs.emplace_back("stdDist", "FLP", "DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
1146 }
1147 }
// Double-pipeline preparation device (mode 2) consumes only raw data; the
// processing device (mode 1) additionally receives the prepared message.
1148 if (mSpecConfig.enableDoublePipeline == 2) {
1149 if (!mSpecConfig.zsDecoder) {
1150 LOG(fatal) << "Double pipeline mode can only work with zsraw input";
1151 }
1152 return inputs;
1153 } else if (mSpecConfig.enableDoublePipeline == 1) {
1154 inputs.emplace_back("pipelineprepare", gDataOriginGPU, "PIPELINEPREPARE", 0, Lifetime::Timeframe);
1155 }
1156 if (mSpecConfig.enableDoublePipeline != 2 && (mSpecConfig.outputTracks || mSpecConfig.caClusterer)) {
1157 // calibration objects for TPC clusterization
1158 inputs.emplace_back("tpcgain", gDataOriginTPC, "PADGAINFULL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalPadGainFull)));
1159 inputs.emplace_back("tpcaltrosync", gDataOriginTPC, "ALTROSYNCSIGNAL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::AltroSyncSignal)));
1160 }
1161 if (mSpecConfig.enableDoublePipeline != 2 && mSpecConfig.outputTracks) {
1162 // calibration objects for TPC tracking
1163 const auto mapSources = mSpecConfig.tpcDeadMapSources;
1164 if (mapSources != 0) {
1165 tpc::SourcesDeadMap sources((mapSources > -1) ? static_cast<tpc::SourcesDeadMap>(mapSources) : tpc::SourcesDeadMap::All);
1167 inputs.emplace_back("tpcidcpadflags", gDataOriginTPC, "IDCPADFLAGS", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalIDCPadStatusMapA), {}, 1)); // time-dependent
1168 }
1170 inputs.emplace_back("tpcruninfo", gDataOriginTPC, "TPCRUNINFO", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::ConfigRunInfo)));
1171 }
1172 }
1173
// Gain / threshold calibrations; the "1" trailing argument marks time-dependent CCDB objects.
1174 inputs.emplace_back("tpcgainresidual", gDataOriginTPC, "PADGAINRESIDUAL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalPadGainResidual), {}, 1)); // time-dependent
1175 if (mSpecConfig.tpcUseMCTimeGain) {
1176 inputs.emplace_back("tpctimegain", gDataOriginTPC, "TIMEGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTimeGainMC), {}, 1)); // time-dependent
1177 } else {
1178 inputs.emplace_back("tpctimegain", gDataOriginTPC, "TIMEGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTimeGain), {}, 1)); // time-dependent
1179 }
1180 inputs.emplace_back("tpctopologygain", gDataOriginTPC, "TOPOLOGYGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTopologyGain)));
1181 inputs.emplace_back("tpcthreshold", gDataOriginTPC, "PADTHRESHOLD", 0, Lifetime::Condition, ccdbParamSpec("TPC/Config/FEEPad"));
1183 inputs.emplace_back("corrMap", o2::header::gDataOriginTPC, "TPCCORRMAP", 0, Lifetime::Timeframe);
1184 if (mSpecConfig.enableCTPLumi) {
1185 inputs.emplace_back("lumiCTP", o2::header::gDataOriginCTP, "LUMICTP", 0, Lifetime::Timeframe);
1186 }
1187 }
// Exactly one primary TPC data input is chosen: compressed clusters, digits, or native clusters.
1188 if (mSpecConfig.decompressTPC) {
1189 inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, mSpecConfig.decompressTPCFromROOT ? o2::header::DataDescription("COMPCLUSTERS") : o2::header::DataDescription("COMPCLUSTERSFLAT")}, Lifetime::Timeframe});
1190 } else if (mSpecConfig.caClusterer) {
1191 // We accept digits and MC labels also if we run on ZS Raw data, since they are needed for MC label propagation
1192 if ((!mSpecConfig.zsOnTheFly || mSpecConfig.processMC) && !mSpecConfig.zsDecoder) {
1193 inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITS"}, Lifetime::Timeframe});
1194 mPolicyData->emplace_back(o2::framework::InputSpec{"digits", o2::framework::ConcreteDataTypeMatcher{"TPC", "DIGITS"}});
1195 }
1196 } else if (mSpecConfig.runTPCTracking) {
1197 inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERNATIVE"}, Lifetime::Timeframe});
1198 mPolicyData->emplace_back(o2::framework::InputSpec{"clusters", o2::framework::ConcreteDataTypeMatcher{"TPC", "CLUSTERNATIVE"}});
1199 }
// Matching MC label inputs for whichever primary data input was selected above.
1200 if (mSpecConfig.processMC) {
1201 if (mSpecConfig.caClusterer) {
1202 if (!mSpecConfig.zsDecoder) {
1203 inputs.emplace_back(InputSpec{"mclblin", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITSMCTR"}, Lifetime::Timeframe});
1204 mPolicyData->emplace_back(o2::framework::InputSpec{"digitsmc", o2::framework::ConcreteDataTypeMatcher{"TPC", "DIGITSMCTR"}});
1205 }
1206 } else if (mSpecConfig.runTPCTracking) {
1207 inputs.emplace_back(InputSpec{"mclblin", ConcreteDataTypeMatcher{gDataOriginTPC, "CLNATIVEMCLBL"}, Lifetime::Timeframe});
1208 mPolicyData->emplace_back(o2::framework::InputSpec{"clustersmc", o2::framework::ConcreteDataTypeMatcher{"TPC", "CLNATIVEMCLBL"}});
1209 }
1210 }
1211
// Zero suppression performed on the fly upstream: encoded pages plus their sizes.
1212 if (mSpecConfig.zsOnTheFly) {
1213 inputs.emplace_back(InputSpec{"zsinput", ConcreteDataTypeMatcher{"TPC", "TPCZS"}, Lifetime::Timeframe});
1214 inputs.emplace_back(InputSpec{"zsinputsizes", ConcreteDataTypeMatcher{"TPC", "ZSSIZES"}, Lifetime::Timeframe});
1215 }
// TRD tracklet inputs for the GPU TRD tracker.
1216 if (mSpecConfig.readTRDtracklets) {
1217 inputs.emplace_back("trdctracklets", o2::header::gDataOriginTRD, "CTRACKLETS", 0, Lifetime::Timeframe);
1218 inputs.emplace_back("trdtracklets", o2::header::gDataOriginTRD, "TRACKLETS", 0, Lifetime::Timeframe);
1219 inputs.emplace_back("trdtriggerrec", o2::header::gDataOriginTRD, "TRKTRGRD", 0, Lifetime::Timeframe);
1220 inputs.emplace_back("trdtrigrecmask", o2::header::gDataOriginTRD, "TRIGRECMASK", 0, Lifetime::Timeframe);
1221 }
1222
// ITS inputs: one route per layer when staggered (7 layers), otherwise a single route.
1223 if (mSpecConfig.runITSTracking) {
1224 for (unsigned int iLay{0}; iLay < (mSpecConfig.itsStaggered ? 7 : 1); ++iLay) {
1225 inputs.emplace_back("compClusters", "ITS", "COMPCLUSTERS", iLay, Lifetime::Timeframe);
1226 inputs.emplace_back("patterns", "ITS", "PATTERNS", iLay, Lifetime::Timeframe);
1227 inputs.emplace_back("ROframes", "ITS", "CLUSTERSROF", iLay, Lifetime::Timeframe);
1228 if (mSpecConfig.processMC) {
1229 inputs.emplace_back("itsmclabels", "ITS", "CLUSTERSMCTR", iLay, Lifetime::Timeframe);
1230 }
1231 }
1232 if (mSpecConfig.itsTriggerType == 1) {
1233 inputs.emplace_back("phystrig", "ITS", "PHYSTRIG", 0, Lifetime::Timeframe);
1234 } else if (mSpecConfig.itsTriggerType == 2) {
1235 inputs.emplace_back("phystrig", "TRD", "TRKTRGRD", 0, Lifetime::Timeframe);
1236 }
1237 if (mSpecConfig.enableDoublePipeline != 2) {
1238 if (mSpecConfig.isITS3) {
1239 inputs.emplace_back("cldict", "IT3", "CLUSDICT", 0, Lifetime::Condition, ccdbParamSpec("IT3/Calib/ClusterDictionary"));
1240 inputs.emplace_back("alppar", "ITS", "ALPIDEPARAM", 0, Lifetime::Condition, ccdbParamSpec("ITS/Config/AlpideParam"));
1241 } else {
1242 inputs.emplace_back("itscldict", "ITS", "CLUSDICT", 0, Lifetime::Condition, ccdbParamSpec("ITS/Calib/ClusterDictionary"));
1243 inputs.emplace_back("itsalppar", "ITS", "ALPIDEPARAM", 0, Lifetime::Condition, ccdbParamSpec("ITS/Config/AlpideParam"));
1244 }
1245 if (mSpecConfig.itsOverrBeamEst) {
1246 inputs.emplace_back("meanvtx", "GLO", "MEANVERTEX", 0, Lifetime::Condition, ccdbParamSpec("GLO/Calib/MeanVertex", {}, 1));
1247 }
1248 }
1249 }
1250
1251 // NN clusterizer
// Re-reads the configurable params here so CCDB routes reflect the final NN settings.
1252 *mConfParam = mConfig->ReadConfigurableParam();
1253 if (mConfig->configProcessing.nn.nnLoadFromCCDB) {
1254
1255 LOG(info) << "(NN CLUS) Enabling fetching of TPC NN clusterizer from CCDB";
1256 mSpecConfig.nnLoadFromCCDB = true;
1257 mSpecConfig.nnDumpToFile = mConfig->configProcessing.nn.nnCCDBDumpToFile;
1258 GPUSettingsProcessingNNclusterizer& nnClusterizerSettings = mConfig->configProcessing.nn;
1259
// CCDB query metadata selecting the network matching the configured data taking conditions.
1260 std::map<std::string, std::string> metadata;
1261 metadata["inputDType"] = nnClusterizerSettings.nnInferenceInputDType; // FP16 or FP32
1262 metadata["outputDType"] = nnClusterizerSettings.nnInferenceOutputDType; // FP16 or FP32
1263 metadata["nnCCDBWithMomentum"] = nnClusterizerSettings.nnCCDBWithMomentum; // 0, 1 -> Only for regression model
1264 metadata["nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBClassificationLayerType; // FC, CNN
1265 metadata["nnCCDBInteractionRate"] = nnClusterizerSettings.nnCCDBInteractionRate; // in kHz
1266 metadata["nnCCDBBeamType"] = nnClusterizerSettings.nnCCDBBeamType; // pp, pPb, PbPb
1267
// Only non-empty metadata values are forwarded to the CCDB query.
1268 auto convert_map_to_metadata = [](const std::map<std::string, std::string>& inputMap, std::vector<o2::framework::CCDBMetadata>& outputMetadata) {
1269 for (const auto& [key, value] : inputMap) {
1270 if (value != "") {
1271 outputMetadata.push_back({key, value});
1272 }
1273 }
1274 };
1275
1276 mSpecConfig.nnEvalMode = o2::utils::Str::tokenize(nnClusterizerSettings.nnEvalMode, ':');
1277 std::vector<o2::framework::CCDBMetadata> ccdb_metadata;
1278
1279 if (mConfParam->printSettings) {
1280 auto printSettings = [](const std::map<std::string, std::string>& settings) {
1281 LOG(info) << "(NN CLUS) NN Clusterizer CCDB settings:";
1282 for (const auto& [key, value] : settings) {
1283 LOG(info) << " " << key << " : " << value;
1284 }
1285 };
1286 printSettings(metadata);
1287 }
1288
// Classification network route (single- or two-class variant) per the first eval-mode token.
1289 if (mSpecConfig.nnEvalMode[0] == "c1") {
1290 metadata["nnCCDBEvalType"] = "classification_c1";
1291 convert_map_to_metadata(metadata, ccdb_metadata);
1292 inputs.emplace_back("nn_classification_c1", gDataOriginTPC, "NNCLUSTERIZER_C1", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1293 } else if (mSpecConfig.nnEvalMode[0] == "c2") {
1294 metadata["nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1295 metadata["nnCCDBEvalType"] = "classification_c2";
1296 convert_map_to_metadata(metadata, ccdb_metadata);
1297 inputs.emplace_back("nn_classification_c2", gDataOriginTPC, "NNCLUSTERIZER_C2", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1298 }
1299
// Regression network route(s); always at least the single-cluster regression.
1300 metadata["nnCCDBEvalType"] = "regression_c1";
1301 metadata["nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1302 convert_map_to_metadata(metadata, ccdb_metadata);
1303 inputs.emplace_back("nn_regression_c1", gDataOriginTPC, "NNCLUSTERIZER_R1", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1304
// NOTE(review): nnEvalMode[1] is accessed without a size check — assumes nnEvalMode
// always contains at least two ':'-separated tokens; verify tokenize() guarantees this.
1305 if (mSpecConfig.nnEvalMode[1] == "r2") {
1306 metadata["nnCCDBEvalType"] = "regression_c2";
1307 convert_map_to_metadata(metadata, ccdb_metadata);
1308 inputs.emplace_back("nn_regression_c2", gDataOriginTPC, "NNCLUSTERIZER_R2", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1309 }
1310 }
1311
1312 return inputs;
1313};
1314
1316{
1317 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
1318 std::vector<OutputSpec> outputSpecs;
1319 if (mSpecConfig.enableDoublePipeline == 2) {
1320 outputSpecs.emplace_back(gDataOriginGPU, "PIPELINEPREPARE", 0, Lifetime::Timeframe);
1321 return outputSpecs;
1322 }
1323 if (mSpecConfig.outputTracks) {
1324 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSF" : (DataDescription) "TRACKS", 0, Lifetime::Timeframe);
1325 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSREFSF" : (DataDescription) "CLUSREFS", 0, Lifetime::Timeframe);
1326 }
1327 if (mSpecConfig.processMC && mSpecConfig.outputTracks) {
1328 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSMCLBLF" : (DataDescription) "TRACKSMCLBL", 0, Lifetime::Timeframe);
1329 }
1330 if (mSpecConfig.outputCompClustersRoot) {
1331 outputSpecs.emplace_back(gDataOriginTPC, "COMPCLUSTERS", 0, Lifetime::Timeframe);
1332 }
1333 if (mSpecConfig.outputCompClustersFlat) {
1334 outputSpecs.emplace_back(gDataOriginTPC, "COMPCLUSTERSFLAT", 0, Lifetime::Timeframe);
1335 }
1336 if (mSpecConfig.outputCAClusters) {
1337 for (auto const& sector : mTPCSectors) {
1338 mClusterOutputIds.emplace_back(sector);
1339 }
1340 if (mSpecConfig.sendClustersPerSector) {
1341 outputSpecs.emplace_back(gDataOriginTPC, "CLUSTERNATIVETMP", NSectors, Lifetime::Timeframe); // Dummy buffer the TPC tracker writes the inital linear clusters to
1342 for (const auto sector : mTPCSectors) {
1343 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", sector, Lifetime::Timeframe);
1344 }
1345 } else {
1346 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", NSectors, Lifetime::Timeframe);
1347 }
1348 if (mSpecConfig.processMC) {
1349 if (mSpecConfig.sendClustersPerSector) {
1350 for (const auto sector : mTPCSectors) {
1351 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), sector, Lifetime::Timeframe);
1352 }
1353 } else {
1354 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), NSectors, Lifetime::Timeframe);
1355 }
1356 }
1357 }
1358 if (mSpecConfig.outputSharedClusterMap) {
1359 outputSpecs.emplace_back(gDataOriginTPC, "CLSHAREDMAP", 0, Lifetime::Timeframe);
1360 outputSpecs.emplace_back(gDataOriginTPC, "TPCOCCUPANCYMAP", 0, Lifetime::Timeframe);
1361 }
1362 if (mSpecConfig.tpcTriggerHandling) {
1363 outputSpecs.emplace_back(gDataOriginTPC, "TRIGGERWORDS", 0, Lifetime::Timeframe);
1364 }
1365 if (mSpecConfig.outputQA) {
1366 outputSpecs.emplace_back(gDataOriginTPC, "TRACKINGQA", 0, Lifetime::Timeframe);
1367 }
1368 if (mSpecConfig.outputErrorQA) {
1369 outputSpecs.emplace_back(gDataOriginGPU, "ERRORQA", 0, Lifetime::Timeframe);
1370 }
1371
1372 if (mSpecConfig.runITSTracking) {
1373 outputSpecs.emplace_back(gDataOriginITS, "TRACKS", 0, Lifetime::Timeframe);
1374 outputSpecs.emplace_back(gDataOriginITS, "TRACKCLSID", 0, Lifetime::Timeframe);
1375 outputSpecs.emplace_back(gDataOriginITS, "ITSTrackROF", 0, Lifetime::Timeframe);
1376 outputSpecs.emplace_back(gDataOriginITS, "VERTICES", 0, Lifetime::Timeframe);
1377 outputSpecs.emplace_back(gDataOriginITS, "VERTICESROF", 0, Lifetime::Timeframe);
1378 outputSpecs.emplace_back(gDataOriginITS, "IRFRAMES", 0, Lifetime::Timeframe);
1379
1380 if (mSpecConfig.processMC) {
1381 outputSpecs.emplace_back(gDataOriginITS, "VERTICESMCTR", 0, Lifetime::Timeframe);
1382 outputSpecs.emplace_back(gDataOriginITS, "VERTICESMCPUR", 0, Lifetime::Timeframe);
1383 outputSpecs.emplace_back(gDataOriginITS, "TRACKSMCTR", 0, Lifetime::Timeframe);
1384 }
1385 }
1386
1387 return outputSpecs;
1388};
1389
1391{
1392 ExitPipeline();
1393 mQA.reset(nullptr);
1394 mDisplayFrontend.reset(nullptr);
1395 mGPUReco.reset(nullptr);
1396}
1397
1398} // namespace o2::gpu
std::vector< std::string > labels
Simple interface to the CDB manager.
Definition of container class for dE/dx corrections.
void dumpToFile(std::string fileName, const CathodeSegmentation &seg, const std::vector< Point > &points)
Class of a TPC cluster in TPC-native coordinates (row, time)
Container to store compressed TPC cluster data.
A const (read only) version of MCTruthContainer.
A parser and sequencer utility for raw pages within DPL input.
A raw page parser for DPL input.
Wrapper container for different reconstructed object types.
Definition of the TPC Digit.
Helper class for memory management of TPC Data Formats, external from the actual data type classes to...
Definition of class for writing debug information.
Definition of the GeometryManager class.
int32_t i
int32_t retVal
Helper for geometry and GRP related CCDB requests.
Definition of the GeometryTGeo class.
A helper class to iterate over all parts of all input routes.
Declarations for the wrapper for the set of cylindrical material layers.
Definition of the Names Generator class.
Class to serialize ONNX objects for ROOT snapshots of CCDB objects at runtime.
Utilities for parsing of data sequences.
uint32_t j
Definition RawData.h:0
Struct for input data required by TRD tracking workflow.
Type wrappers for enforcing a specific serialization method.
class to create TPC fast transformation
Wrapper class for TPC CA Tracker algorithm.
TBranch * ptr
Configurable params for tracks ad hoc tuning.
Helper class to extract VDrift from different sources.
Helper class to obtain TPC clusters / digits / labels from DPL.
Definitions of TPC Zero Suppression Data Headers.
StringRef key
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static MatLayerCylSet * loadFromFile(const std::string &inpFName="matbud.root")
static std::string getConfigOutputFileName(const std::string &procName, const std::string &confName="", bool json=true)
Definition NameConf.cxx:115
GPUd() value_type estimateLTFast(o2 static GPUd() float estimateLTIncrement(const o2 PropagatorImpl * Instance(bool uninitialized=false)
Definition Propagator.h:178
static void write(std::string const &filename, std::string const &keyOnly="")
gsl::span< const TruthElement > getLabels(uint32_t dataindex) const
A read-only version of MCTruthContainer allowing for storage optimisation.
static mask_t getSourcesMask(const std::string_view srcList)
static constexpr std::string_view NONE
keyword for no sources
void addElement(uint32_t dataindex, TruthElement const &element, bool noElement=false)
size_t flatten_to(ContainerType &container) const
static constexpr ID TPC
Definition DetID.h:64
This utility handles transparently the DPL inputs and triggers a customizable action on sequences of ...
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
Definition InitContext.h:34
A helper class to iterate over all parts of all input routes.
int getPos(const char *name) const
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static GPUDisplayFrontendInterface * getFrontend(const char *type)
static uint32_t getTpcMaxTimeBinFromNHbf(uint32_t nHbf)
static float getNominalGPUBz(T &src)
static void ApplySyncSettings(GPUSettingsProcessing &proc, GPUSettingsRec &rec, gpudatatypes::RecoStepField &steps, bool syncMode, int32_t dEdxMode=-2)
o2::framework::Outputs outputs()
std::vector< framework::InputSpec > CompletionPolicyData
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
o2::framework::Inputs inputs()
void run(o2::framework::ProcessingContext &pc) final
void stop() final
This is invoked on stop.
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
GPURecoWorkflowSpec(CompletionPolicyData *policyData, Config const &specconfig, std::vector< int32_t > const &tpcsectors, uint64_t tpcSectorMask, std::shared_ptr< o2::base::GRPGeomRequest > &ggr, std::function< bool(o2::framework::DataProcessingHeader::StartTime)> **gPolicyOrder=nullptr)
o2::framework::Options options()
static void RunZSEncoderCreateMeta(const uint64_t *buffer, const uint32_t *sizes, void **ptrs, GPUTrackingInOutZS *out)
static GeometryTGeo * Instance()
void fillMatrixCache(int mask) override
ClusterNativeAccess::ConstMCLabelContainerViewWithBuffer ConstMCLabelContainerViewWithBuffer
static constexpr int MAXSECTOR
Definition Sector.h:44
static precheckModifiedData runPrecheck(o2::gpu::GPUTrackingInOutPointers *ptrs, o2::gpu::GPUO2InterfaceConfiguration *config)
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
static Geometry * instance()
Definition Geometry.h:33
GLint GLsizei count
Definition glcorearb.h:399
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
GLdouble GLdouble right
Definition glcorearb.h:4077
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint left
Definition glcorearb.h:1979
GLboolean * data
Definition glcorearb.h:298
GLintptr offset
Definition glcorearb.h:660
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLuint start
Definition glcorearb.h:469
GLint ref
Definition glcorearb.h:291
GLint GLuint mask
Definition glcorearb.h:291
GLsizei GLenum * sources
Definition glcorearb.h:2516
constexpr o2::header::DataOrigin gDataOriginCTP
Definition DataHeader.h:564
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
constexpr o2::header::DataOrigin gDataOriginTRD
Definition DataHeader.h:577
constexpr o2::header::DataOrigin gDataOriginITS
Definition DataHeader.h:570
constexpr o2::header::DataOrigin gDataOriginGPU
Definition DataHeader.h:593
constexpr int NSectors
Definition of a container to keep/associate and arbitrary number of labels associated to an index wit...
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
o2::header::DataDescription DataDescription
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
O2 data header classes and API, v0.1.
Definition DetID.h:49
auto get(const std::byte *buffer, size_t=0)
Definition DataHeader.h:454
Descriptor< gSizeDataDescriptionString > DataDescription
Definition DataHeader.h:551
constexpr int MAXGLOBALPADROW
Definition Constants.h:34
@ ZS
final Zero Suppression (can be ILBZS, DLBZS)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
Definition CDBTypes.h:96
@ FEEConfig
use fee config
@ IDCPadStatus
use idc pad status map
@ CalIDCPadStatusMapA
Status map of the pads (dead etc. obtained from CalIDC0)
@ CalPadGainFull
Full pad gain calibration.
@ CalPadGainResidual
Residual pad gain calibration (e.g. from tracks)
@ CalTimeGain
Gain variation over time.
@ CalTimeGainMC
Gain variation over time for MC.
@ AltroSyncSignal
timing of Altro chip sync. signal
auto getRecoInputContainer(o2::framework::ProcessingContext &pc, o2::gpu::GPUTrackingInOutPointers *ptrs, const o2::globaltracking::RecoContainer *inputTracks, bool mc=false)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::string filename()
std::string name
The name of the associated DataProcessorSpec.
Definition DeviceSpec.h:50
size_t inputTimesliceId
The time pipelining id of this particular device.
Definition DeviceSpec.h:68
void requestTracks(o2::dataformats::GlobalTrackID::mask_t src, bool mc)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
S< o2::trd::GeometryFlat >::type * trdGeometry
S< GPUTRDRecoParam >::type * trdRecoParam
S< o2::base::PropagatorImpl< float > >::type * o2Propagator
S< o2::base::MatLayerCylSet >::type * matLUT
S< o2::tpc::ORTRootSerializer >::type * nnClusterizerNetworks[3]
const std::vector< TGraphAsymmErrors > * hist4
std::function< void *(size_t)> allocator
std::vector< std::string > nnEvalMode
std::array< const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > *, o2::tpc::constants::MAXSECTOR > v
const o2::tpc::Digit * tpcDigits[NSECTORS]
const GPUTPCDigitsMCInput * tpcDigitsMC
const o2::tpc::ClusterNativeAccess * clustersNative
const o2::tpc::CompressedClustersFlat * tpcCompressedClusters
const GPUTrackingInOutZS * tpcZS
const o2::MCCompLabel * outputTracksTPCO2MC
const o2::tpc::ClusterNativeAccess * clustersNativeReduced
const o2::tpc::TrackTPC * outputTracksTPCO2
const GPUTrackingInOutDigits * tpcPackedDigits
GPUTrackingInOutZSSector sector[NSECTORS]
static constexpr uint32_t NSECTORS
static constexpr uint32_t NENDPOINTS
GPUOutputControl * asArray()
GPUOutputControl tpcTracksO2Labels
GPUOutputControl tpcTracksO2ClusRefs
size_t getIndex(const GPUOutputControl &v)
static constexpr size_t count()
GPUOutputControl sharedClusterMap
GPUOutputControl compressedClusters
uint32_t SubSpecificationType
Definition DataHeader.h:622
static constexpr int T2L
Definition Cartesian.h:55
static constexpr int T2GRot
Definition Cartesian.h:57
static constexpr int T2G
Definition Cartesian.h:56
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClustersSector[constants::MAXSECTOR]
const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > * clustersMCTruth
const ClusterNative * clusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int clusterOffset[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
const ClusterNative * clustersLinear
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"