84#include <TStopwatch.h>
89#include <TGraphAsymmErrors.h>
101#include <unordered_set>
113GPURecoWorkflowSpec::GPURecoWorkflowSpec(
GPURecoWorkflowSpec::CompletionPolicyData* policyData,
Config const& specconfig, std::vector<int32_t>
const& tpcsectors, uint64_t tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<
bool(
o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) :
o2::
framework::
Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
116 throw std::runtime_error(
"inconsistent configuration: cluster output is only possible if CA clusterer or CompCluster decompression is activated");
120 mConfParam.reset(
new GPUSettingsO2);
122 mTimer.reset(
new TStopwatch);
126 *gPolicyOrder = &mPolicyOrder;
138 mConfig->configGRP.solenoidBzNominalGPU = 0;
139 mTFSettings->hasSimStartOrbit = 1;
141 mTFSettings->simStartOrbit = hbfu.getFirstIRofTF(
o2::InteractionRecord(0, hbfu.orbitFirstSampled)).orbit;
143 *mConfParam = mConfig->ReadConfigurableParam();
145 if (mConfParam->display) {
147 mConfig->configProcessing.eventDisplay = mDisplayFrontend.get();
148 if (mConfig->configProcessing.eventDisplay !=
nullptr) {
149 LOG(info) <<
"Event display enabled";
151 throw std::runtime_error(
"GPU Event Display frontend could not be created!");
155 mConfig->configProcessing.doublePipeline = 1;
158 mAutoSolenoidBz = mConfParam->solenoidBzNominalGPU == -1e6f;
159 mAutoContinuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin < 0;
160 if (mAutoContinuousMaxTimeBin) {
163 if (mConfig->configProcessing.deviceNum == -2) {
166 mConfig->configProcessing.deviceNum = myId;
167 LOG(info) <<
"GPU device number selected from pipeline id: " << myId <<
" / " << idMax;
169 if (mConfig->configProcessing.debugLevel >= 3 && mVerbosity == 0) {
172 mConfig->configProcessing.runMC = mSpecConfig.
processMC;
174 if (!mSpecConfig.
processMC && !mConfig->configQA.clusterRejectionHistograms) {
175 throw std::runtime_error(
"Need MC information to create QA plots");
178 mConfig->configQA.noMC =
true;
180 mConfig->configQA.shipToQC =
true;
181 if (!mConfig->configProcessing.runQA) {
182 mConfig->configQA.enableLocalOutput =
false;
183 mQATaskMask = (mSpecConfig.
processMC ? 15 : 0) | (mConfig->configQA.clusterRejectionHistograms ? 32 : 0);
184 mConfig->configProcessing.runQA = -mQATaskMask;
187 mConfig->configInterface.outputToExternalBuffers =
true;
188 if (mConfParam->synchronousProcessing) {
189 mConfig->configReconstruction.useMatLUT =
false;
191 if (mConfig->configProcessing.rtc.optSpecialCode == -1) {
192 mConfig->configProcessing.rtc.optSpecialCode = mConfParam->synchronousProcessing;
201 mConfig->configWorkflow.steps.setBits(
GPUDataTypes::RecoStep::TPCdEdx, mConfParam->rundEdx == -1 ? !mConfParam->synchronousProcessing : mConfParam->rundEdx);
219 if (mTPCSectorMask != 0xFFFFFFFFF) {
220 throw std::invalid_argument(
"Cannot run TPC decompression with a sector mask");
233 mConfig->configProcessing.outputSharedClusterMap =
true;
236 mConfig->configProcessing.createO2Output = 0;
240 if (mConfParam->transformationFile.size() || mConfParam->transformationSCFile.size()) {
241 LOG(fatal) <<
"Deprecated configurable param options GPU_global.transformationFile or transformationSCFile used\n"
242 <<
"Instead, link the corresponding file as <somedir>/TPC/Calib/CorrectionMap/snapshot.root and use it via\n"
243 <<
"--condition-remap file://<somdir>=TPC/Calib/CorrectionMap option";
249 LOG(fatal) <<
"GPU two-threaded pipeline works only with TPC-only processing, and with ZS input";
253 mGPUReco = std::make_unique<GPUO2Interface>();
256 initFunctionTPCCalib(ic);
258 mConfig->configCalib.fastTransform = mCalibObjects.mFastTransformHelper->getCorrMap();
259 mConfig->configCalib.fastTransformRef = mCalibObjects.mFastTransformHelper->getCorrMapRef();
260 mConfig->configCalib.fastTransformMShape = mCalibObjects.mFastTransformHelper->getCorrMapMShape();
261 mConfig->configCalib.fastTransformHelper = mCalibObjects.mFastTransformHelper.get();
262 if (mConfig->configCalib.fastTransform ==
nullptr) {
263 throw std::invalid_argument(
"GPU workflow: initialization of the TPC transformation failed");
266 if (mConfParam->matLUTFile.size()) {
267 LOGP(info,
"Loading matlut file {}", mConfParam->matLUTFile.c_str());
269 if (mConfig->configCalib.matLUT ==
nullptr) {
270 LOGF(fatal,
"Error loading matlut file");
273 mConfig->configProcessing.lateO2MatLutProvisioningSize = 50 * 1024 * 1024;
277 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>();
278 mConfig->configCalib.trdGeometry = mTRDGeometry.get();
281 mConfig->configProcessing.willProvideO2PropagatorLate =
true;
282 mConfig->configProcessing.o2PropagatorUseGPUField =
true;
285 mConfig->configProcessing.printSettings =
true;
286 if (mConfParam->printSettings > 1) {
287 mConfig->PrintParam();
292 if (mGPUReco->Initialize(config) != 0) {
293 throw std::invalid_argument(
"GPU Reconstruction initialization failed");
296 mQA = std::make_unique<GPUO2InterfaceQA>(mConfig.get());
299 mGPUReco->setErrorCodeOutput(&mErrorQA);
310 if (mConfParam->dump >= 2) {
311 LOG(fatal) <<
"Cannot use dump-only mode with multi-threaded pipeline";
316 callbacks.
set<CallbackService::Id::RegionInfoCallback>([
this](fair::mq::RegionInfo
const& info) {
317 if (info.size == 0) {
321 mRegionInfos.emplace_back(info);
326 if (mConfParam->registerSelectedSegmentIds != -1 && info.managed && info.id != (uint32_t)mConfParam->registerSelectedSegmentIds) {
330 if (mConfParam->mutexMemReg) {
331 mode_t
mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
332 fd = open(
"/tmp/o2_gpu_memlock_mutex.lock", O_RDWR | O_CREAT | O_CLOEXEC,
mask);
334 throw std::runtime_error(
"Error opening memlock mutex lock file");
337 if (lockf(fd, F_LOCK, 0)) {
338 throw std::runtime_error(
"Error locking memlock mutex file");
341 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
342 if (mConfParam->benchmarkMemoryRegistration) {
343 start = std::chrono::high_resolution_clock::now();
345 if (mGPUReco->registerMemoryForGPU(info.ptr, info.size)) {
346 throw std::runtime_error(
"Error registering memory for GPU");
348 if (mConfParam->benchmarkMemoryRegistration) {
349 end = std::chrono::high_resolution_clock::now();
350 std::chrono::duration<double> elapsed_seconds =
end -
start;
351 LOG(info) <<
"Memory registration time (0x" << info.ptr <<
", " << info.size <<
" bytes): " << elapsed_seconds.count() <<
" s";
353 if (mConfParam->mutexMemReg) {
354 if (lockf(fd, F_ULOCK, 0)) {
355 throw std::runtime_error(
"Error unlocking memlock mutex file");
367 LOGF(info,
"GPU Reconstruction total timing: Cpu: %.3e Real: %.3e s in %d slots", mTimer->CpuTime(), mTimer->RealTime(), mTimer->Counter() - 1);
368 handlePipelineStop();
373 handlePipelineEndOfStream(ec);
379 finaliseCCDBTPC(matcher, obj);
381 finaliseCCDBITS(matcher, obj);
385 mGRPGeomUpdated =
true;
390template <
class D,
class E,
class F,
class G,
class H,
class I,
class J,
class K>
391void GPURecoWorkflowSpec::processInputs(
ProcessingContext& pc, D& tpcZSmeta, E& inputZS, F& tpcZS, G& tpcZSonTheFlySizes,
bool& debugTFDump, H& compClustersDummy, I& compClustersFlatDummy, J& pCompClustersFlat, K& tmpEmptyCompClusters)
402 tpcZSmeta.Pointers[
i][
j].clear();
403 tpcZSmeta.Sizes[
i][
j].clear();
408 tpcZSonTheFlySizes = {0};
411 bool recv =
false, recvsizes =
false;
414 throw std::runtime_error(
"Received multiple ZSSIZES data");
416 tpcZSonTheFlySizes = pc.
inputs().
get<std::array<uint32_t, NEndpoints * NSectors>>(
ref);
423 throw std::runtime_error(
"Received multiple TPCZS data");
425 inputZS = pc.
inputs().
get<gsl::span<o2::tpc::ZeroSuppressedContainer8kb>>(
ref);
428 if (!recv || !recvsizes) {
429 throw std::runtime_error(
"TPC ZS on the fly data not received");
434 uint32_t pageSector = 0;
435 for (uint32_t
j = 0;
j < NEndpoints;
j++) {
436 pageSector += tpcZSonTheFlySizes[
i * NEndpoints +
j];
437 offset += tpcZSonTheFlySizes[
i * NEndpoints +
j];
439 if (mVerbosity >= 1) {
440 LOG(info) <<
"GOT ZS on the fly pages FOR SECTOR " <<
i <<
" -> pages: " << pageSector;
446 auto isSameRdh = [](
const char*
left,
const char*
right) ->
bool {
447 return o2::raw::RDHUtils::getFEEID(
left) == o2::raw::RDHUtils::getFEEID(
right) && o2::raw::RDHUtils::getDetectorField(
left) == o2::raw::RDHUtils::getDetectorField(
right);
449 auto checkForZSData = [](
const char*
ptr, uint32_t subSpec) ->
bool {
450 const auto rdhLink = o2::raw::RDHUtils::getLinkID(
ptr);
451 const auto detField = o2::raw::RDHUtils::getDetectorField(
ptr);
452 const auto feeID = o2::raw::RDHUtils::getFEEID(
ptr);
453 const auto feeLinkID = o2::tpc::rdh_utils::getLink(feeID);
455 return detField ==
o2::tpc::raw_data_types::ZS && ((feeLinkID == o2::tpc::rdh_utils::UserLogicLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == 0)) ||
456 (feeLinkID == o2::tpc::rdh_utils::ILBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::ILBZSLinkID || rdhLink == 0)) ||
457 (feeLinkID == o2::tpc::rdh_utils::DLBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::DLBZSLinkID || rdhLink == 0)));
459 auto insertPages = [&tpcZSmeta, checkForZSData](
const char*
ptr,
size_t count, uint32_t subSpec) ->
void {
460 if (checkForZSData(
ptr, subSpec)) {
461 int32_t rawcru = o2::tpc::rdh_utils::getCRU(
ptr);
462 int32_t rawendpoint = o2::tpc::rdh_utils::getEndPoint(
ptr);
463 tpcZSmeta.Pointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(
ptr);
464 tpcZSmeta.Sizes[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(
count);
469 static uint32_t nErrors = 0;
471 if (nErrors == 1 || (nErrors < 100 && nErrors % 10 == 0) || nErrors % 1000 == 0 || mNTFs % 1000 == 0) {
472 LOG(error) <<
"DPLRawPageSequencer failed to process TPC raw data - data most likely not padded correctly - Using slow page scan instead (this alarm is downscaled from now on, so far " << nErrors <<
" of " << mNTFs <<
" TFs affected)";
476 int32_t totalCount = 0;
479 tpcZSmeta.Pointers2[
i][
j] = tpcZSmeta.Pointers[
i][
j].data();
480 tpcZSmeta.Sizes2[
i][
j] = tpcZSmeta.Sizes[
i][
j].data();
481 tpcZS.sector[
i].zsPtr[
j] = tpcZSmeta.Pointers2[
i][
j];
482 tpcZS.sector[
i].nZSPtr[
j] = tpcZSmeta.Sizes2[
i][
j];
483 tpcZS.sector[
i].count[
j] = tpcZSmeta.Pointers[
i][
j].size();
484 totalCount += tpcZSmeta.Pointers[
i][
j].size();
490 compClustersFlatDummy.setForward(&compClustersDummy);
491 pCompClustersFlat = &compClustersFlatDummy;
495 if (pCompClustersFlat ==
nullptr) {
502 LOGF(info,
"running tracking for sector(s) 0x%09x", mTPCSectorMask);
510 if (mConfParam->dump < 2) {
511 retVal = mGPUReco->RunTracking(ptrs, outputRegions, threadIndex, inputUpdateCallback);
514 retVal = runITSTracking(*pc);
519 mGPUReco->Clear(
false, threadIndex);
524void GPURecoWorkflowSpec::cleanOldCalibsTPCPtrs(calibObjectStruct& oldCalibObjects)
526 if (mOldCalibObjects.size() > 0) {
527 mOldCalibObjects.pop();
529 mOldCalibObjects.emplace(std::move(oldCalibObjects));
537 auto cput = mTimer->CpuTime();
538 auto realt = mTimer->RealTime();
539 mTimer->Start(
false);
542 std::vector<gsl::span<const char>>
inputs;
550 std::array<uint32_t, NEndpoints * NSectors> tpcZSonTheFlySizes;
551 gsl::span<const o2::tpc::ZeroSuppressedContainer8kb> inputZS;
552 std::unique_ptr<char[]> tmpEmptyCompClusters;
554 bool getWorkflowTPCInput_clusters =
false, getWorkflowTPCInput_mc =
false, getWorkflowTPCInput_digits =
false;
555 bool debugTFDump =
false;
558 getWorkflowTPCInput_mc =
true;
561 getWorkflowTPCInput_clusters =
true;
564 getWorkflowTPCInput_digits =
true;
569 auto lockDecodeInput = std::make_unique<std::lock_guard<std::mutex>>(mPipeline->mutexDecodeInput);
577 LOG(fatal) <<
"configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
582 processInputs(pc, tpcZSmeta, inputZS, tpcZS, tpcZSonTheFlySizes, debugTFDump, compClustersDummy, compClustersFlatDummy, pCompClustersFlat, tmpEmptyCompClusters);
583 const auto& inputsClustersDigits = o2::tpc::getWorkflowTPCInput(pc, mVerbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, mTPCSectorMask, getWorkflowTPCInput_digits);
586 mTFSettings->tfStartOrbit = tinfo.firstTForbit;
587 mTFSettings->hasTfStartOrbit = 1;
588 mTFSettings->hasNHBFPerTF = 1;
590 mTFSettings->hasRunStartOrbit = 0;
595 LOG(info) <<
"TF firstTForbit " << mTFSettings->tfStartOrbit <<
" nHBF " << mTFSettings->nHBFPerTF <<
" runStartOrbit " << mTFSettings->runStartOrbit <<
" simStartOrbit " << mTFSettings->simStartOrbit;
597 if (mConfParam->checkFirstTfOrbit) {
598 static uint32_t lastFirstTFOrbit = -1;
599 static uint32_t lastTFCounter = -1;
600 if (lastFirstTFOrbit != -1 && lastTFCounter != -1) {
601 int32_t diffOrbit = tinfo.firstTForbit - lastFirstTFOrbit;
602 int32_t diffCounter = tinfo.tfCounter - lastTFCounter;
603 if (diffOrbit != diffCounter * mTFSettings->nHBFPerTF) {
604 LOG(error) <<
"Time frame has mismatching firstTfOrbit - Last orbit/counter: " << lastFirstTFOrbit <<
" " << lastTFCounter <<
" - Current: " << tinfo.firstTForbit <<
" " << tinfo.tfCounter;
607 lastFirstTFOrbit = tinfo.firstTForbit;
608 lastTFCounter = tinfo.tfCounter;
621 void* ptrEp[NSectors * NEndpoints] = {};
622 bool doInputDigits =
false, doInputDigitsMC =
false;
626 const uint64_t*
buffer =
reinterpret_cast<const uint64_t*
>(&inputZS[0]);
629 doInputDigits = doInputDigitsMC = mSpecConfig.
processMC;
633 throw std::runtime_error(
"Cannot process MC information, none available");
636 doInputDigits =
true;
642 if (mTPCSectorMask != 0xFFFFFFFFF) {
644 for (uint32_t
i = 0;
i < NSectors;
i++) {
645 if (!(mTPCSectorMask & (1ul <<
i))) {
661 if (doInputDigitsMC) {
664 for (uint32_t
i = 0;
i < NSectors;
i++) {
665 tpcDigitsMap.
tpcDigits[
i] = inputsClustersDigits->inputDigits[
i].data();
666 tpcDigitsMap.
nTPCDigits[
i] = inputsClustersDigits->inputDigits[
i].size();
667 if (doInputDigitsMC) {
668 tpcDigitsMapMC.
v[
i] = inputsClustersDigits->inputDigitsMCPtrs[
i];
674 if (mClusterOutputIds.size() > 0) {
675 clusterOutputSectorHeader.
sectorBits = mTPCSectorMask;
677 clusterOutputSectorHeader.activeSectors = mTPCSectorMask;
682 std::unique_ptr<GPURecoWorkflow_QueueObject> pipelineContext;
684 if (handlePipeline(pc, ptrs, tpcZSmeta, tpcZS, pipelineContext)) {
692 using outputDataType =
char;
694 using outputBufferType = std::pair<std::optional<std::reference_wrapper<outputBufferUninitializedVector>>, outputDataType*>;
696 std::unordered_set<std::string> outputsCreated;
698 auto setOutputAllocator = [
this, &outputBuffers, &outputRegions, &pc, &outputsCreated](
const char*
name,
bool condition,
GPUOutputControl& region,
auto&& outputSpec,
size_t offset = 0) {
701 if (mConfParam->allocateOutputOnTheFly) {
702 region.allocator = [
this,
name, &
buffer, &pc, outputSpec = std::move(outputSpec),
offset, &outputsCreated](
size_t size) ->
void* {
705 LOG(info) <<
"ALLOCATING " <<
size <<
" bytes for " <<
name <<
": " << std::get<DataOrigin>(outputSpec).template as<std::string>() <<
"/" << std::get<DataDescription>(outputSpec).template as<std::string>() <<
"/" << std::get<2>(outputSpec);
707 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
709 start = std::chrono::high_resolution_clock::now();
712 outputsCreated.insert(
name);
714 end = std::chrono::high_resolution_clock::now();
715 std::chrono::duration<double> elapsed_seconds =
end -
start;
716 LOG(info) <<
"Allocation time for " <<
name <<
" (" <<
size <<
" bytes)"
717 <<
": " << elapsed_seconds.count() <<
"s";
725 outputsCreated.insert(
name);
730 auto downSizeBuffer = [](outputBufferType&
buffer,
size_t size) {
735 throw std::runtime_error(
"Invalid buffer size requested");
739 throw std::runtime_error(
"Inconsistent buffer address after downsize");
748 auto downSizeBufferToSpan = [&outputBuffers, &outputRegions, &downSizeBuffer](
GPUOutputControl& region,
auto span) {
753 if (span.size() &&
buffer.second != (
char*)span.data()) {
754 throw std::runtime_error(
"Buffer does not match span");
756 downSizeBuffer(
buffer, span.size() *
sizeof(*span.data()));
775 throw std::runtime_error(
"Invalid input for gpu tracking");
780 calibObjectStruct oldCalibObjects;
781 doCalibUpdates(pc, oldCalibObjects);
783 lockDecodeInput.reset();
785 uint32_t threadIndex;
786 if (mConfParam->dump) {
788 while (pipelineContext->jobThreadIndex == -1) {
790 threadIndex = pipelineContext->jobThreadIndex;
795 std::string dir =
"";
796 if (mConfParam->dumpFolder !=
"") {
797 dir = std::regex_replace(mConfParam->dumpFolder, std::regex(
"\\[P\\]"),
std::to_string(getpid()));
799 mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
804 mGPUReco->DumpSettings(threadIndex, dir.c_str());
806 if (tinfo.tfCounter >= mConfParam->dumpFirst && (mConfParam->dumpLast == -1 || tinfo.tfCounter <= mConfParam->dumpLast)) {
807 mGPUReco->DumpEvent(mNTFDumps, &ptrs, threadIndex, dir.c_str());
811 std::unique_ptr<GPUTrackingInOutPointers> ptrsDump;
812 if (mConfParam->dumpBadTFMode == 2) {
814 memcpy((
void*)ptrsDump.get(), (
const void*)&ptrs,
sizeof(ptrs));
819 if (!pipelineContext->jobSubmitted) {
820 enqueuePipelinedJob(&ptrs, &outputRegions, pipelineContext.get(),
true);
822 finalizeInputPipelinedJob(&ptrs, &outputRegions, pipelineContext.get());
824 std::unique_lock lk(pipelineContext->jobFinishedMutex);
825 pipelineContext->jobFinishedNotify.wait(lk, [context = pipelineContext.get()]() { return context->jobFinished; });
826 retVal = pipelineContext->jobReturnValue;
827 threadIndex = pipelineContext->jobThreadIndex;
830 threadIndex = mNextThreadIndex;
831 if (mConfig->configProcessing.doublePipeline) {
832 mNextThreadIndex = (mNextThreadIndex + 1) % 2;
835 retVal = runMain(&pc, &ptrs, &outputRegions, threadIndex);
840 cleanOldCalibsTPCPtrs(oldCalibObjects);
842 o2::utils::DebugStreamer::instance()->flush();
844 if (debugTFDump && mNDebugDumps < mConfParam->dumpBadTFs) {
846 if (mConfParam->dumpBadTFMode <= 1) {
848 FILE* fp = fopen(
filename.c_str(),
"w+b");
852 if (mConfParam->dumpBadTFMode == 1) {
856 fwrite(
data.data(), 1,
data.size(), fp);
859 }
else if (mConfParam->dumpBadTFMode == 2) {
860 mGPUReco->DumpEvent(mNDebugDumps - 1, ptrsDump.get(), threadIndex);
864 if (mConfParam->dump == 2) {
870 if (mConfig->configProcessing.tpcWriteClustersAfterRejection) {
873 bool createEmptyOutput =
false;
875 if (
retVal == 3 && mConfig->configProcessing.ignoreNonFatalGPUErrors) {
876 if (mConfig->configProcessing.throttleAlarms) {
877 LOG(warning) <<
"GPU Reconstruction aborted with non fatal error code, ignoring";
879 LOG(alarm) <<
"GPU Reconstruction aborted with non fatal error code, ignoring";
881 createEmptyOutput = !mConfParam->partialOutputForNonFatalErrors;
883 LOG(fatal) <<
"GPU Reconstruction aborted with error code " <<
retVal <<
" - errors are not ignored - terminating";
887 std::unique_ptr<o2::tpc::ClusterNativeAccess> tmpEmptyClNative;
888 if (createEmptyOutput) {
889 memset(&ptrs, 0,
sizeof(ptrs));
890 for (uint32_t
i = 0;
i < outputRegions.
count();
i++) {
891 if (outputBuffers[
i].
first) {
898 outputBuffers[
i].first->get().resize(toSize);
899 outputBuffers[
i].second = outputBuffers[
i].first->get().data();
901 memset(outputBuffers[
i].second, 0, toSize);
905 tmpEmptyClNative = std::make_unique<o2::tpc::ClusterNativeAccess>();
906 memset(tmpEmptyClNative.get(), 0,
sizeof(*tmpEmptyClNative));
911 clustersMCBuffer.second = clustersMCBuffer.first;
912 tmpEmptyClNative->clustersMCTruth = &clustersMCBuffer.second;
918 if (!mConfParam->allocateOutputOnTheFly) {
919 for (uint32_t
i = 0;
i < outputRegions.
count();
i++) {
922 throw std::runtime_error(
"Preallocated buffer size exceeded");
925 downSizeBuffer(outputBuffers[
i], (
char*)outputRegions.
asArray()[
i].
ptrCurrent - (
char*)outputBuffers[
i].second);
929 downSizeBufferToSpan(outputRegions.
tpcTracksO2, spanOutputTracks);
935 doTrackTuneTPC(ptrs, outputBuffers[outputRegions.
getIndex(outputRegions.
tpcTracksO2)].first->get().data());
939 throw std::runtime_error(
"cluster native output ptrs out of sync");
952 if (mClusterOutputIds.size() > 0) {
956 for (uint32_t
i = 0;
i < NSectors;
i++) {
957 if (mTPCSectorMask & (1ul <<
i)) {
959 clusterOutputSectorHeader.sectorBits = (1ul <<
i);
962 memset(outIndex, 0,
sizeof(*outIndex));
995 auto getoutput = [sendQAOutput](
auto ptr) {
return sendQAOutput &&
ptr ? *
ptr : std::decay_t<decltype(*ptr)>(); };
996 std::vector<TH1F> copy1 = getoutput(outputRegions.
qa.
hist1);
997 std::vector<TH2F> copy2 = getoutput(outputRegions.
qa.
hist2);
998 std::vector<TH1D> copy3 = getoutput(outputRegions.
qa.
hist3);
999 std::vector<TGraphAsymmErrors> copy4 = getoutput(outputRegions.
qa.
hist4);
1001 mQA->postprocessExternal(copy1, copy2, copy3, copy4, out, mQATaskMask ? mQATaskMask : -1);
1019 LOG(info) <<
"GPU Reconstruction time for this TF " << mTimer->CpuTime() - cput <<
" s (cpu), " << mTimer->RealTime() - realt <<
" s (wall)";
1027 bool needCalibUpdate =
false;
1028 if (mGRPGeomUpdated) {
1029 mGRPGeomUpdated =
false;
1030 needCalibUpdate =
true;
1035 mITSGeometryCreated =
true;
1038 if (mAutoSolenoidBz) {
1044 if (mAutoContinuousMaxTimeBin) {
1047 LOG(info) <<
"Updating max time bin " << newCalibValues.
continuousMaxTimeBin <<
" (" << mTFSettings->nHBFPerTF <<
" orbits)";
1050 if (!mPropagatorInstanceCreated) {
1052 if (mConfig->configProcessing.o2PropagatorUseGPUField) {
1055 mPropagatorInstanceCreated =
true;
1058 if (!mMatLUTCreated) {
1059 if (mConfParam->matLUTFile.size() == 0) {
1061 LOG(info) <<
"Loaded material budget lookup table";
1063 mMatLUTCreated =
true;
1067 gm->createPadPlaneArray();
1068 gm->createClusterMatrixArray();
1069 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>(*gm);
1070 newCalibObjects.
trdGeometry = mConfig->configCalib.trdGeometry = mTRDGeometry.get();
1071 LOG(info) <<
"Loaded TRD geometry";
1072 mTRDGeometryCreated =
true;
1075 needCalibUpdate = fetchCalibsCCDBTPC(pc, newCalibObjects, oldCalibObjects) || needCalibUpdate;
1077 needCalibUpdate = fetchCalibsCCDBITS(pc) || needCalibUpdate;
1079 if (mTPCCutAtTimeBin != mConfig->configGRP.tpcCutTimeBin) {
1081 newCalibValues.
tpcTimeBinCut = mConfig->configGRP.tpcCutTimeBin = mTPCCutAtTimeBin;
1082 needCalibUpdate =
true;
1086 std::ofstream out(
path, std::ios::binary | std::ios::trunc);
1087 if (!out.is_open()) {
1088 throw std::runtime_error(
"Failed to open output file: " +
path);
1091 out.write(
buffer,
static_cast<std::streamsize
>(validSize));
1093 throw std::runtime_error(
"Failed while writing data to: " +
path);
1096 for (
int i = 0;
i < 3;
i++) {
1101 LOG(info) <<
"Dumped TPC clusterizer NN " <<
i <<
" to file " <<
path;
1105 if (needCalibUpdate) {
1106 LOG(info) <<
"Updating GPUReconstruction calibration objects";
1107 mGPUReco->UpdateCalibration(newCalibObjects, newCalibValues);
1116 char* o2jobid = getenv(
"O2JOBID");
1117 char* numaid = getenv(
"NUMAID");
1118 int32_t chanid = o2jobid ? atoi(o2jobid) : (numaid ? atoi(numaid) : 0);
1119 std::string chan = std::string(
"name=gpu-prepare-channel,type=") + (send ?
"push" :
"pull") +
",method=" + (send ?
"connect" :
"bind") +
",address=ipc://@gpu-prepare-channel-" +
std::to_string(chanid) +
"-{timeslice0},transport=shmem,rateLogging=0";
1139 inputs.emplace_back(
"stdDist",
"FLP",
"DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
1144 LOG(fatal) <<
"Double pipeline mode can only work with zsraw input";
1148 inputs.emplace_back(
"pipelineprepare",
gDataOriginGPU,
"PIPELINEPREPARE", 0, Lifetime::Timeframe);
1158 if (mapSources != 0) {
1179 mCalibObjects.mFastTransformHelper->requestCCDBInputs(
inputs, optsDummy, gloOpts);
1217 inputs.emplace_back(
"compClusters",
"ITS",
"COMPCLUSTERS", 0, Lifetime::Timeframe);
1218 inputs.emplace_back(
"patterns",
"ITS",
"PATTERNS", 0, Lifetime::Timeframe);
1219 inputs.emplace_back(
"ROframes",
"ITS",
"CLUSTERSROF", 0, Lifetime::Timeframe);
1221 inputs.emplace_back(
"phystrig",
"ITS",
"PHYSTRIG", 0, Lifetime::Timeframe);
1223 inputs.emplace_back(
"phystrig",
"TRD",
"TRKTRGRD", 0, Lifetime::Timeframe);
1226 if (mSpecConfig.
isITS3) {
1227 inputs.emplace_back(
"cldict",
"IT3",
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(
"IT3/Calib/ClusterDictionary"));
1228 inputs.emplace_back(
"alppar",
"ITS",
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Config/AlpideParam"));
1230 inputs.emplace_back(
"itscldict",
"ITS",
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Calib/ClusterDictionary"));
1231 inputs.emplace_back(
"itsalppar",
"ITS",
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Config/AlpideParam"));
1234 inputs.emplace_back(
"meanvtx",
"GLO",
"MEANVERTEX", 0, Lifetime::Condition,
ccdbParamSpec(
"GLO/Calib/MeanVertex", {}, 1));
1238 inputs.emplace_back(
"itsmclabels",
"ITS",
"CLUSTERSMCTR", 0, Lifetime::Timeframe);
1239 inputs.emplace_back(
"ITSMC2ROframes",
"ITS",
"CLUSTERSMC2ROF", 0, Lifetime::Timeframe);
1244 *mConfParam = mConfig->ReadConfigurableParam();
1245 if (mConfig->configProcessing.nn.nnLoadFromCCDB) {
1247 LOG(info) <<
"(NN CLUS) Enabling fetching of TPC NN clusterizer from CCDB";
1249 mSpecConfig.
nnDumpToFile = mConfig->configProcessing.nn.nnCCDBDumpToFile;
1250 GPUSettingsProcessingNNclusterizer& nnClusterizerSettings = mConfig->configProcessing.nn;
1252 std::map<std::string, std::string> metadata;
1253 metadata[
"inputDType"] = nnClusterizerSettings.nnInferenceInputDType;
1254 metadata[
"outputDType"] = nnClusterizerSettings.nnInferenceOutputDType;
1255 metadata[
"nnCCDBWithMomentum"] = nnClusterizerSettings.nnCCDBWithMomentum;
1256 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBClassificationLayerType;
1257 metadata[
"nnCCDBInteractionRate"] = nnClusterizerSettings.nnCCDBInteractionRate;
1258 metadata[
"nnCCDBBeamType"] = nnClusterizerSettings.nnCCDBBeamType;
1260 auto convert_map_to_metadata = [](
const std::map<std::string, std::string>& inputMap, std::vector<o2::framework::CCDBMetadata>& outputMetadata) {
1261 for (
const auto& [
key,
value] : inputMap) {
1263 outputMetadata.push_back({
key,
value});
1269 std::vector<o2::framework::CCDBMetadata> ccdb_metadata;
1271 if (mConfParam->printSettings) {
1272 auto printSettings = [](
const std::map<std::string, std::string>& settings) {
1273 LOG(info) <<
"(NN CLUS) NN Clusterizer CCDB settings:";
1274 for (
const auto& [
key,
value] : settings) {
1278 printSettings(metadata);
1282 metadata[
"nnCCDBEvalType"] =
"classification_c1";
1283 convert_map_to_metadata(metadata, ccdb_metadata);
1284 inputs.emplace_back(
"nn_classification_c1",
gDataOriginTPC,
"NNCLUSTERIZER_C1", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1285 }
else if (mSpecConfig.
nnEvalMode[0] ==
"c2") {
1286 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1287 metadata[
"nnCCDBEvalType"] =
"classification_c2";
1288 convert_map_to_metadata(metadata, ccdb_metadata);
1289 inputs.emplace_back(
"nn_classification_c2",
gDataOriginTPC,
"NNCLUSTERIZER_C2", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1292 metadata[
"nnCCDBEvalType"] =
"regression_c1";
1293 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1294 convert_map_to_metadata(metadata, ccdb_metadata);
1295 inputs.emplace_back(
"nn_regression_c1",
gDataOriginTPC,
"NNCLUSTERIZER_R1", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1298 metadata[
"nnCCDBEvalType"] =
"regression_c2";
1299 convert_map_to_metadata(metadata, ccdb_metadata);
1300 inputs.emplace_back(
"nn_regression_c2",
gDataOriginTPC,
"NNCLUSTERIZER_R2", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1310 std::vector<OutputSpec> outputSpecs;
1312 outputSpecs.emplace_back(
gDataOriginGPU,
"PIPELINEPREPARE", 0, Lifetime::Timeframe);
1323 outputSpecs.emplace_back(
gDataOriginTPC,
"COMPCLUSTERS", 0, Lifetime::Timeframe);
1326 outputSpecs.emplace_back(
gDataOriginTPC,
"COMPCLUSTERSFLAT", 0, Lifetime::Timeframe);
1329 for (
auto const& sector : mTPCSectors) {
1330 mClusterOutputIds.emplace_back(sector);
1333 outputSpecs.emplace_back(
gDataOriginTPC,
"CLUSTERNATIVETMP", NSectors, Lifetime::Timeframe);
1334 for (
const auto sector : mTPCSectors) {
1342 for (
const auto sector : mTPCSectors) {
1351 outputSpecs.emplace_back(
gDataOriginTPC,
"CLSHAREDMAP", 0, Lifetime::Timeframe);
1352 outputSpecs.emplace_back(
gDataOriginTPC,
"TPCOCCUPANCYMAP", 0, Lifetime::Timeframe);
1355 outputSpecs.emplace_back(
gDataOriginTPC,
"TRIGGERWORDS", 0, Lifetime::Timeframe);
1358 outputSpecs.emplace_back(
gDataOriginTPC,
"TRACKINGQA", 0, Lifetime::Timeframe);
1361 outputSpecs.emplace_back(
gDataOriginGPU,
"ERRORQA", 0, Lifetime::Timeframe);
1365 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKS", 0, Lifetime::Timeframe);
1366 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKCLSID", 0, Lifetime::Timeframe);
1367 outputSpecs.emplace_back(
gDataOriginITS,
"ITSTrackROF", 0, Lifetime::Timeframe);
1368 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICES", 0, Lifetime::Timeframe);
1369 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESROF", 0, Lifetime::Timeframe);
1370 outputSpecs.emplace_back(
gDataOriginITS,
"IRFRAMES", 0, Lifetime::Timeframe);
1373 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESMCTR", 0, Lifetime::Timeframe);
1374 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESMCPUR", 0, Lifetime::Timeframe);
1375 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKSMCTR", 0, Lifetime::Timeframe);
1376 outputSpecs.emplace_back(
gDataOriginITS,
"ITSTrackMC2ROF", 0, Lifetime::Timeframe);
1387 mDisplayFrontend.reset(
nullptr);
1388 mGPUReco.reset(
nullptr);
std::vector< std::string > labels
Simple interface to the CDB manager.
Definition of container class for dE/dx corrections.
void dumpToFile(std::string fileName, const CathodeSegmentation &seg, const std::vector< Point > &points)
Class of a TPC cluster in TPC-native coordinates (row, time)
Container to store compressed TPC cluster data.
A const (ready only) version of MCTruthContainer.
Helper class to access correction maps.
Helper class to access load maps from CCDB.
A parser and sequencer utility for raw pages within DPL input.
A raw page parser for DPL input.
Definition of class for writing debug informations.
Definition of the GeometryManager class.
Helper for geometry and GRP related CCDB requests.
Definition of the GeometryTGeo class.
Declarations for the wrapper for the set of cylindrical material layers.
Definition of the Names Generator class.
Class to serialize ONNX objects for ROOT snapshots of CCDB objects at runtime.
Utilities for parsing of data sequences.
Type wrappers for enfording a specific serialization method.
Wrapper class for TPC CA Tracker algorithm.
Configurable params for tracks ad hoc tuning.
Helper class to extract VDrift from different sources.
Helper class to obtain TPC clusters / digits / labels from DPL.
Definitions of TPC Zero Suppression Data Headers.
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static MatLayerCylSet * loadFromFile(const std::string &inpFName="matbud.root")
GPUd() value_type estimateLTFast(o2 static GPUd() float estimateLTIncrement(const o2 PropagatorImpl * Instance(bool uninitialized=false)
static const HBFUtils & Instance()
This utility handles transparently the DPL inputs and triggers a customizable action on sequences of ...
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static GPUDisplayFrontendInterface * getFrontend(const char *type)
static uint32_t getTpcMaxTimeBinFromNHbf(uint32_t nHbf)
static float getNominalGPUBz(T &src)
o2::framework::Outputs outputs()
std::vector< framework::InputSpec > CompletionPolicyData
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
o2::framework::Inputs inputs()
void run(o2::framework::ProcessingContext &pc) final
void stop() final
This is invoked on stop.
~GPURecoWorkflowSpec() override
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
GPURecoWorkflowSpec(CompletionPolicyData *policyData, Config const &specconfig, std::vector< int32_t > const &tpcsectors, uint64_t tpcSectorMask, std::shared_ptr< o2::base::GRPGeomRequest > &ggr, std::function< bool(o2::framework::DataProcessingHeader::StartTime)> **gPolicyOrder=nullptr)
o2::framework::Options options()
static void RunZSEncoderCreateMeta(const uint64_t *buffer, const uint32_t *sizes, void **ptrs, GPUTrackingInOutZS *out)
static GeometryTGeo * Instance()
void fillMatrixCache(int mask) override
ClusterNativeAccess::ConstMCLabelContainerViewWithBuffer ConstMCLabelContainerViewWithBuffer
static void addOptions(std::vector< o2::framework::ConfigParamSpec > &options)
static constexpr int MAXSECTOR
static precheckModifiedData runPrecheck(o2::gpu::GPUTrackingInOutPointers *ptrs, o2::gpu::GPUO2InterfaceConfiguration *config)
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
static Geometry * instance()
GLuint const GLchar * name
GLsizei const GLfloat * value
GLuint GLsizei const GLchar * label
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
GLsizei const GLchar *const * path
constexpr o2::header::DataOrigin gDataOriginTPC
constexpr o2::header::DataOrigin gDataOriginTRD
constexpr o2::header::DataOrigin gDataOriginITS
constexpr o2::header::DataOrigin gDataOriginGPU
Defining PrimaryVertex explicitly as messageable.
o2::header::DataDescription DataDescription
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
constexpr int MAXGLOBALPADROW
@ ZS
final Zero Suppression (can be ILBZS, DLBZS)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
@ FEEConfig
use fee config
@ IDCPadStatus
use idc pad status map
@ CalIDCPadStatusMapA
Status map of the pads (dead etc.) obtained from CalIDC0.
@ CalPadGainFull
Full pad gain calibration.
@ CalPadGainResidual
Residual pad gain calibration (e.g. from tracks)
@ CalTimeGain
Gain variation over time.
@ CalTimeGainMC
Gain variation over time for MC.
@ AltroSyncSignal
timing of Altro chip sync. signal
auto getRecoInputContainer(o2::framework::ProcessingContext &pc, o2::gpu::GPUTrackingInOutPointers *ptrs, const o2::globaltracking::RecoContainer *inputTracks, bool mc=false)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
size_t inputTimesliceId
The time pipelining id of this particular device.
void requestTracks(o2::dataformats::GlobalTrackID::mask_t src, bool mc)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
S< o2::trd::GeometryFlat >::type * trdGeometry
S< o2::base::PropagatorImpl< float > >::type * o2Propagator
S< o2::base::MatLayerCylSet >::type * matLUT
S< o2::tpc::ORTRootSerializer >::type * nnClusterizerNetworks[3]
const std::vector< TH1F > * hist1
const std::vector< TGraphAsymmErrors > * hist4
const std::vector< TH1D > * hist3
const std::vector< TH2F > * hist2
bool newContinuousMaxTimeBin
uint32_t continuousMaxTimeBin
GPUSettingsProcessing configProcessing
std::function< void *(size_t)> allocator
std::vector< std::string > nnEvalMode
int32_t tpcDeadMapSources
bool decompressTPCFromROOT
bool outputCompClustersRoot
bool sendClustersPerSector
int32_t enableDoublePipeline
bool outputSharedClusterMap
bool useFilteredOutputSpecs
bool outputCompClustersFlat
const o2::tpc::Digit * tpcDigits[NSECTORS]
size_t nTPCDigits[NSECTORS]
const GPUTPCDigitsMCInput * tpcDigitsMC
const o2::tpc::ClusterNativeAccess * clustersNative
const o2::tpc::CompressedClustersFlat * tpcCompressedClusters
const uint32_t * outputClusRefsTPCO2
const GPUSettingsTF * settingsTF
const GPUTrackingInOutZS * tpcZS
const o2::MCCompLabel * outputTracksTPCO2MC
uint32_t nOutputTracksTPCO2
const o2::tpc::ClusterNativeAccess * clustersNativeReduced
uint32_t nOutputClusRefsTPCO2
const o2::tpc::TrackTPC * outputTracksTPCO2
const GPUTrackingInOutDigits * tpcPackedDigits
const void *const * zsPtr[NENDPOINTS]
uint32_t count[NENDPOINTS]
const uint32_t * nZSPtr[NENDPOINTS]
GPUTrackingInOutZSSector sector[NSECTORS]
static constexpr uint32_t NSECTORS
static constexpr uint32_t NENDPOINTS
GPUOutputControl tpcTracksO2
GPUOutputControl clustersNative
GPUOutputControl tpcOccupancyMap
GPUOutputControl * asArray()
GPUOutputControl tpcTracksO2Labels
GPUOutputControl tpcTracksO2ClusRefs
size_t getIndex(const GPUOutputControl &v)
static constexpr size_t count()
GPUOutputControl sharedClusterMap
GPUOutputControl compressedClusters
GPUOutputControl clusterLabels
GPUOutputControl tpcTriggerWords
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClustersSector[constants::MAXSECTOR]
const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > * clustersMCTruth
const ClusterNative * clusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int clusterOffset[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
const ClusterNative * clustersLinear
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"