Project
Loading...
Searching...
No Matches
GPUWorkflowSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
18#include "Headers/DataHeader.h"
19#include "Framework/WorkflowSpec.h" // o2::framework::mergeInputs
27#include "Framework/Logger.h"
42#include "TPCFastTransform.h"
51#include "TPCBase/RDHUtils.h"
53#include "GPUO2InterfaceQA.h"
54#include "GPUO2Interface.h"
55#include "GPUO2InterfaceUtils.h"
56#include "CalibdEdxContainer.h"
57#include "ORTRootSerializer.h"
58#include "GPUNewCalibValues.h"
59#include "TPCPadGainCalib.h"
60#include "TPCZSLinkMapping.h"
62#include "TPCBase/Sector.h"
63#include "TPCBase/Utils.h"
71#include "Algorithm/Parser.h"
74#include "TRDBase/Geometry.h"
76#include "GPUTRDRecoParam.h"
82#include "GPUWorkflowInternal.h"
83#include "GPUDataTypesQA.h"
84// #include "Framework/ThreadPool.h"
85
86#include <TStopwatch.h>
87#include <TObjArray.h>
88#include <TH1F.h>
89#include <TH2F.h>
90#include <TH1D.h>
91#include <TGraphAsymmErrors.h>
92
93#include <filesystem>
94#include <memory>
95#include <vector>
96#include <iomanip>
97#include <stdexcept>
98#include <regex>
99#include <sys/types.h>
100#include <sys/stat.h>
101#include <fcntl.h>
102#include <chrono>
103#include <unordered_set>
104
105using namespace o2::framework;
106using namespace o2::header;
107using namespace o2::gpu;
108using namespace o2::base;
109using namespace o2::dataformats;
111
112namespace o2::gpu
113{
114
115GPURecoWorkflowSpec::GPURecoWorkflowSpec(GPURecoWorkflowSpec::CompletionPolicyData* policyData, Config const& specconfig, std::vector<int32_t> const& tpcsectors, uint64_t tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<bool(o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) : o2::framework::Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
116{
117 if (mSpecConfig.outputCAClusters && !mSpecConfig.caClusterer && !mSpecConfig.decompressTPC && !mSpecConfig.useFilteredOutputSpecs) {
118 throw std::runtime_error("inconsistent configuration: cluster output is only possible if CA clusterer or CompCluster decompression is activated");
119 }
120
121 mConfig.reset(new GPUO2InterfaceConfiguration);
122 mConfParam.reset(new GPUSettingsO2);
123 mTFSettings.reset(new GPUSettingsTF);
124 mTimer.reset(new TStopwatch);
125 mPipeline.reset(new GPURecoWorkflowSpec_PipelineInternals);
126
127 if (mSpecConfig.enableDoublePipeline == 1 && gPolicyOrder) {
128 *gPolicyOrder = &mPolicyOrder;
129 }
130}
131
133
// NOTE(review): the signature line was elided by the Doxygen collapse in this extract;
// presumably this is GPURecoWorkflowSpec::init(InitContext& ic) ('ic' is used below) —
// confirm against GPUWorkflowSpec.h. Sets up the GPU interface configuration from
// configurable params, creates the reconstruction instance, and registers the
// shared-memory-region callback. The leading numbers on each line are the original
// file's line numbers fused in by the documentation extraction.
135{
137 GPUO2InterfaceConfiguration& config = *mConfig.get();
139 // Create configuration object and fill settings
// Nominal field 0 here; the real value is filled later (mAutoSolenoidBz below).
140 mConfig->configGRP.solenoidBzNominalGPU = 0;
141 mTFSettings->hasSimStartOrbit = 1;
142 auto& hbfu = o2::raw::HBFUtils::Instance();
143 mTFSettings->simStartOrbit = hbfu.getFirstIRofTF(o2::InteractionRecord(0, hbfu.orbitFirstSampled)).orbit;
145 *mConfParam = mConfig->ReadConfigurableParam();
// Optional standalone event display; failure to create the frontend is fatal.
147 if (mConfParam->display) {
148 mDisplayFrontend.reset(GPUDisplayFrontendInterface::getFrontend(mConfig->configDisplay.displayFrontend.c_str()));
149 mConfig->configProcessing.eventDisplay = mDisplayFrontend.get();
150 if (mConfig->configProcessing.eventDisplay != nullptr) {
151 LOG(info) << "Event display enabled";
152 } else {
153 throw std::runtime_error("GPU Event Display frontend could not be created!");
154 }
155 }
156 if (mSpecConfig.enableDoublePipeline) {
157 mConfig->configProcessing.doublePipeline = 1;
158 }
// Sentinel values mean "determine automatically later" (from GRP updates).
160 mAutoSolenoidBz = mConfParam->solenoidBzNominalGPU == -1e6f;
161 mAutoContinuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin < 0;
162 if (mAutoContinuousMaxTimeBin) {
// 256 HBF per TF is the fallback when no override is given — TODO confirm default.
163 mConfig->configGRP.grpContinuousMaxTimeBin = GPUO2InterfaceUtils::getTpcMaxTimeBinFromNHbf(mConfParam->overrideNHbfPerTF ? mConfParam->overrideNHbfPerTF : 256);
164 }
// deviceNum == -2 requests selecting the GPU by the device's pipeline (timeslice) id.
165 if (mConfig->configProcessing.deviceNum == -2) {
166 int32_t myId = ic.services().get<const o2::framework::DeviceSpec>().inputTimesliceId;
167 int32_t idMax = ic.services().get<const o2::framework::DeviceSpec>().maxInputTimeslices;
168 mConfig->configProcessing.deviceNum = myId;
169 LOG(info) << "GPU device number selected from pipeline id: " << myId << " / " << idMax;
170 }
171 if (mConfig->configProcessing.debugLevel >= 3 && mVerbosity == 0) {
172 mVerbosity = 1;
173 }
174 mConfig->configProcessing.runMC = mSpecConfig.processMC;
// QA output: requires MC info unless only cluster-rejection histograms are produced.
175 if (mSpecConfig.outputQA) {
176 if (!mSpecConfig.processMC && !mConfig->configQA.clusterRejectionHistograms) {
177 throw std::runtime_error("Need MC information to create QA plots");
178 }
179 if (!mSpecConfig.processMC) {
180 mConfig->configQA.noMC = true;
181 }
182 mConfig->configQA.shipToQC = true;
183 if (!mConfig->configProcessing.runQA) {
184 mConfig->configQA.enableLocalOutput = false;
185 mQATaskMask = (mSpecConfig.processMC ? gpudatatypes::gpuqa::tasksAllMC : gpudatatypes::gpuqa::tasksNone) | (mConfig->configQA.clusterRejectionHistograms ? gpudatatypes::gpuqa::taskClusterCounts : gpudatatypes::gpuqa::tasksNone);
// Negative value encodes a task mask rather than a boolean — TODO confirm convention.
186 mConfig->configProcessing.runQA = -mQATaskMask;
187 }
188 }
189 mConfig->configInterface.outputToExternalBuffers = true;
190 const bool runTracking = mSpecConfig.outputTracks || mSpecConfig.outputCompClustersRoot || mSpecConfig.outputCompClustersFlat;
192 // Configure the "GPU workflow" i.e. which steps we run on the GPU (or CPU)
193 if (runTracking) {
// NOTE(review): original lines 195-196 (continuation of this set(...) call) were
// elided by the documentation extraction; the statement is incomplete as shown here.
194 mConfig->configWorkflow.steps.set(gpudatatypes::RecoStep::TPCConversion,
197 mConfig->configWorkflow.outputs.set(gpudatatypes::InOutType::TPCMergedTracks);
198 }
// -2 disables dEdx when no tracking is run — TODO confirm against ApplySyncSettings.
199 GPUO2Interface::ApplySyncSettings(mConfig->configProcessing, mConfig->configReconstruction, mConfig->configWorkflow.steps, mConfParam->synchronousProcessing, runTracking ? mConfParam->rundEdx : -2);
201 if (mSpecConfig.outputCompClustersRoot || mSpecConfig.outputCompClustersFlat) {
202 mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCCompression, true);
203 mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCCompressedClusters, true);
204 }
205 mConfig->configWorkflow.inputs.set(gpudatatypes::InOutType::TPCClusters);
206 if (mSpecConfig.caClusterer) { // Override some settings if we have raw data as input
207 mConfig->configWorkflow.inputs.set(gpudatatypes::InOutType::TPCRaw);
208 mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCClusterFinding, true);
209 mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCClusters, true);
210 }
// Decompression mode: consume compressed clusters, produce native clusters.
211 if (mSpecConfig.decompressTPC) {
212 mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCCompression, false);
213 mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TPCDecompression, true);
214 mConfig->configWorkflow.inputs.set(gpudatatypes::InOutType::TPCCompressedClusters);
215 mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCClusters, true);
216 mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::TPCCompressedClusters, false);
// 0xFFFFFFFFF = all 36 sectors; decompression cannot run on a subset.
217 if (mTPCSectorMask != 0xFFFFFFFFF) {
218 throw std::invalid_argument("Cannot run TPC decompression with a sector mask");
219 }
220 }
221 if (mSpecConfig.runTRDTracking) {
222 mConfig->configWorkflow.inputs.setBits(gpudatatypes::InOutType::TRDTracklets, true);
223 mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::TRDTracking, true);
224 }
225 if (mSpecConfig.runITSTracking) {
226 mConfig->configWorkflow.inputs.setBits(gpudatatypes::InOutType::ITSClusters, true);
227 mConfig->configWorkflow.outputs.setBits(gpudatatypes::InOutType::ITSTracks, true);
228 mConfig->configWorkflow.steps.setBits(gpudatatypes::RecoStep::ITSTracking, true);
229 }
230 if (mSpecConfig.outputSharedClusterMap) {
231 mConfig->configProcessing.outputSharedClusterMap = true;
232 }
233 if (!mSpecConfig.outputTracks) {
234 mConfig->configProcessing.createO2Output = 0; // Disable O2 TPC track format output if no track output requested
235 }
236 mConfig->configProcessing.param.tpcTriggerHandling = mSpecConfig.tpcTriggerHandling;
// Hard failure on the deprecated file-based transformation options; the CCDB
// remap mechanism described in the message replaces them.
238 if (mConfParam->transformationFile.size() || mConfParam->transformationSCFile.size()) {
239 LOG(fatal) << "Deprecated configurable param options GPU_global.transformationFile or transformationSCFile used\n"
240 << "Instead, link the corresponding file as <somedir>/TPC/Calib/CorrectionMap/snapshot.root and use it via\n"
241 << "--condition-remap file://<somdir>=TPC/Calib/CorrectionMap option";
242 }
243 /* if (config.configProcessing.doublePipeline && ic.services().get<ThreadPool>().poolSize != 2) {
244 throw std::runtime_error("double pipeline requires exactly 2 threads");
245 } */
246 if (config.configProcessing.doublePipeline && (mSpecConfig.readTRDtracklets || mSpecConfig.runITSTracking || !(mSpecConfig.zsOnTheFly || mSpecConfig.zsDecoder))) {
247 LOG(fatal) << "GPU two-threaded pipeline works only with TPC-only processing, and with ZS input";
248 }
// Mode 2 (pipeline worker without own reconstruction — TODO confirm) skips all of
// the heavyweight initialization below.
250 if (mSpecConfig.enableDoublePipeline != 2) {
251 mGPUReco = std::make_unique<GPUO2Interface>();
253 // initialize TPC calib objects
254 initFunctionTPCCalib(ic);
256 mConfig->configCalib.fastTransform = mCalibObjects.mFastTransformHelper->getCorrMap();
257 mConfig->configCalib.fastTransformRef = mCalibObjects.mFastTransformHelper->getCorrMapRef();
258 mConfig->configCalib.fastTransformMShape = mCalibObjects.mFastTransformHelper->getCorrMapMShape();
259 mConfig->configCalib.fastTransformHelper = mCalibObjects.mFastTransformHelper.get();
260 if (mConfig->configCalib.fastTransform == nullptr) {
261 throw std::invalid_argument("GPU workflow: initialization of the TPC transformation failed");
262 }
// Material lookup table: load from file if given, otherwise reserve space for the
// late-provisioned O2 LUT (50 MiB).
264 if (mConfParam->matLUTFile.size()) {
265 LOGP(info, "Loading matlut file {}", mConfParam->matLUTFile.c_str());
266 mConfig->configCalib.matLUT = o2::base::MatLayerCylSet::loadFromFile(mConfParam->matLUTFile.c_str());
267 if (mConfig->configCalib.matLUT == nullptr) {
268 LOGF(fatal, "Error loading matlut file");
269 }
270 } else {
271 mConfig->configProcessing.lateO2MatLutProvisioningSize = 50 * 1024 * 1024;
272 }
274 if (mSpecConfig.readTRDtracklets) {
275 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>();
276 mConfig->configCalib.trdGeometry = mTRDGeometry.get();
278 mTRDRecoParam = std::make_unique<GPUTRDRecoParam>();
279 mConfig->configCalib.trdRecoParam = mTRDRecoParam.get();
280 }
282 mConfig->configProcessing.willProvideO2PropagatorLate = true;
283 mConfig->configProcessing.o2PropagatorUseGPUField = true;
// Print settings once per workflow (timeslice 0) unless verbose printing requested.
285 if (mConfParam->printSettings && (mConfParam->printSettings > 1 || ic.services().get<const o2::framework::DeviceSpec>().inputTimesliceId == 0)) {
286 mConfig->configProcessing.printSettings = true;
287 if (mConfParam->printSettings > 1) {
288 mConfig->PrintParam();
289 }
290 }
292 // Configuration is prepared, initialize the tracker.
293 if (mGPUReco->Initialize(config) != 0) {
294 throw std::invalid_argument("GPU Reconstruction initialization failed");
295 }
296 if (mSpecConfig.outputQA) {
297 mQA = std::make_unique<GPUO2InterfaceQA>(mConfig.get());
298 }
299 if (mSpecConfig.outputErrorQA) {
300 mGPUReco->setErrorCodeOutput(&mErrorQA);
301 }
303 // initialize ITS
304 if (mSpecConfig.runITSTracking) {
305 initFunctionITS(ic);
306 }
307 }
309 if (mSpecConfig.enableDoublePipeline) {
310 initPipeline(ic);
311 if (mConfParam->dump >= 2) {
312 LOG(fatal) << "Cannot use dump-only mode with multi-threaded pipeline";
313 }
314 }
// Register every FairMQ shared-memory region with the GPU so transfers can use
// pinned memory; optionally serialized across devices via a file lock, and
// optionally timed for benchmarking.
316 auto& callbacks = ic.services().get<CallbackService>();
317 callbacks.set<CallbackService::Id::RegionInfoCallback>([this](fair::mq::RegionInfo const& info) {
318 if (info.size == 0) {
319 return;
320 }
321 if (mSpecConfig.enableDoublePipeline) {
322 mRegionInfos.emplace_back(info);
323 }
324 if (mSpecConfig.enableDoublePipeline == 2) {
325 return;
326 }
// Optionally restrict registration to a single selected managed segment id.
327 if (mConfParam->registerSelectedSegmentIds != -1 && info.managed && info.id != (uint32_t)mConfParam->registerSelectedSegmentIds) {
328 return;
329 }
330 int32_t fd = 0;
331 if (mConfParam->mutexMemReg) {
332 mode_t mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
333 fd = open("/tmp/o2_gpu_memlock_mutex.lock", O_RDWR | O_CREAT | O_CLOEXEC, mask);
334 if (fd == -1) {
335 throw std::runtime_error("Error opening memlock mutex lock file");
336 }
337 fchmod(fd, mask);
338 if (lockf(fd, F_LOCK, 0)) {
339 throw std::runtime_error("Error locking memlock mutex file");
340 }
341 }
342 std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
343 if (mConfParam->benchmarkMemoryRegistration) {
344 start = std::chrono::high_resolution_clock::now();
345 }
346 if (mGPUReco->registerMemoryForGPU(info.ptr, info.size)) {
347 throw std::runtime_error("Error registering memory for GPU");
348 }
349 if (mConfParam->benchmarkMemoryRegistration) {
350 end = std::chrono::high_resolution_clock::now();
351 std::chrono::duration<double> elapsed_seconds = end - start;
352 LOG(info) << "Memory registration time (0x" << info.ptr << ", " << info.size << " bytes): " << elapsed_seconds.count() << " s";
353 }
354 if (mConfParam->mutexMemReg) {
355 if (lockf(fd, F_ULOCK, 0)) {
356 throw std::runtime_error("Error unlocking memlock mutex file");
357 }
358 close(fd);
359 }
360 });
// Reset the timer so the first run() measures cleanly (Start happens per TF in run()).
362 mTimer->Stop();
363 mTimer->Reset();
364}
365
// NOTE(review): the signature line was elided by the Doxygen collapse in this extract;
// presumably this is GPURecoWorkflowSpec::stop() — confirm against GPUWorkflowSpec.h.
367{
// Report the accumulated CPU/wall time over all processed slots. Counter() - 1
// presumably discounts the Start/Stop cycle done at the end of init — TODO confirm.
368 LOGF(info, "GPU Reconstruction total timing: Cpu: %.3e Real: %.3e s in %d slots", mTimer->CpuTime(), mTimer->RealTime(), mTimer->Counter() - 1);
// Shut down the double-pipeline worker state (defined elsewhere in this file).
369 handlePipelineStop();
370}
371
// NOTE(review): the signature line was elided by the Doxygen collapse in this extract;
// presumably this is GPURecoWorkflowSpec::endOfStream(EndOfStreamContext& ec)
// ('ec' is used below) — confirm against GPUWorkflowSpec.h.
373{
// Forward the end-of-stream notification to the pipeline machinery.
374 handlePipelineEndOfStream(ec);
375}
376
// NOTE(review): the signature line was elided by the Doxygen collapse in this extract;
// presumably this is GPURecoWorkflowSpec::finaliseCCDB(ConcreteDataMatcher& matcher,
// void* obj) ('matcher'/'obj' are used below) — confirm against GPUWorkflowSpec.h.
378{
// In double-pipeline mode 2 the TPC/ITS calibration finalisation is skipped —
// presumably that worker does not own the reconstruction objects; TODO confirm.
379 if (mSpecConfig.enableDoublePipeline != 2) {
380 finaliseCCDBTPC(matcher, obj);
381 if (mSpecConfig.runITSTracking) {
382 finaliseCCDBITS(matcher, obj);
383 }
384 }
// Flag GRP/geometry updates; mGRPGeomUpdated is consumed elsewhere in this file.
385 if (GRPGeomHelper::instance().finaliseCCDB(matcher, obj)) {
386 mGRPGeomUpdated = true;
387 return;
388 }
389}
390
391template <class D, class E, class F, class G, class H, class I, class J, class K>
392void GPURecoWorkflowSpec::processInputs(ProcessingContext& pc, D& tpcZSmeta, E& inputZS, F& tpcZS, G& tpcZSonTheFlySizes, bool& debugTFDump, H& compClustersDummy, I& compClustersFlatDummy, J& pCompClustersFlat, K& tmpEmptyCompClusters)
393{
394 if (mSpecConfig.enableDoublePipeline == 1) {
395 return;
396 }
397 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
398 constexpr static size_t NEndpoints = o2::gpu::GPUTrackingInOutZS::NENDPOINTS;
399
400 if (mSpecConfig.zsOnTheFly || mSpecConfig.zsDecoder) {
401 for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
402 for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
403 tpcZSmeta.Pointers[i][j].clear();
404 tpcZSmeta.Sizes[i][j].clear();
405 }
406 }
407 }
408 if (mSpecConfig.zsOnTheFly) {
409 tpcZSonTheFlySizes = {0};
410 // tpcZSonTheFlySizes: #zs pages per endpoint:
411 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "ZSSIZES"}, Lifetime::Timeframe}};
412 bool recv = false, recvsizes = false;
413 for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
414 if (recvsizes) {
415 throw std::runtime_error("Received multiple ZSSIZES data");
416 }
417 tpcZSonTheFlySizes = pc.inputs().get<std::array<uint32_t, NEndpoints * NSectors>>(ref);
418 recvsizes = true;
419 }
420 // zs pages
421 std::vector<InputSpec> filter2 = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "TPCZS"}, Lifetime::Timeframe}};
422 for (auto const& ref : InputRecordWalker(pc.inputs(), filter2)) {
423 if (recv) {
424 throw std::runtime_error("Received multiple TPCZS data");
425 }
426 inputZS = pc.inputs().get<gsl::span<o2::tpc::ZeroSuppressedContainer8kb>>(ref);
427 recv = true;
428 }
429 if (!recv || !recvsizes) {
430 throw std::runtime_error("TPC ZS on the fly data not received");
431 }
432
433 uint32_t offset = 0;
434 for (uint32_t i = 0; i < NSectors; i++) {
435 uint32_t pageSector = 0;
436 for (uint32_t j = 0; j < NEndpoints; j++) {
437 pageSector += tpcZSonTheFlySizes[i * NEndpoints + j];
438 offset += tpcZSonTheFlySizes[i * NEndpoints + j];
439 }
440 if (mVerbosity >= 1) {
441 LOG(info) << "GOT ZS on the fly pages FOR SECTOR " << i << " -> pages: " << pageSector;
442 }
443 }
444 }
445 if (mSpecConfig.zsDecoder) {
446 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
447 auto isSameRdh = [](const char* left, const char* right) -> bool {
448 return o2::raw::RDHUtils::getFEEID(left) == o2::raw::RDHUtils::getFEEID(right) && o2::raw::RDHUtils::getDetectorField(left) == o2::raw::RDHUtils::getDetectorField(right);
449 };
450 auto checkForZSData = [](const char* ptr, uint32_t subSpec) -> bool {
451 const auto rdhLink = o2::raw::RDHUtils::getLinkID(ptr);
452 const auto detField = o2::raw::RDHUtils::getDetectorField(ptr);
453 const auto feeID = o2::raw::RDHUtils::getFEEID(ptr);
454 const auto feeLinkID = o2::tpc::rdh_utils::getLink(feeID);
455 // This check is not what it is supposed to be, but some MC SYNTHETIC data was generated with rdhLinkId set to feeLinkId, so we add some extra logic so we can still decode it
456 return detField == o2::tpc::raw_data_types::ZS && ((feeLinkID == o2::tpc::rdh_utils::UserLogicLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == 0)) ||
457 (feeLinkID == o2::tpc::rdh_utils::ILBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::ILBZSLinkID || rdhLink == 0)) ||
458 (feeLinkID == o2::tpc::rdh_utils::DLBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::DLBZSLinkID || rdhLink == 0)));
459 };
460 auto insertPages = [&tpcZSmeta, checkForZSData](const char* ptr, size_t count, uint32_t subSpec) -> void {
461 if (checkForZSData(ptr, subSpec)) {
462 int32_t rawcru = o2::tpc::rdh_utils::getCRU(ptr);
463 int32_t rawendpoint = o2::tpc::rdh_utils::getEndPoint(ptr);
464 tpcZSmeta.Pointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(ptr);
465 tpcZSmeta.Sizes[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(count);
466 }
467 };
468 if (DPLRawPageSequencer(pc.inputs(), filter)(isSameRdh, insertPages, checkForZSData)) {
469 debugTFDump = true;
470 static uint32_t nErrors = 0;
471 nErrors++;
472 if (nErrors == 1 || (nErrors < 100 && nErrors % 10 == 0) || nErrors % 1000 == 0 || mNTFs % 1000 == 0) {
473 LOG(error) << "DPLRawPageSequencer failed to process TPC raw data - data most likely not padded correctly - Using slow page scan instead (this alarm is downscaled from now on, so far " << nErrors << " of " << mNTFs << " TFs affected)";
474 }
475 }
476
477 int32_t totalCount = 0;
478 for (uint32_t i = 0; i < GPUTrackingInOutZS::NSECTORS; i++) {
479 for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
480 tpcZSmeta.Pointers2[i][j] = tpcZSmeta.Pointers[i][j].data();
481 tpcZSmeta.Sizes2[i][j] = tpcZSmeta.Sizes[i][j].data();
482 tpcZS.sector[i].zsPtr[j] = tpcZSmeta.Pointers2[i][j];
483 tpcZS.sector[i].nZSPtr[j] = tpcZSmeta.Sizes2[i][j];
484 tpcZS.sector[i].count[j] = tpcZSmeta.Pointers[i][j].size();
485 totalCount += tpcZSmeta.Pointers[i][j].size();
486 }
487 }
488 } else if (mSpecConfig.decompressTPC) {
489 if (mSpecConfig.decompressTPCFromROOT) {
490 compClustersDummy = *pc.inputs().get<o2::tpc::CompressedClustersROOT*>("input");
491 compClustersFlatDummy.setForward(&compClustersDummy);
492 pCompClustersFlat = &compClustersFlatDummy;
493 } else {
494 pCompClustersFlat = pc.inputs().get<o2::tpc::CompressedClustersFlat*>("input").get();
495 }
496 if (pCompClustersFlat == nullptr) {
497 tmpEmptyCompClusters.reset(new char[sizeof(o2::tpc::CompressedClustersFlat)]);
498 memset(tmpEmptyCompClusters.get(), 0, sizeof(o2::tpc::CompressedClustersFlat));
499 pCompClustersFlat = (o2::tpc::CompressedClustersFlat*)tmpEmptyCompClusters.get();
500 }
501 } else if (!mSpecConfig.zsOnTheFly) {
502 if (mVerbosity) {
503 LOGF(info, "running tracking for sector(s) 0x%09x", mTPCSectorMask);
504 }
505 }
506}
507
508int32_t GPURecoWorkflowSpec::runMain(o2::framework::ProcessingContext* pc, GPUTrackingInOutPointers* ptrs, GPUInterfaceOutputs* outputRegions, int32_t threadIndex, GPUInterfaceInputUpdate* inputUpdateCallback)
509{
510 int32_t retVal = 0;
511 if (mConfParam->dump < 2) {
512 retVal = mGPUReco->RunTracking(ptrs, outputRegions, threadIndex, inputUpdateCallback);
513
514 if (retVal == 0 && mSpecConfig.runITSTracking) {
515 retVal = runITSTracking(*pc);
516 }
517 static bool first = true;
518 if (first) {
519 first = false;
520 if (pc->services().get<const o2::framework::DeviceSpec>().inputTimesliceId == 0) { // TPC ConfigurableCarams are somewhat special, need to construct by hand
521 o2::conf::ConfigurableParam::write(o2::base::NameConf::getConfigOutputFileName(pc->services().get<const o2::framework::DeviceSpec>().name, "rec_tpc"), "GPU_rec_tpc,GPU_rec,GPU_proc_param,GPU_proc,GPU_global,trackTuneParams");
522 }
523 }
524 }
525
526 if (!mSpecConfig.enableDoublePipeline) { // TODO: Why is this needed for double-pipeline?
527 mGPUReco->Clear(false, threadIndex); // clean non-output memory used by GPU Reconstruction
528 }
529 return retVal;
530}
531
532void GPURecoWorkflowSpec::cleanOldCalibsTPCPtrs(calibObjectStruct& oldCalibObjects)
533{
534 if (mOldCalibObjects.size() > 0) {
535 mOldCalibObjects.pop();
536 }
537 mOldCalibObjects.emplace(std::move(oldCalibObjects));
538}
539
541{
542 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
543 constexpr static size_t NEndpoints = o2::gpu::GPUTrackingInOutZS::NENDPOINTS;
544
545 auto cput = mTimer->CpuTime();
546 auto realt = mTimer->RealTime();
547 mTimer->Start(false);
548 mNTFs++;
549
550 std::vector<gsl::span<const char>> inputs;
551
552 const o2::tpc::CompressedClustersFlat* pCompClustersFlat = nullptr;
553 size_t compClustersFlatDummyMemory[(sizeof(o2::tpc::CompressedClustersFlat) + sizeof(size_t) - 1) / sizeof(size_t)];
554 o2::tpc::CompressedClustersFlat& compClustersFlatDummy = reinterpret_cast<o2::tpc::CompressedClustersFlat&>(compClustersFlatDummyMemory);
555 o2::tpc::CompressedClusters compClustersDummy;
558 std::array<uint32_t, NEndpoints * NSectors> tpcZSonTheFlySizes;
559 gsl::span<const o2::tpc::ZeroSuppressedContainer8kb> inputZS;
560 std::unique_ptr<char[]> tmpEmptyCompClusters;
561
562 bool getWorkflowTPCInput_clusters = false, getWorkflowTPCInput_mc = false, getWorkflowTPCInput_digits = false;
563 bool debugTFDump = false;
564
565 if (mSpecConfig.processMC) {
566 getWorkflowTPCInput_mc = true;
567 }
568 if (!mSpecConfig.decompressTPC && !mSpecConfig.caClusterer) {
569 getWorkflowTPCInput_clusters = true;
570 }
571 if (!mSpecConfig.decompressTPC && mSpecConfig.caClusterer && ((!mSpecConfig.zsOnTheFly || mSpecConfig.processMC) && !mSpecConfig.zsDecoder)) {
572 getWorkflowTPCInput_digits = true;
573 }
574
575 // ------------------------------ Handle inputs ------------------------------
576
577 auto lockDecodeInput = std::make_unique<std::lock_guard<std::mutex>>(mPipeline->mutexDecodeInput);
578
580 if (mSpecConfig.enableDoublePipeline != 2) {
581 if (mSpecConfig.runITSTracking && pc.inputs().getPos("itsTGeo") >= 0) {
582 pc.inputs().get<o2::its::GeometryTGeo*>("itsTGeo");
583 }
584 if (GRPGeomHelper::instance().getGRPECS()->isDetReadOut(o2::detectors::DetID::TPC) && mConfParam->tpcTriggeredMode ^ !GRPGeomHelper::instance().getGRPECS()->isDetContinuousReadOut(o2::detectors::DetID::TPC)) {
585 LOG(fatal) << "configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
586 }
587 }
588
590 processInputs(pc, tpcZSmeta, inputZS, tpcZS, tpcZSonTheFlySizes, debugTFDump, compClustersDummy, compClustersFlatDummy, pCompClustersFlat, tmpEmptyCompClusters); // Process non-digit / non-cluster inputs
591 const auto& inputsClustersDigits = o2::tpc::getWorkflowTPCInput(pc, mVerbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, mTPCSectorMask, getWorkflowTPCInput_digits); // Process digit and cluster inputs
592
593 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
594 mTFSettings->tfStartOrbit = tinfo.firstTForbit;
595 mTFSettings->hasTfStartOrbit = 1;
596 mTFSettings->hasNHBFPerTF = 1;
597 mTFSettings->nHBFPerTF = mConfParam->overrideNHbfPerTF ? mConfParam->overrideNHbfPerTF : GRPGeomHelper::instance().getGRPECS()->getNHBFPerTF();
598 mTFSettings->hasRunStartOrbit = 0;
599 ptrs.settingsTF = mTFSettings.get();
600
601 if (mSpecConfig.enableDoublePipeline != 2) {
602 if (mVerbosity) {
603 LOG(info) << "TF firstTForbit " << mTFSettings->tfStartOrbit << " nHBF " << mTFSettings->nHBFPerTF << " runStartOrbit " << mTFSettings->runStartOrbit << " simStartOrbit " << mTFSettings->simStartOrbit;
604 }
605 if (mConfParam->checkFirstTfOrbit) {
606 static uint32_t lastFirstTFOrbit = -1;
607 static uint32_t lastTFCounter = -1;
608 if (lastFirstTFOrbit != -1 && lastTFCounter != -1) {
609 int32_t diffOrbit = tinfo.firstTForbit - lastFirstTFOrbit;
610 int32_t diffCounter = tinfo.tfCounter - lastTFCounter;
611 if (diffOrbit != diffCounter * mTFSettings->nHBFPerTF) {
612 LOG(error) << "Time frame has mismatching firstTfOrbit - Last orbit/counter: " << lastFirstTFOrbit << " " << lastTFCounter << " - Current: " << tinfo.firstTForbit << " " << tinfo.tfCounter;
613 }
614 }
615 lastFirstTFOrbit = tinfo.firstTForbit;
616 lastTFCounter = tinfo.tfCounter;
617 }
618 }
619
621 decltype(o2::trd::getRecoInputContainer(pc, &ptrs, &inputTracksTRD)) trdInputContainer;
622 if (mSpecConfig.readTRDtracklets) {
623 o2::globaltracking::DataRequest dataRequestTRD;
625 inputTracksTRD.collectData(pc, dataRequestTRD);
626 trdInputContainer = std::move(o2::trd::getRecoInputContainer(pc, &ptrs, &inputTracksTRD));
627 }
628
629 void* ptrEp[NSectors * NEndpoints] = {};
630 bool doInputDigits = false, doInputDigitsMC = false;
631 if (mSpecConfig.decompressTPC) {
632 ptrs.tpcCompressedClusters = pCompClustersFlat;
633 } else if (mSpecConfig.zsOnTheFly) {
634 const uint64_t* buffer = reinterpret_cast<const uint64_t*>(&inputZS[0]);
635 o2::gpu::GPUReconstructionConvert::RunZSEncoderCreateMeta(buffer, tpcZSonTheFlySizes.data(), *&ptrEp, &tpcZS);
636 ptrs.tpcZS = &tpcZS;
637 doInputDigits = doInputDigitsMC = mSpecConfig.processMC;
638 } else if (mSpecConfig.zsDecoder) {
639 ptrs.tpcZS = &tpcZS;
640 if (mSpecConfig.processMC) {
641 throw std::runtime_error("Cannot process MC information, none available");
642 }
643 } else if (mSpecConfig.caClusterer) {
644 doInputDigits = true;
645 doInputDigitsMC = mSpecConfig.processMC;
646 } else {
647 ptrs.clustersNative = &inputsClustersDigits->clusterIndex;
648 }
649
650 if (mTPCSectorMask != 0xFFFFFFFFF) {
651 // Clean out the unused sectors, such that if they were present by chance, they are not processed, and if the values are uninitialized, we should not crash
652 for (uint32_t i = 0; i < NSectors; i++) {
653 if (!(mTPCSectorMask & (1ul << i))) {
654 if (ptrs.tpcZS) {
655 for (uint32_t j = 0; j < GPUTrackingInOutZS::NENDPOINTS; j++) {
656 tpcZS.sector[i].zsPtr[j] = nullptr;
657 tpcZS.sector[i].nZSPtr[j] = nullptr;
658 tpcZS.sector[i].count[j] = 0;
659 }
660 }
661 }
662 }
663 }
664
665 GPUTrackingInOutDigits tpcDigitsMap;
666 GPUTPCDigitsMCInput tpcDigitsMapMC;
667 if (doInputDigits) {
668 ptrs.tpcPackedDigits = &tpcDigitsMap;
669 if (doInputDigitsMC) {
670 tpcDigitsMap.tpcDigitsMC = &tpcDigitsMapMC;
671 }
672 for (uint32_t i = 0; i < NSectors; i++) {
673 tpcDigitsMap.tpcDigits[i] = inputsClustersDigits->inputDigits[i].data();
674 tpcDigitsMap.nTPCDigits[i] = inputsClustersDigits->inputDigits[i].size();
675 if (doInputDigitsMC) {
676 tpcDigitsMapMC.v[i] = inputsClustersDigits->inputDigitsMCPtrs[i];
677 }
678 }
679 }
680
681 o2::tpc::TPCSectorHeader clusterOutputSectorHeader{0};
682 if (mClusterOutputIds.size() > 0) {
683 clusterOutputSectorHeader.sectorBits = mTPCSectorMask;
684 // subspecs [0, NSectors - 1] are used to identify sector data, we use NSectors to indicate the full TPC
685 clusterOutputSectorHeader.activeSectors = mTPCSectorMask;
686 }
687
688 // ------------------------------ Prepare stage for double-pipeline before normal output preparation ------------------------------
689
690 std::unique_ptr<GPURecoWorkflow_QueueObject> pipelineContext;
691 if (mSpecConfig.enableDoublePipeline) {
692 if (handlePipeline(pc, ptrs, tpcZSmeta, tpcZS, pipelineContext)) {
693 return;
694 }
695 }
696
697 // ------------------------------ Prepare outputs ------------------------------
698
699 GPUInterfaceOutputs outputRegions;
700 using outputDataType = char;
701 using outputBufferUninitializedVector = std::decay_t<decltype(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(Output{"", "", 0}))>;
702 using outputBufferType = std::pair<std::optional<std::reference_wrapper<outputBufferUninitializedVector>>, outputDataType*>;
703 std::vector<outputBufferType> outputBuffers(GPUInterfaceOutputs::count(), {std::nullopt, nullptr});
704 std::unordered_set<std::string> outputsCreated;
705
706 auto setOutputAllocator = [this, &outputBuffers, &outputRegions, &pc, &outputsCreated](const char* name, bool condition, GPUOutputControl& region, auto&& outputSpec, size_t offset = 0) {
707 if (condition) {
708 auto& buffer = outputBuffers[outputRegions.getIndex(region)];
709 if (mConfParam->allocateOutputOnTheFly) {
710 region.allocator = [this, name, &buffer, &pc, outputSpec = std::move(outputSpec), offset, &outputsCreated](size_t size) -> void* {
711 size += offset;
712 if (mVerbosity) {
713 LOG(info) << "ALLOCATING " << size << " bytes for " << name << ": " << std::get<DataOrigin>(outputSpec).template as<std::string>() << "/" << std::get<DataDescription>(outputSpec).template as<std::string>() << "/" << std::get<2>(outputSpec);
714 }
715 std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
716 if (mVerbosity) {
717 start = std::chrono::high_resolution_clock::now();
718 }
719 buffer.first.emplace(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(std::make_from_tuple<Output>(outputSpec), size));
720 outputsCreated.insert(name);
721 if (mVerbosity) {
722 end = std::chrono::high_resolution_clock::now();
723 std::chrono::duration<double> elapsed_seconds = end - start;
724 LOG(info) << "Allocation time for " << name << " (" << size << " bytes)"
725 << ": " << elapsed_seconds.count() << "s";
726 }
727 return (buffer.second = buffer.first->get().data()) + offset;
728 };
729 } else {
730 buffer.first.emplace(pc.outputs().make<DataAllocator::UninitializedVector<outputDataType>>(std::make_from_tuple<Output>(outputSpec), mConfParam->outputBufferSize));
731 region.ptrBase = (buffer.second = buffer.first->get().data()) + offset;
732 region.size = buffer.first->get().size() - offset;
733 outputsCreated.insert(name);
734 }
735 }
736 };
737
738 auto downSizeBuffer = [](outputBufferType& buffer, size_t size) {
739 if (!buffer.first) {
740 return;
741 }
742 if (buffer.first->get().size() < size) {
743 throw std::runtime_error("Invalid buffer size requested");
744 }
745 buffer.first->get().resize(size);
746 if (size && buffer.first->get().data() != buffer.second) {
747 throw std::runtime_error("Inconsistent buffer address after downsize");
748 }
749 };
750
751 /*auto downSizeBufferByName = [&outputBuffers, &outputRegions, &downSizeBuffer](GPUOutputControl& region, size_t size) {
752 auto& buffer = outputBuffers[outputRegions.getIndex(region)];
753 downSizeBuffer(buffer, size);
754 };*/
755
756 auto downSizeBufferToSpan = [&outputBuffers, &outputRegions, &downSizeBuffer](GPUOutputControl& region, auto span) {
757 auto& buffer = outputBuffers[outputRegions.getIndex(region)];
758 if (!buffer.first) {
759 return;
760 }
761 if (span.size() && buffer.second != (char*)span.data()) {
762 throw std::runtime_error("Buffer does not match span");
763 }
764 downSizeBuffer(buffer, span.size() * sizeof(*span.data()));
765 };
766
767 setOutputAllocator("COMPCLUSTERSFLAT", mSpecConfig.outputCompClustersFlat, outputRegions.compressedClusters, std::make_tuple(gDataOriginTPC, (DataDescription) "COMPCLUSTERSFLAT", 0));
768 setOutputAllocator("CLUSTERNATIVE", mClusterOutputIds.size() > 0, outputRegions.clustersNative, std::make_tuple(gDataOriginTPC, mSpecConfig.sendClustersPerSector ? (DataDescription) "CLUSTERNATIVETMP" : (mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE"), NSectors, clusterOutputSectorHeader), sizeof(o2::tpc::ClusterCountIndex));
769 setOutputAllocator("CLSHAREDMAP", mSpecConfig.outputSharedClusterMap, outputRegions.sharedClusterMap, std::make_tuple(gDataOriginTPC, (DataDescription) "CLSHAREDMAP", 0));
770 setOutputAllocator("TPCOCCUPANCYMAP", mSpecConfig.outputSharedClusterMap, outputRegions.tpcOccupancyMap, std::make_tuple(gDataOriginTPC, (DataDescription) "TPCOCCUPANCYMAP", 0));
771 setOutputAllocator("TRACKS", mSpecConfig.outputTracks, outputRegions.tpcTracksO2, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSF" : (DataDescription) "TRACKS", 0));
772 setOutputAllocator("CLUSREFS", mSpecConfig.outputTracks, outputRegions.tpcTracksO2ClusRefs, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSREFSF" : (DataDescription) "CLUSREFS", 0));
773 setOutputAllocator("TRACKSMCLBL", mSpecConfig.outputTracks && mSpecConfig.processMC, outputRegions.tpcTracksO2Labels, std::make_tuple(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSMCLBLF" : (DataDescription) "TRACKSMCLBL", 0));
774 setOutputAllocator("TRIGGERWORDS", mSpecConfig.caClusterer && mConfig->configProcessing.param.tpcTriggerHandling, outputRegions.tpcTriggerWords, std::make_tuple(gDataOriginTPC, (DataDescription) "TRIGGERWORDS", 0));
776 if (mSpecConfig.processMC && (mSpecConfig.caClusterer || mSpecConfig.useFilteredOutputSpecs)) {
777 outputRegions.clusterLabels.allocator = [&clustersMCBuffer](size_t size) -> void* { return &clustersMCBuffer; };
778 }
779
780 // ------------------------------ Actual processing ------------------------------
781
782 if ((int32_t)(ptrs.tpcZS != nullptr) + (int32_t)(ptrs.tpcPackedDigits != nullptr && (ptrs.tpcZS == nullptr || ptrs.tpcPackedDigits->tpcDigitsMC == nullptr)) + (int32_t)(ptrs.clustersNative != nullptr) + (int32_t)(ptrs.tpcCompressedClusters != nullptr) != 1) {
783 throw std::runtime_error("Invalid input for gpu tracking");
784 }
785
786 const auto& holdData = o2::tpc::TPCTrackingDigitsPreCheck::runPrecheck(&ptrs, mConfig.get());
787
788 calibObjectStruct oldCalibObjects;
789 doCalibUpdates(pc, oldCalibObjects);
790
791 lockDecodeInput.reset();
792
793 uint32_t threadIndex;
794 if (mConfParam->dump) {
795 if (mSpecConfig.enableDoublePipeline && pipelineContext->jobSubmitted) {
796 while (pipelineContext->jobThreadIndex == -1) {
797 }
798 threadIndex = pipelineContext->jobThreadIndex;
799 } else {
800 threadIndex = 0; // TODO: Not sure if this is safe, but it is not yet known which threadIndex will pick up the enqueued job
801 }
802
803 std::string dir = "";
804 if (mConfParam->dumpFolder != "") {
805 dir = std::regex_replace(mConfParam->dumpFolder, std::regex("\\[P\\]"), std::to_string(getpid()));
806 if (mNTFs == 1) {
807 mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
808 }
809 dir += "/";
810 }
811 if (mNTFs == 1) { // Must dump with first TF, since will enforce enqueued calib updates
812 mGPUReco->DumpSettings(threadIndex, dir.c_str());
813 }
814 if (tinfo.tfCounter >= mConfParam->dumpFirst && (mConfParam->dumpLast == -1 || tinfo.tfCounter <= mConfParam->dumpLast)) {
815 mGPUReco->DumpEvent(mNTFDumps, &ptrs, threadIndex, dir.c_str());
816 mNTFDumps++;
817 }
818 }
819 std::unique_ptr<GPUTrackingInOutPointers> ptrsDump;
820 if (mConfParam->dumpBadTFMode == 2) {
821 ptrsDump.reset(new GPUTrackingInOutPointers);
822 memcpy((void*)ptrsDump.get(), (const void*)&ptrs, sizeof(ptrs));
823 }
824
825 int32_t retVal = 0;
826 if (mSpecConfig.enableDoublePipeline) {
827 if (!pipelineContext->jobSubmitted) {
828 enqueuePipelinedJob(&ptrs, &outputRegions, pipelineContext.get(), true);
829 } else {
830 finalizeInputPipelinedJob(&ptrs, &outputRegions, pipelineContext.get());
831 }
832 std::unique_lock lk(pipelineContext->jobFinishedMutex);
833 pipelineContext->jobFinishedNotify.wait(lk, [context = pipelineContext.get()]() { return context->jobFinished; });
834 retVal = pipelineContext->jobReturnValue;
835 threadIndex = pipelineContext->jobThreadIndex;
836 } else {
837 // uint32_t threadIndex = pc.services().get<ThreadPool>().threadIndex;
838 threadIndex = mNextThreadIndex;
839 if (mConfig->configProcessing.doublePipeline) {
840 mNextThreadIndex = (mNextThreadIndex + 1) % 2;
841 }
842
843 retVal = runMain(&pc, &ptrs, &outputRegions, threadIndex);
844 }
845 if (retVal != 0) {
846 debugTFDump = true;
847 }
848 cleanOldCalibsTPCPtrs(oldCalibObjects);
849
850 o2::utils::DebugStreamer::instance()->flush(); // flushing debug output to file
851
852 if (debugTFDump && mNDebugDumps < mConfParam->dumpBadTFs) {
853 mNDebugDumps++;
854 if (mConfParam->dumpBadTFMode <= 1) {
855 std::string filename = std::string("tpc_dump_") + std::to_string(pc.services().get<const o2::framework::DeviceSpec>().inputTimesliceId) + "_" + std::to_string(mNDebugDumps) + ".dump";
856 FILE* fp = fopen(filename.c_str(), "w+b");
857 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
858 for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
859 auto data = pc.inputs().get<gsl::span<char>>(ref);
860 if (mConfParam->dumpBadTFMode == 1) {
861 uint64_t size = data.size();
862 fwrite(&size, 1, sizeof(size), fp);
863 }
864 fwrite(data.data(), 1, data.size(), fp);
865 }
866 fclose(fp);
867 } else if (mConfParam->dumpBadTFMode == 2) {
868 mGPUReco->DumpEvent(mNDebugDumps - 1, ptrsDump.get(), threadIndex);
869 }
870 }
871
872 if (mConfParam->dump == 2) {
873 return;
874 }
875
876 // ------------------------------ Varios postprocessing steps ------------------------------
877
878 if (mConfig->configProcessing.tpcWriteClustersAfterRejection) {
880 }
881 bool createEmptyOutput = false;
882 if (retVal != 0) {
883 if (retVal == 3 && mConfig->configProcessing.ignoreNonFatalGPUErrors) {
884 if (mConfig->configProcessing.throttleAlarms) {
885 LOG(warning) << "GPU Reconstruction aborted with non fatal error code, ignoring";
886 } else {
887 LOG(alarm) << "GPU Reconstruction aborted with non fatal error code, ignoring";
888 }
889 createEmptyOutput = !mConfParam->partialOutputForNonFatalErrors;
890 } else {
891 LOG(fatal) << "GPU Reconstruction aborted with error code " << retVal << " - errors are not ignored - terminating";
892 }
893 }
894
895 std::unique_ptr<o2::tpc::ClusterNativeAccess> tmpEmptyClNative;
896 if (createEmptyOutput) {
897 memset(&ptrs, 0, sizeof(ptrs));
898 for (uint32_t i = 0; i < outputRegions.count(); i++) {
899 if (outputBuffers[i].first) {
900 size_t toSize = 0;
901 if (i == outputRegions.getIndex(outputRegions.compressedClusters)) {
902 toSize = sizeof(*ptrs.tpcCompressedClusters);
903 } else if (i == outputRegions.getIndex(outputRegions.clustersNative)) {
904 toSize = sizeof(o2::tpc::ClusterCountIndex);
905 }
906 outputBuffers[i].first->get().resize(toSize);
907 outputBuffers[i].second = outputBuffers[i].first->get().data();
908 if (toSize) {
909 memset(outputBuffers[i].second, 0, toSize);
910 }
911 }
912 }
913 tmpEmptyClNative = std::make_unique<o2::tpc::ClusterNativeAccess>();
914 memset(tmpEmptyClNative.get(), 0, sizeof(*tmpEmptyClNative));
915 ptrs.clustersNative = tmpEmptyClNative.get();
916 if (mSpecConfig.processMC) {
917 MCLabelContainer cont;
918 cont.flatten_to(clustersMCBuffer.first);
919 clustersMCBuffer.second = clustersMCBuffer.first;
920 tmpEmptyClNative->clustersMCTruth = &clustersMCBuffer.second;
921 }
922 } else {
923 gsl::span<const o2::tpc::TrackTPC> spanOutputTracks = {ptrs.outputTracksTPCO2, ptrs.nOutputTracksTPCO2};
924 gsl::span<const uint32_t> spanOutputClusRefs = {ptrs.outputClusRefsTPCO2, ptrs.nOutputClusRefsTPCO2};
925 gsl::span<const o2::MCCompLabel> spanOutputTracksMCTruth = {ptrs.outputTracksTPCO2MC, ptrs.outputTracksTPCO2MC ? ptrs.nOutputTracksTPCO2 : 0};
926 if (!mConfParam->allocateOutputOnTheFly) {
927 for (uint32_t i = 0; i < outputRegions.count(); i++) {
928 if (outputRegions.asArray()[i].ptrBase) {
929 if (outputRegions.asArray()[i].size == 1) {
930 throw std::runtime_error("Preallocated buffer size exceeded");
931 }
932 outputRegions.asArray()[i].checkCurrent();
933 downSizeBuffer(outputBuffers[i], (char*)outputRegions.asArray()[i].ptrCurrent - (char*)outputBuffers[i].second);
934 }
935 }
936 }
937 downSizeBufferToSpan(outputRegions.tpcTracksO2, spanOutputTracks);
938 downSizeBufferToSpan(outputRegions.tpcTracksO2ClusRefs, spanOutputClusRefs);
939 downSizeBufferToSpan(outputRegions.tpcTracksO2Labels, spanOutputTracksMCTruth);
940
941 // if requested, tune TPC tracks
942 if (ptrs.nOutputTracksTPCO2) {
943 doTrackTuneTPC(ptrs, outputBuffers[outputRegions.getIndex(outputRegions.tpcTracksO2)].first->get().data());
944 }
945
946 if (mClusterOutputIds.size() > 0 && (void*)ptrs.clustersNative->clustersLinear != (void*)(outputBuffers[outputRegions.getIndex(outputRegions.clustersNative)].second + sizeof(o2::tpc::ClusterCountIndex))) {
947 throw std::runtime_error("cluster native output ptrs out of sync"); // sanity check
948 }
949 }
950
951 if (mConfig->configWorkflow.outputs.isSet(gpudatatypes::InOutType::TPCMergedTracks)) {
952 LOG(info) << "found " << ptrs.nOutputTracksTPCO2 << " track(s)";
953 }
954
955 if (mSpecConfig.outputCompClustersRoot) {
958 }
959
960 if (mClusterOutputIds.size() > 0) {
961 o2::tpc::ClusterNativeAccess const& accessIndex = *ptrs.clustersNative;
962 if (mSpecConfig.sendClustersPerSector) {
963 // Clusters are shipped by sector, we are copying into per-sector buffers (anyway only for ROOT output)
964 for (uint32_t i = 0; i < NSectors; i++) {
965 if (mTPCSectorMask & (1ul << i)) {
967 clusterOutputSectorHeader.sectorBits = (1ul << i);
968 char* buffer = pc.outputs().make<char>({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", subspec, {clusterOutputSectorHeader}}, accessIndex.nClustersSector[i] * sizeof(*accessIndex.clustersLinear) + sizeof(o2::tpc::ClusterCountIndex)).data();
969 o2::tpc::ClusterCountIndex* outIndex = reinterpret_cast<o2::tpc::ClusterCountIndex*>(buffer);
970 memset(outIndex, 0, sizeof(*outIndex));
971 for (int32_t j = 0; j < o2::tpc::constants::MAXGLOBALPADROW; j++) {
972 outIndex->nClusters[i][j] = accessIndex.nClusters[i][j];
973 }
974 memcpy(buffer + sizeof(*outIndex), accessIndex.clusters[i][0], accessIndex.nClustersSector[i] * sizeof(*accessIndex.clustersLinear));
975 if (mSpecConfig.processMC && accessIndex.clustersMCTruth) {
976 MCLabelContainer cont;
977 for (uint32_t j = 0; j < accessIndex.nClustersSector[i]; j++) {
978 const auto& labels = accessIndex.clustersMCTruth->getLabels(accessIndex.clusterOffset[i][0] + j);
979 for (const auto& label : labels) {
980 cont.addElement(j, label);
981 }
982 }
983 ConstMCLabelContainer contflat;
984 cont.flatten_to(contflat);
985 pc.outputs().snapshot({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), subspec, {clusterOutputSectorHeader}}, contflat);
986 }
987 }
988 }
989 } else {
990 // Clusters are shipped as single message, fill ClusterCountIndex
991 DataHeader::SubSpecificationType subspec = NSectors;
992 o2::tpc::ClusterCountIndex* outIndex = reinterpret_cast<o2::tpc::ClusterCountIndex*>(outputBuffers[outputRegions.getIndex(outputRegions.clustersNative)].second);
993 static_assert(sizeof(o2::tpc::ClusterCountIndex) == sizeof(accessIndex.nClusters));
994 memcpy(outIndex, &accessIndex.nClusters[0][0], sizeof(o2::tpc::ClusterCountIndex));
995 if (mSpecConfig.processMC && (mSpecConfig.caClusterer || mSpecConfig.useFilteredOutputSpecs) && accessIndex.clustersMCTruth) {
996 pc.outputs().snapshot({gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), subspec, {clusterOutputSectorHeader}}, clustersMCBuffer.first);
997 }
998 }
999 }
1000 if (mSpecConfig.outputQA) {
1001 TObjArray out;
1002 bool sendQAOutput = !createEmptyOutput && outputRegions.qa.newQAHistsCreated;
1003 auto getoutput = [sendQAOutput](auto ptr) { return sendQAOutput && ptr ? *ptr : std::decay_t<decltype(*ptr)>(); };
1004 std::vector<TH1F> copy1 = getoutput(outputRegions.qa.hist1); // Internally, this will also be used as output, so we need a non-const copy
1005 std::vector<TH2F> copy2 = getoutput(outputRegions.qa.hist2);
1006 std::vector<TH1D> copy3 = getoutput(outputRegions.qa.hist3);
1007 std::vector<TGraphAsymmErrors> copy4 = getoutput(outputRegions.qa.hist4);
1008 if (sendQAOutput) {
1009 mQA->postprocessExternal(copy1, copy2, copy3, copy4, out, mQATaskMask ? mQATaskMask : -1);
1010 }
1011 pc.outputs().snapshot({gDataOriginTPC, "TRACKINGQA", 0}, out);
1012 if (sendQAOutput) {
1013 mQA->cleanup();
1014 }
1015 }
1016 if (mSpecConfig.outputErrorQA) {
1017 pc.outputs().snapshot({gDataOriginGPU, "ERRORQA", 0}, mErrorQA);
1018 mErrorQA.clear(); // FIXME: This is a race condition once we run multi-threaded!
1019 }
1020 if (mSpecConfig.outputSharedClusterMap && !outputsCreated.contains("TPCOCCUPANCYMAP")) {
1022 }
1023 if (mSpecConfig.tpcTriggerHandling && !outputsCreated.contains("TRIGGERWORDS")) {
1025 }
1026 mTimer->Stop();
1027 LOG(info) << "GPU Reconstruction time for this TF " << mTimer->CpuTime() - cput << " s (cpu), " << mTimer->RealTime() - realt << " s (wall)";
1028}
1029
// Collect all calibration-object and calibration-value updates for this timeframe and,
// if anything changed, push them to the GPU reconstruction in a single UpdateCalibration() call.
// @param pc              processing context, used to fetch CCDB inputs (TPC, ITS, NN networks)
// @param oldCalibObjects out-parameter: receives the previous TPC calib objects so the caller
//                        can free them after the TF (see cleanOldCalibsTPCPtrs() in the caller)
1030 void GPURecoWorkflowSpec::doCalibUpdates(o2::framework::ProcessingContext& pc, calibObjectStruct& oldCalibObjects)
1031 {
1032 GPUCalibObjectsConst newCalibObjects;
1033 GPUNewCalibValues newCalibValues;
1034 // check for updates of TPC calibration objects
1035 bool needCalibUpdate = false;
// GRP/geometry update: one-shot initializations (geometry, propagator, matLUT, TRD objects)
// guarded by m*Created flags so they run only once per process lifetime.
1036 if (mGRPGeomUpdated) {
1037 mGRPGeomUpdated = false;
1038 needCalibUpdate = true;
1039
// NOTE(review): original lines 1041-1042 are missing from this extraction (presumably the
// ITS geometry creation calls) — confirm against the repository version of this file.
1040 if (mSpecConfig.runITSTracking && !mITSGeometryCreated) {
1043 mITSGeometryCreated = true;
1044 }
1045
// Solenoid field: derive the nominal GPU Bz from the GRP magnetic field object.
1046 if (mAutoSolenoidBz) {
1047 newCalibValues.newSolenoidField = true;
1048 newCalibValues.solenoidField = mConfig->configGRP.solenoidBzNominalGPU = GPUO2InterfaceUtils::getNominalGPUBz(*GRPGeomHelper::instance().getGRPMagField());
1049 // Propagator::Instance()->setBz(newCalibValues.solenoidField); // Take value from o2::Propagator::UpdateField from GRPGeomHelper
1050 LOG(info) << "Updating solenoid field " << newCalibValues.solenoidField;
1051 }
// Continuous-readout window: max TPC time bin follows the number of HBFs per TF.
1052 if (mAutoContinuousMaxTimeBin) {
1053 newCalibValues.newContinuousMaxTimeBin = true;
1054 newCalibValues.continuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin = GPUO2InterfaceUtils::getTpcMaxTimeBinFromNHbf(mTFSettings->nHBFPerTF);
1055 LOG(info) << "Updating max time bin " << newCalibValues.continuousMaxTimeBin << " (" << mTFSettings->nHBFPerTF << " orbits)";
1056 }
1057
// Propagator singleton is wired into the calib objects once; optionally switch it to the
// GPU polynomial field approximation.
1058 if (!mPropagatorInstanceCreated) {
1059 newCalibObjects.o2Propagator = mConfig->configCalib.o2Propagator = Propagator::Instance();
1060 if (mConfig->configProcessing.o2PropagatorUseGPUField) {
1061 mGPUReco->UseGPUPolynomialFieldInPropagator(Propagator::Instance());
1062 }
1063 mPropagatorInstanceCreated = true;
1064 }
1065
// Material budget LUT: taken from GRPGeomHelper unless a file was configured
// (the file-based path is presumably handled elsewhere — note the flag is set either way).
1066 if (!mMatLUTCreated) {
1067 if (mConfParam->matLUTFile.size() == 0) {
1068 newCalibObjects.matLUT = GRPGeomHelper::instance().getMatLUT();
1069 LOG(info) << "Loaded material budget lookup table";
1070 }
1071 mMatLUTCreated = true;
1072 }
// TRD geometry (flattened for GPU use) and reco params, created once if TRD tracklets are read.
1073 if (mSpecConfig.readTRDtracklets) {
1074 if (!mTRDGeometryCreated) {
1075 auto gm = o2::trd::Geometry::instance();
1076 gm->createPadPlaneArray();
1077 gm->createClusterMatrixArray();
1078 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>(*gm);
1079 newCalibObjects.trdGeometry = mConfig->configCalib.trdGeometry = mTRDGeometry.get();
1080 LOG(info) << "Loaded TRD geometry";
1081 mTRDGeometryCreated = true;
1082 }
1083 if (!mTRDRecoParamCreated) {
1084 mTRDRecoParam = std::make_unique<GPUTRDRecoParam>();
1085 newCalibObjects.trdRecoParam = mConfig->configCalib.trdRecoParam = mTRDRecoParam.get();
1086 mTRDRecoParamCreated = true;
1087 }
1088 }
1089 }
// Per-TF CCDB fetches; ordered after the || so the fetch always runs regardless of needCalibUpdate.
1090 needCalibUpdate = fetchCalibsCCDBTPC(pc, newCalibObjects, oldCalibObjects) || needCalibUpdate;
1091 if (mSpecConfig.runITSTracking) {
1092 needCalibUpdate = fetchCalibsCCDBITS(pc) || needCalibUpdate;
1093 }
// Propagate a changed TPC time-bin cut into the GRP settings.
1094 if (mTPCCutAtTimeBin != mConfig->configGRP.tpcCutTimeBin) {
1095 newCalibValues.newTPCTimeBinCut = true;
1096 newCalibValues.tpcTimeBinCut = mConfig->configGRP.tpcCutTimeBin = mTPCCutAtTimeBin;
1097 needCalibUpdate = true;
1098 }
// NN clusterizer networks fetched from CCDB: copy the (up to 3) network pointers into the
// new calib objects and optionally dump each ONNX model to a local file for inspection.
1099 if (mSpecConfig.nnLoadFromCCDB) {
// Local helper: write `validSize` bytes of `buffer` to `path`, throwing on any I/O failure.
1100 auto dumpToFile = [](const char* buffer, std::size_t validSize, const std::string& path) {
1101 std::ofstream out(path, std::ios::binary | std::ios::trunc);
1102 if (!out.is_open()) {
1103 throw std::runtime_error("Failed to open output file: " + path);
1104 }
1105
1106 out.write(buffer, static_cast<std::streamsize>(validSize));
1107 if (!out) {
1108 throw std::runtime_error("Failed while writing data to: " + path);
1109 }
1110 };
1111 for (int i = 0; i < 3; i++) {
1112 newCalibObjects.nnClusterizerNetworks[i] = mConfig->configCalib.nnClusterizerNetworks[i];
1113 if (mSpecConfig.nnDumpToFile && newCalibObjects.nnClusterizerNetworks[i]) {
1114 std::string path = "tpc_nn_clusterizer_" + std::to_string(i) + ".onnx";
1115 dumpToFile(newCalibObjects.nnClusterizerNetworks[i]->getONNXModel(), newCalibObjects.nnClusterizerNetworks[i]->getONNXModelSize(), path);
1116 LOG(info) << "Dumped TPC clusterizer NN " << i << " to file " << path;
1117 }
1118 }
1119 }
// Single consolidated update call — avoids partial/incremental calib states on the GPU side.
1120 if (needCalibUpdate) {
1121 LOG(info) << "Updating GPUReconstruction calibration objects";
1122 mGPUReco->UpdateCalibration(newCalibObjects, newCalibValues);
1123 }
1124}
1125
// Build the DPL workflow options for this device.
// NOTE(review): the signature line (original line 1126) is missing from this extraction —
// presumably `o2::framework::Options GPURecoWorkflowSpec::options()`; confirm in the repo.
1127{
1128 Options opts;
// Double-pipeline mode needs an out-of-band shmem channel between the prepare (push/connect)
// and the processing (pull/bind) device; the channel id is taken from O2JOBID or NUMAID env vars.
1129 if (mSpecConfig.enableDoublePipeline) {
1130 bool send = mSpecConfig.enableDoublePipeline == 2;
1131 char* o2jobid = getenv("O2JOBID");
1132 char* numaid = getenv("NUMAID");
1133 int32_t chanid = o2jobid ? atoi(o2jobid) : (numaid ? atoi(numaid) : 0);
1134 std::string chan = std::string("name=gpu-prepare-channel,type=") + (send ? "push" : "pull") + ",method=" + (send ? "connect" : "bind") + ",address=ipc://@gpu-prepare-channel-" + std::to_string(chanid) + "-{timeslice0},transport=shmem,rateLogging=0";
1135 opts.emplace_back(o2::framework::ConfigParamSpec{"channel-config", o2::framework::VariantType::String, chan, {"Out-of-band channel config"}});
1136 }
// The prepare-only device (mode 2) needs no further options.
1137 if (mSpecConfig.enableDoublePipeline == 2) {
1138 return opts;
1139 }
// NOTE(review): original line 1141 (the body of this branch) is missing from this extraction —
// confirm what option is added for outputTracks in the repository version.
1140 if (mSpecConfig.outputTracks) {
1142 }
1143 return opts;
1144}
1145
// Assemble the full list of DPL input routes (data + CCDB calibration objects) for this device,
// driven by the workflow configuration in mSpecConfig. Also registers completion-policy matchers
// in mPolicyData for the data inputs.
// NOTE(review): the signature line (original line 1146) is missing from this extraction —
// presumably `o2::framework::Inputs GPURecoWorkflowSpec::inputs()`; confirm in the repo.
1147{
1148 Inputs inputs;
1149 if (mSpecConfig.zsDecoder) {
1150 // All ZS raw data is published with subspec 0 by the o2-raw-file-reader-workflow and DataDistribution
1151 // creates subspec fom CRU and endpoint id, we create one single input route subscribing to all TPC/RAWDATA
1152 inputs.emplace_back(InputSpec{"zsraw", ConcreteDataTypeMatcher{"TPC", "RAWDATA"}, Lifetime::Timeframe});
1153 if (mSpecConfig.askDISTSTF) {
1154 inputs.emplace_back("stdDist", "FLP", "DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
1155 }
1156 }
// Double pipeline: the prepare device (mode 2) consumes only raw ZS input; the processing
// device (mode 1) additionally subscribes to the prepared-data message.
1157 if (mSpecConfig.enableDoublePipeline == 2) {
1158 if (!mSpecConfig.zsDecoder) {
1159 LOG(fatal) << "Double pipeline mode can only work with zsraw input";
1160 }
1161 return inputs;
1162 } else if (mSpecConfig.enableDoublePipeline == 1) {
1163 inputs.emplace_back("pipelineprepare", gDataOriginGPU, "PIPELINEPREPARE", 0, Lifetime::Timeframe);
1164 }
1165 if (mSpecConfig.enableDoublePipeline != 2 && (mSpecConfig.outputTracks || mSpecConfig.caClusterer)) {
1166 // calibration objects for TPC clusterization
1167 inputs.emplace_back("tpcgain", gDataOriginTPC, "PADGAINFULL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalPadGainFull)));
1168 inputs.emplace_back("tpcaltrosync", gDataOriginTPC, "ALTROSYNCSIGNAL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::AltroSyncSignal)));
1169 }
1170 if (mSpecConfig.enableDoublePipeline != 2 && mSpecConfig.outputTracks) {
1171 // calibration objects for TPC tracking
1172 const auto mapSources = mSpecConfig.tpcDeadMapSources;
1173 if (mapSources != 0) {
1174 tpc::SourcesDeadMap sources((mapSources > -1) ? static_cast<tpc::SourcesDeadMap>(mapSources) : tpc::SourcesDeadMap::All);
// NOTE(review): original line 1175 is missing from this extraction (presumably a guard on
// `sources` before the IDC pad-flags input) — confirm against the repository version.
1176 inputs.emplace_back("tpcidcpadflags", gDataOriginTPC, "IDCPADFLAGS", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalIDCPadStatusMapA), {}, 1)); // time-dependent
1177 }
// NOTE(review): original line 1178 is missing (presumably the opening `if` matching the
// closing brace on original line 1180) — the braces below look unbalanced only because of
// the extraction gap; confirm against the repository version.
1179 inputs.emplace_back("tpcruninfo", gDataOriginTPC, "TPCRUNINFO", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::ConfigRunInfo)));
1180 }
1181 }
1182
// Gain/threshold calibrations; the `{}, 1` ccdbParamSpec arguments mark time-dependent objects.
1183 inputs.emplace_back("tpcgainresidual", gDataOriginTPC, "PADGAINRESIDUAL", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalPadGainResidual), {}, 1)); // time-dependent
1184 if (mSpecConfig.tpcUseMCTimeGain) {
1185 inputs.emplace_back("tpctimegain", gDataOriginTPC, "TIMEGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTimeGainMC), {}, 1)); // time-dependent
1186 } else {
1187 inputs.emplace_back("tpctimegain", gDataOriginTPC, "TIMEGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTimeGain), {}, 1)); // time-dependent
1188 }
1189 inputs.emplace_back("tpctopologygain", gDataOriginTPC, "TOPOLOGYGAIN", 0, Lifetime::Condition, ccdbParamSpec(o2::tpc::CDBTypeMap.at(o2::tpc::CDBType::CalTopologyGain)));
1190 inputs.emplace_back("tpcthreshold", gDataOriginTPC, "PADTHRESHOLD", 0, Lifetime::Condition, ccdbParamSpec("TPC/Config/FEEPad"));
// NOTE(review): original line 1191 is missing (presumably the opening `if` matching the closing
// brace on original line 1195, guarding the fast-transform CCDB inputs) — confirm in the repo.
1192 Options optsDummy;
1193 o2::tpc::CorrectionMapsLoaderGloOpts gloOpts{mSpecConfig.lumiScaleType, mSpecConfig.lumiScaleMode, mSpecConfig.enableMShape, mSpecConfig.enableCTPLumi};
1194 mCalibObjects.mFastTransformHelper->requestCCDBInputs(inputs, optsDummy, gloOpts); // option filled here is lost
1195 }
// Main TPC data input: exactly one of compressed clusters, digits, or native clusters,
// depending on the configured processing chain.
1196 if (mSpecConfig.decompressTPC) {
1197 inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, mSpecConfig.decompressTPCFromROOT ? o2::header::DataDescription("COMPCLUSTERS") : o2::header::DataDescription("COMPCLUSTERSFLAT")}, Lifetime::Timeframe});
1198 } else if (mSpecConfig.caClusterer) {
1199 // We accept digits and MC labels also if we run on ZS Raw data, since they are needed for MC label propagation
1200 if ((!mSpecConfig.zsOnTheFly || mSpecConfig.processMC) && !mSpecConfig.zsDecoder) {
1201 inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITS"}, Lifetime::Timeframe});
1202 mPolicyData->emplace_back(o2::framework::InputSpec{"digits", o2::framework::ConcreteDataTypeMatcher{"TPC", "DIGITS"}});
1203 }
1204 } else if (mSpecConfig.runTPCTracking) {
1205 inputs.emplace_back(InputSpec{"input", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERNATIVE"}, Lifetime::Timeframe});
1206 mPolicyData->emplace_back(o2::framework::InputSpec{"clusters", o2::framework::ConcreteDataTypeMatcher{"TPC", "CLUSTERNATIVE"}});
1207 }
// MC label inputs mirror the data input chosen above.
1208 if (mSpecConfig.processMC) {
1209 if (mSpecConfig.caClusterer) {
1210 if (!mSpecConfig.zsDecoder) {
1211 inputs.emplace_back(InputSpec{"mclblin", ConcreteDataTypeMatcher{gDataOriginTPC, "DIGITSMCTR"}, Lifetime::Timeframe});
1212 mPolicyData->emplace_back(o2::framework::InputSpec{"digitsmc", o2::framework::ConcreteDataTypeMatcher{"TPC", "DIGITSMCTR"}});
1213 }
1214 } else if (mSpecConfig.runTPCTracking) {
1215 inputs.emplace_back(InputSpec{"mclblin", ConcreteDataTypeMatcher{gDataOriginTPC, "CLNATIVEMCLBL"}, Lifetime::Timeframe});
1216 mPolicyData->emplace_back(o2::framework::InputSpec{"clustersmc", o2::framework::ConcreteDataTypeMatcher{"TPC", "CLNATIVEMCLBL"}});
1217 }
1218 }
1219
1220 if (mSpecConfig.zsOnTheFly) {
1221 inputs.emplace_back(InputSpec{"zsinput", ConcreteDataTypeMatcher{"TPC", "TPCZS"}, Lifetime::Timeframe});
1222 inputs.emplace_back(InputSpec{"zsinputsizes", ConcreteDataTypeMatcher{"TPC", "ZSSIZES"}, Lifetime::Timeframe});
1223 }
1224 if (mSpecConfig.readTRDtracklets) {
1225 inputs.emplace_back("trdctracklets", o2::header::gDataOriginTRD, "CTRACKLETS", 0, Lifetime::Timeframe);
1226 inputs.emplace_back("trdtracklets", o2::header::gDataOriginTRD, "TRACKLETS", 0, Lifetime::Timeframe);
1227 inputs.emplace_back("trdtriggerrec", o2::header::gDataOriginTRD, "TRKTRGRD", 0, Lifetime::Timeframe);
1228 inputs.emplace_back("trdtrigrecmask", o2::header::gDataOriginTRD, "TRIGRECMASK", 0, Lifetime::Timeframe);
1229 }
1230
// ITS tracking: cluster data, trigger inputs, dictionaries/params from CCDB, and MC labels.
1231 if (mSpecConfig.runITSTracking) {
1232 inputs.emplace_back("compClusters", "ITS", "COMPCLUSTERS", 0, Lifetime::Timeframe);
1233 inputs.emplace_back("patterns", "ITS", "PATTERNS", 0, Lifetime::Timeframe);
1234 inputs.emplace_back("ROframes", "ITS", "CLUSTERSROF", 0, Lifetime::Timeframe);
1235 if (mSpecConfig.itsTriggerType == 1) {
1236 inputs.emplace_back("phystrig", "ITS", "PHYSTRIG", 0, Lifetime::Timeframe);
1237 } else if (mSpecConfig.itsTriggerType == 2) {
1238 inputs.emplace_back("phystrig", "TRD", "TRKTRGRD", 0, Lifetime::Timeframe);
1239 }
1240 if (mSpecConfig.enableDoublePipeline != 2) {
1241 if (mSpecConfig.isITS3) {
1242 inputs.emplace_back("cldict", "IT3", "CLUSDICT", 0, Lifetime::Condition, ccdbParamSpec("IT3/Calib/ClusterDictionary"));
1243 inputs.emplace_back("alppar", "ITS", "ALPIDEPARAM", 0, Lifetime::Condition, ccdbParamSpec("ITS/Config/AlpideParam"));
1244 } else {
1245 inputs.emplace_back("itscldict", "ITS", "CLUSDICT", 0, Lifetime::Condition, ccdbParamSpec("ITS/Calib/ClusterDictionary"));
1246 inputs.emplace_back("itsalppar", "ITS", "ALPIDEPARAM", 0, Lifetime::Condition, ccdbParamSpec("ITS/Config/AlpideParam"));
1247 }
1248 if (mSpecConfig.itsOverrBeamEst) {
1249 inputs.emplace_back("meanvtx", "GLO", "MEANVERTEX", 0, Lifetime::Condition, ccdbParamSpec("GLO/Calib/MeanVertex", {}, 1));
1250 }
1251 }
1252 if (mSpecConfig.processMC) {
1253 inputs.emplace_back("itsmclabels", "ITS", "CLUSTERSMCTR", 0, Lifetime::Timeframe);
1254 inputs.emplace_back("ITSMC2ROframes", "ITS", "CLUSTERSMC2ROF", 0, Lifetime::Timeframe);
1255 }
1256 }
1257
1258 // NN clusterizer
// Build CCDB input routes for the TPC NN clusterizer networks. The metadata map is translated
// into CCDBMetadata entries (empty values skipped) and used to select the model variant.
1259 *mConfParam = mConfig->ReadConfigurableParam();
1260 if (mConfig->configProcessing.nn.nnLoadFromCCDB) {
1261
1262 LOG(info) << "(NN CLUS) Enabling fetching of TPC NN clusterizer from CCDB";
1263 mSpecConfig.nnLoadFromCCDB = true;
1264 mSpecConfig.nnDumpToFile = mConfig->configProcessing.nn.nnCCDBDumpToFile;
1265 GPUSettingsProcessingNNclusterizer& nnClusterizerSettings = mConfig->configProcessing.nn;
1266
1267 std::map<std::string, std::string> metadata;
1268 metadata["inputDType"] = nnClusterizerSettings.nnInferenceInputDType; // FP16 or FP32
1269 metadata["outputDType"] = nnClusterizerSettings.nnInferenceOutputDType; // FP16 or FP32
1270 metadata["nnCCDBWithMomentum"] = nnClusterizerSettings.nnCCDBWithMomentum; // 0, 1 -> Only for regression model
1271 metadata["nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBClassificationLayerType; // FC, CNN
1272 metadata["nnCCDBInteractionRate"] = nnClusterizerSettings.nnCCDBInteractionRate; // in kHz
1273 metadata["nnCCDBBeamType"] = nnClusterizerSettings.nnCCDBBeamType; // pp, pPb, PbPb
1274
// Helper: copy non-empty key/value pairs into the CCDBMetadata vector.
// NOTE(review): ccdb_metadata is appended to (not cleared) before each route below, so later
// routes accumulate entries from earlier conversions — verify this is intended.
1275 auto convert_map_to_metadata = [](const std::map<std::string, std::string>& inputMap, std::vector<o2::framework::CCDBMetadata>& outputMetadata) {
1276 for (const auto& [key, value] : inputMap) {
1277 if (value != "") {
1278 outputMetadata.push_back({key, value});
1279 }
1280 }
1281 };
1282
// nnEvalMode is "<classification>:<regression>", e.g. "c1:r2" — tokenized on ':'.
1283 mSpecConfig.nnEvalMode = o2::utils::Str::tokenize(nnClusterizerSettings.nnEvalMode, ':');
1284 std::vector<o2::framework::CCDBMetadata> ccdb_metadata;
1285
1286 if (mConfParam->printSettings) {
1287 auto printSettings = [](const std::map<std::string, std::string>& settings) {
1288 LOG(info) << "(NN CLUS) NN Clusterizer CCDB settings:";
1289 for (const auto& [key, value] : settings) {
1290 LOG(info) << " " << key << " : " << value;
1291 }
1292 };
1293 printSettings(metadata);
1294 }
1295
// Classification network route (single- or two-class variant).
1296 if (mSpecConfig.nnEvalMode[0] == "c1") {
1297 metadata["nnCCDBEvalType"] = "classification_c1";
1298 convert_map_to_metadata(metadata, ccdb_metadata);
1299 inputs.emplace_back("nn_classification_c1", gDataOriginTPC, "NNCLUSTERIZER_C1", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1300 } else if (mSpecConfig.nnEvalMode[0] == "c2") {
1301 metadata["nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1302 metadata["nnCCDBEvalType"] = "classification_c2";
1303 convert_map_to_metadata(metadata, ccdb_metadata);
1304 inputs.emplace_back("nn_classification_c2", gDataOriginTPC, "NNCLUSTERIZER_C2", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1305 }
1306
// Regression network route (always added) plus an optional second regression network.
1307 metadata["nnCCDBEvalType"] = "regression_c1";
1308 metadata["nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1309 convert_map_to_metadata(metadata, ccdb_metadata);
1310 inputs.emplace_back("nn_regression_c1", gDataOriginTPC, "NNCLUSTERIZER_R1", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1311
// NOTE(review): nnEvalMode[1] is accessed without a size check — tokenize() presumably always
// yields two tokens for valid configs; confirm, else this is an out-of-bounds access.
1312 if (mSpecConfig.nnEvalMode[1] == "r2") {
1313 metadata["nnCCDBEvalType"] = "regression_c2";
1314 convert_map_to_metadata(metadata, ccdb_metadata);
1315 inputs.emplace_back("nn_regression_c2", gDataOriginTPC, "NNCLUSTERIZER_R2", 0, Lifetime::Condition, ccdbParamSpec(nnClusterizerSettings.nnCCDBPath + "/" + metadata["nnCCDBEvalType"], ccdb_metadata, 0));
1316 }
1317 }
1318
1319 return inputs;
1320};
1321
// Assemble the list of DPL output specs, mirroring the routes actually written in the processing
// method (TPC tracks/clusters/QA, plus ITS tracking outputs). Also fills mClusterOutputIds as a
// side effect when CA cluster output is enabled.
// NOTE(review): the signature line (original line 1322) is missing from this extraction —
// presumably `std::vector<o2::framework::OutputSpec> GPURecoWorkflowSpec::outputSpecs()`; confirm.
1323{
1324 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
1325 std::vector<OutputSpec> outputSpecs;
// The prepare-only device of the double pipeline publishes a single prepared-data message.
1326 if (mSpecConfig.enableDoublePipeline == 2) {
1327 outputSpecs.emplace_back(gDataOriginGPU, "PIPELINEPREPARE", 0, Lifetime::Timeframe);
1328 return outputSpecs;
1329 }
// Filtered-output mode uses the "...F" descriptions throughout (TRACKSF, CLUSREFSF, ...).
1330 if (mSpecConfig.outputTracks) {
1331 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSF" : (DataDescription) "TRACKS", 0, Lifetime::Timeframe);
1332 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSREFSF" : (DataDescription) "CLUSREFS", 0, Lifetime::Timeframe);
1333 }
1334 if (mSpecConfig.processMC && mSpecConfig.outputTracks) {
1335 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "TRACKSMCLBLF" : (DataDescription) "TRACKSMCLBL", 0, Lifetime::Timeframe);
1336 }
1337 if (mSpecConfig.outputCompClustersRoot) {
1338 outputSpecs.emplace_back(gDataOriginTPC, "COMPCLUSTERS", 0, Lifetime::Timeframe);
1339 }
1340 if (mSpecConfig.outputCompClustersFlat) {
1341 outputSpecs.emplace_back(gDataOriginTPC, "COMPCLUSTERSFLAT", 0, Lifetime::Timeframe);
1342 }
// Native cluster output: either one message per sector (plus a temporary linear buffer with
// subspec NSectors) or a single message covering all sectors.
1343 if (mSpecConfig.outputCAClusters) {
1344 for (auto const& sector : mTPCSectors) {
1345 mClusterOutputIds.emplace_back(sector);
1346 }
1347 if (mSpecConfig.sendClustersPerSector) {
1348 outputSpecs.emplace_back(gDataOriginTPC, "CLUSTERNATIVETMP", NSectors, Lifetime::Timeframe); // Dummy buffer the TPC tracker writes the inital linear clusters to
1349 for (const auto sector : mTPCSectors) {
1350 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", sector, Lifetime::Timeframe);
1351 }
1352 } else {
1353 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? (DataDescription) "CLUSTERNATIVEF" : (DataDescription) "CLUSTERNATIVE", NSectors, Lifetime::Timeframe);
1354 }
1355 if (mSpecConfig.processMC) {
1356 if (mSpecConfig.sendClustersPerSector) {
1357 for (const auto sector : mTPCSectors) {
1358 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), sector, Lifetime::Timeframe);
1359 }
1360 } else {
1361 outputSpecs.emplace_back(gDataOriginTPC, mSpecConfig.useFilteredOutputSpecs ? DataDescription("CLNATIVEMCLBLF") : DataDescription("CLNATIVEMCLBL"), NSectors, Lifetime::Timeframe);
1362 }
1363 }
1364 }
1365 if (mSpecConfig.outputSharedClusterMap) {
1366 outputSpecs.emplace_back(gDataOriginTPC, "CLSHAREDMAP", 0, Lifetime::Timeframe);
1367 outputSpecs.emplace_back(gDataOriginTPC, "TPCOCCUPANCYMAP", 0, Lifetime::Timeframe);
1368 }
1369 if (mSpecConfig.tpcTriggerHandling) {
1370 outputSpecs.emplace_back(gDataOriginTPC, "TRIGGERWORDS", 0, Lifetime::Timeframe);
1371 }
1372 if (mSpecConfig.outputQA) {
1373 outputSpecs.emplace_back(gDataOriginTPC, "TRACKINGQA", 0, Lifetime::Timeframe);
1374 }
1375 if (mSpecConfig.outputErrorQA) {
1376 outputSpecs.emplace_back(gDataOriginGPU, "ERRORQA", 0, Lifetime::Timeframe);
1377 }
1378
// ITS tracking outputs (tracks, vertices, ROF records, and the corresponding MC products).
1379 if (mSpecConfig.runITSTracking) {
1380 outputSpecs.emplace_back(gDataOriginITS, "TRACKS", 0, Lifetime::Timeframe);
1381 outputSpecs.emplace_back(gDataOriginITS, "TRACKCLSID", 0, Lifetime::Timeframe);
1382 outputSpecs.emplace_back(gDataOriginITS, "ITSTrackROF", 0, Lifetime::Timeframe);
1383 outputSpecs.emplace_back(gDataOriginITS, "VERTICES", 0, Lifetime::Timeframe);
1384 outputSpecs.emplace_back(gDataOriginITS, "VERTICESROF", 0, Lifetime::Timeframe);
1385 outputSpecs.emplace_back(gDataOriginITS, "IRFRAMES", 0, Lifetime::Timeframe);
1386
1387 if (mSpecConfig.processMC) {
1388 outputSpecs.emplace_back(gDataOriginITS, "VERTICESMCTR", 0, Lifetime::Timeframe);
1389 outputSpecs.emplace_back(gDataOriginITS, "VERTICESMCPUR", 0, Lifetime::Timeframe);
1390 outputSpecs.emplace_back(gDataOriginITS, "TRACKSMCTR", 0, Lifetime::Timeframe);
1391 outputSpecs.emplace_back(gDataOriginITS, "ITSTrackMC2ROF", 0, Lifetime::Timeframe);
1392 }
1393 }
1394
1395 return outputSpecs;
1396};
1397
// Teardown: stop the pipeline threads first, then release the QA, display frontend, and GPU
// reconstruction objects in that order.
// NOTE(review): the signature line (original line 1398) is missing from this extraction —
// presumably `GPURecoWorkflowSpec::~GPURecoWorkflowSpec()`; confirm in the repo.
1399{
1400 ExitPipeline();
1401 mQA.reset(nullptr);
1402 mDisplayFrontend.reset(nullptr);
1403 mGPUReco.reset(nullptr);
1404}
1405
1406} // namespace o2::gpu
std::vector< std::string > labels
Simple interface to the CDB manager.
Definition of container class for dE/dx corrections.
void dumpToFile(std::string fileName, const CathodeSegmentation &seg, const std::vector< Point > &points)
Class of a TPC cluster in TPC-native coordinates (row, time)
Container to store compressed TPC cluster data.
A const (ready only) version of MCTruthContainer.
Helper class to access correction maps.
Helper class to access load maps from CCDB.
A parser and sequencer utility for raw pages within DPL input.
A raw page parser for DPL input.
Wrapper container for different reconstructed object types.
Definition of the TPC Digit.
Helper class for memory management of TPC Data Formats, external from the actual data type classes to...
Definition of class for writing debug informations.
Definition of the GeometryManager class.
int32_t i
int32_t retVal
Helper for geometry and GRP related CCDB requests.
Definition of the GeometryTGeo class.
A helper class to iteratate over all parts of all input routes.
Declarations for the wrapper for the set of cylindrical material layers.
Definition of the Names Generator class.
Class to serialize ONNX objects for ROOT snapshots of CCDB objects at runtime.
Utilities for parsing of data sequences.
uint32_t j
Definition RawData.h:0
Struct for input data required by TRD tracking workflow.
Type wrappers for enfording a specific serialization method.
class to create TPC fast transformation
Definition of TPCFastTransform class.
Wrapper class for TPC CA Tracker algorithm.
TBranch * ptr
Configurable params for tracks ad hoc tuning.
Helper class to extract VDrift from different sources.
Helper class to obtain TPC clusters / digits / labels from DPL.
Definitions of TPC Zero Suppression Data Headers.
StringRef key
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static MatLayerCylSet * loadFromFile(const std::string &inpFName="matbud.root")
static std::string getConfigOutputFileName(const std::string &procName, const std::string &confName="", bool json=true)
Definition NameConf.cxx:115
GPUd() value_type estimateLTFast(o2 static GPUd() float estimateLTIncrement(const o2 PropagatorImpl * Instance(bool uninitialized=false)
Definition Propagator.h:178
static void write(std::string const &filename, std::string const &keyOnly="")
gsl::span< const TruthElement > getLabels(uint32_t dataindex) const
A read-only version of MCTruthContainer allowing for storage optimisation.
static mask_t getSourcesMask(const std::string_view srcList)
static constexpr std::string_view NONE
keywork for no sources
void addElement(uint32_t dataindex, TruthElement const &element, bool noElement=false)
size_t flatten_to(ContainerType &container) const
static constexpr ID TPC
Definition DetID.h:64
This utility handles transparently the DPL inputs and triggers a customizable action on sequences of ...
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
Definition InitContext.h:34
A helper class to iteratate over all parts of all input routes.
int getPos(const char *name) const
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static GPUDisplayFrontendInterface * getFrontend(const char *type)
static uint32_t getTpcMaxTimeBinFromNHbf(uint32_t nHbf)
static float getNominalGPUBz(T &src)
static void ApplySyncSettings(GPUSettingsProcessing &proc, GPUSettingsRec &rec, gpudatatypes::RecoStepField &steps, bool syncMode, int32_t dEdxMode=-2)
o2::framework::Outputs outputs()
std::vector< framework::InputSpec > CompletionPolicyData
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
o2::framework::Inputs inputs()
void run(o2::framework::ProcessingContext &pc) final
void stop() final
This is invoked on stop.
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
GPURecoWorkflowSpec(CompletionPolicyData *policyData, Config const &specconfig, std::vector< int32_t > const &tpcsectors, uint64_t tpcSectorMask, std::shared_ptr< o2::base::GRPGeomRequest > &ggr, std::function< bool(o2::framework::DataProcessingHeader::StartTime)> **gPolicyOrder=nullptr)
o2::framework::Options options()
static void RunZSEncoderCreateMeta(const uint64_t *buffer, const uint32_t *sizes, void **ptrs, GPUTrackingInOutZS *out)
static GeometryTGeo * Instance()
void fillMatrixCache(int mask) override
ClusterNativeAccess::ConstMCLabelContainerViewWithBuffer ConstMCLabelContainerViewWithBuffer
static void addOptions(std::vector< o2::framework::ConfigParamSpec > &options)
static constexpr int MAXSECTOR
Definition Sector.h:44
static precheckModifiedData runPrecheck(o2::gpu::GPUTrackingInOutPointers *ptrs, o2::gpu::GPUO2InterfaceConfiguration *config)
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
static Geometry * instance()
Definition Geometry.h:33
GLint GLsizei count
Definition glcorearb.h:399
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
GLdouble GLdouble right
Definition glcorearb.h:4077
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint left
Definition glcorearb.h:1979
GLboolean * data
Definition glcorearb.h:298
GLintptr offset
Definition glcorearb.h:660
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLuint start
Definition glcorearb.h:469
GLint ref
Definition glcorearb.h:291
GLint GLuint mask
Definition glcorearb.h:291
GLsizei GLenum * sources
Definition glcorearb.h:2516
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
constexpr o2::header::DataOrigin gDataOriginTRD
Definition DataHeader.h:577
constexpr o2::header::DataOrigin gDataOriginITS
Definition DataHeader.h:570
constexpr o2::header::DataOrigin gDataOriginGPU
Definition DataHeader.h:593
constexpr int NSectors
Definition of a container to keep/associate and arbitrary number of labels associated to an index wit...
Defining PrimaryVertex explicitly as messageable.
o2::header::DataDescription DataDescription
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
O2 data header classes and API, v0.1.
Definition DetID.h:49
auto get(const std::byte *buffer, size_t=0)
Definition DataHeader.h:454
Descriptor< gSizeDataDescriptionString > DataDescription
Definition DataHeader.h:551
constexpr int MAXGLOBALPADROW
Definition Constants.h:34
@ ZS
final Zero Suppression (can be ILBZS, DLBZS)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
Definition CDBTypes.h:96
@ FEEConfig
use fee config
@ IDCPadStatus
use idc pad status map
@ CalIDCPadStatusMapA
Status map of the pads (dead etc. obatined from CalIDC0)
@ CalPadGainFull
Full pad gain calibration.
@ CalPadGainResidual
ResidualpPad gain calibration (e.g. from tracks)
@ CalTimeGain
Gain variation over time.
@ CalTimeGainMC
Gain variation over time for MC.
@ AltroSyncSignal
timing of Altro chip sync. signal
auto getRecoInputContainer(o2::framework::ProcessingContext &pc, o2::gpu::GPUTrackingInOutPointers *ptrs, const o2::globaltracking::RecoContainer *inputTracks, bool mc=false)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::string filename()
std::string name
The name of the associated DataProcessorSpec.
Definition DeviceSpec.h:50
size_t inputTimesliceId
The time pipelining id of this particular device.
Definition DeviceSpec.h:68
void requestTracks(o2::dataformats::GlobalTrackID::mask_t src, bool mc)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
S< o2::trd::GeometryFlat >::type * trdGeometry
S< GPUTRDRecoParam >::type * trdRecoParam
S< o2::base::PropagatorImpl< float > >::type * o2Propagator
S< o2::base::MatLayerCylSet >::type * matLUT
S< o2::tpc::ORTRootSerializer >::type * nnClusterizerNetworks[3]
const std::vector< TGraphAsymmErrors > * hist4
std::function< void *(size_t)> allocator
std::vector< std::string > nnEvalMode
std::array< const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > *, o2::tpc::constants::MAXSECTOR > v
const o2::tpc::Digit * tpcDigits[NSECTORS]
const GPUTPCDigitsMCInput * tpcDigitsMC
const o2::tpc::ClusterNativeAccess * clustersNative
const o2::tpc::CompressedClustersFlat * tpcCompressedClusters
const GPUTrackingInOutZS * tpcZS
const o2::MCCompLabel * outputTracksTPCO2MC
const o2::tpc::ClusterNativeAccess * clustersNativeReduced
const o2::tpc::TrackTPC * outputTracksTPCO2
const GPUTrackingInOutDigits * tpcPackedDigits
GPUTrackingInOutZSSector sector[NSECTORS]
static constexpr uint32_t NSECTORS
static constexpr uint32_t NENDPOINTS
GPUOutputControl * asArray()
GPUOutputControl tpcTracksO2Labels
GPUOutputControl tpcTracksO2ClusRefs
size_t getIndex(const GPUOutputControl &v)
static constexpr size_t count()
GPUOutputControl sharedClusterMap
GPUOutputControl compressedClusters
uint32_t SubSpecificationType
Definition DataHeader.h:621
static constexpr int T2L
Definition Cartesian.h:55
static constexpr int T2GRot
Definition Cartesian.h:57
static constexpr int T2G
Definition Cartesian.h:56
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClustersSector[constants::MAXSECTOR]
const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > * clustersMCTruth
const ClusterNative * clusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int clusterOffset[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
const ClusterNative * clustersLinear
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"