85#include <TStopwatch.h>
90#include <TGraphAsymmErrors.h>
102#include <unordered_set>
114GPURecoWorkflowSpec::GPURecoWorkflowSpec(
GPURecoWorkflowSpec::CompletionPolicyData* policyData,
Config const& specconfig, std::vector<int32_t>
const& tpcsectors, uint64_t tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<
bool(
o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) :
o2::
framework::
Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
117 throw std::runtime_error(
"inconsistent configuration: cluster output is only possible if CA clusterer or CompCluster decompression is activated");
121 mConfParam.reset(
new GPUSettingsO2);
123 mTimer.reset(
new TStopwatch);
127 *gPolicyOrder = &mPolicyOrder;
139 mConfig->configGRP.solenoidBzNominalGPU = 0;
140 mTFSettings->hasSimStartOrbit = 1;
142 mTFSettings->simStartOrbit = hbfu.getFirstIRofTF(
o2::InteractionRecord(0, hbfu.orbitFirstSampled)).orbit;
144 *mConfParam = mConfig->ReadConfigurableParam();
146 if (mConfParam->display) {
148 mConfig->configProcessing.eventDisplay = mDisplayFrontend.get();
149 if (mConfig->configProcessing.eventDisplay !=
nullptr) {
150 LOG(info) <<
"Event display enabled";
152 throw std::runtime_error(
"GPU Event Display frontend could not be created!");
156 mConfig->configProcessing.doublePipeline = 1;
159 mAutoSolenoidBz = mConfParam->solenoidBzNominalGPU == -1e6f;
160 mAutoContinuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin < 0;
161 if (mAutoContinuousMaxTimeBin) {
164 if (mConfig->configProcessing.deviceNum == -2) {
167 mConfig->configProcessing.deviceNum = myId;
168 LOG(info) <<
"GPU device number selected from pipeline id: " << myId <<
" / " << idMax;
170 if (mConfig->configProcessing.debugLevel >= 3 && mVerbosity == 0) {
173 mConfig->configProcessing.runMC = mSpecConfig.
processMC;
175 if (!mSpecConfig.
processMC && !mConfig->configQA.clusterRejectionHistograms) {
176 throw std::runtime_error(
"Need MC information to create QA plots");
179 mConfig->configQA.noMC =
true;
181 mConfig->configQA.shipToQC =
true;
182 if (!mConfig->configProcessing.runQA) {
183 mConfig->configQA.enableLocalOutput =
false;
185 mConfig->configProcessing.runQA = -mQATaskMask;
188 mConfig->configInterface.outputToExternalBuffers =
true;
190 GPUO2Interface::ApplySyncSettings(mConfig->configProcessing, mConfig->configReconstruction, mConfig->configWorkflow.steps, mConfParam->synchronousProcessing, runTracking ? mConfParam->rundEdx : -2);
215 if (mTPCSectorMask != 0xFFFFFFFFF) {
216 throw std::invalid_argument(
"Cannot run TPC decompression with a sector mask");
229 mConfig->configProcessing.outputSharedClusterMap =
true;
232 mConfig->configProcessing.createO2Output = 0;
236 if (mConfParam->transformationFile.size() || mConfParam->transformationSCFile.size()) {
237 LOG(fatal) <<
"Deprecated configurable param options GPU_global.transformationFile or transformationSCFile used\n"
238 <<
"Instead, link the corresponding file as <somedir>/TPC/Calib/CorrectionMap/snapshot.root and use it via\n"
239 <<
"--condition-remap file://<somdir>=TPC/Calib/CorrectionMap option";
245 LOG(fatal) <<
"GPU two-threaded pipeline works only with TPC-only processing, and with ZS input";
249 mGPUReco = std::make_unique<GPUO2Interface>();
252 initFunctionTPCCalib(ic);
254 mConfig->configCalib.fastTransform = mCalibObjects.mFastTransformHelper->getCorrMap();
255 mConfig->configCalib.fastTransformRef = mCalibObjects.mFastTransformHelper->getCorrMapRef();
256 mConfig->configCalib.fastTransformMShape = mCalibObjects.mFastTransformHelper->getCorrMapMShape();
257 mConfig->configCalib.fastTransformHelper = mCalibObjects.mFastTransformHelper.get();
258 if (mConfig->configCalib.fastTransform ==
nullptr) {
259 throw std::invalid_argument(
"GPU workflow: initialization of the TPC transformation failed");
262 if (mConfParam->matLUTFile.size()) {
263 LOGP(info,
"Loading matlut file {}", mConfParam->matLUTFile.c_str());
265 if (mConfig->configCalib.matLUT ==
nullptr) {
266 LOGF(fatal,
"Error loading matlut file");
269 mConfig->configProcessing.lateO2MatLutProvisioningSize = 50 * 1024 * 1024;
273 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>();
274 mConfig->configCalib.trdGeometry = mTRDGeometry.get();
277 mConfig->configProcessing.willProvideO2PropagatorLate =
true;
278 mConfig->configProcessing.o2PropagatorUseGPUField =
true;
281 mConfig->configProcessing.printSettings =
true;
282 if (mConfParam->printSettings > 1) {
283 mConfig->PrintParam();
288 if (mGPUReco->Initialize(config) != 0) {
289 throw std::invalid_argument(
"GPU Reconstruction initialization failed");
292 mQA = std::make_unique<GPUO2InterfaceQA>(mConfig.get());
295 mGPUReco->setErrorCodeOutput(&mErrorQA);
306 if (mConfParam->dump >= 2) {
307 LOG(fatal) <<
"Cannot use dump-only mode with multi-threaded pipeline";
312 callbacks.
set<CallbackService::Id::RegionInfoCallback>([
this](fair::mq::RegionInfo
const& info) {
313 if (info.size == 0) {
317 mRegionInfos.emplace_back(info);
322 if (mConfParam->registerSelectedSegmentIds != -1 && info.managed && info.id != (uint32_t)mConfParam->registerSelectedSegmentIds) {
326 if (mConfParam->mutexMemReg) {
327 mode_t
mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
328 fd = open(
"/tmp/o2_gpu_memlock_mutex.lock", O_RDWR | O_CREAT | O_CLOEXEC,
mask);
330 throw std::runtime_error(
"Error opening memlock mutex lock file");
333 if (lockf(fd, F_LOCK, 0)) {
334 throw std::runtime_error(
"Error locking memlock mutex file");
337 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
338 if (mConfParam->benchmarkMemoryRegistration) {
339 start = std::chrono::high_resolution_clock::now();
341 if (mGPUReco->registerMemoryForGPU(info.ptr, info.size)) {
342 throw std::runtime_error(
"Error registering memory for GPU");
344 if (mConfParam->benchmarkMemoryRegistration) {
345 end = std::chrono::high_resolution_clock::now();
346 std::chrono::duration<double> elapsed_seconds =
end -
start;
347 LOG(info) <<
"Memory registration time (0x" << info.ptr <<
", " << info.size <<
" bytes): " << elapsed_seconds.count() <<
" s";
349 if (mConfParam->mutexMemReg) {
350 if (lockf(fd, F_ULOCK, 0)) {
351 throw std::runtime_error(
"Error unlocking memlock mutex file");
363 LOGF(info,
"GPU Reconstruction total timing: Cpu: %.3e Real: %.3e s in %d slots", mTimer->CpuTime(), mTimer->RealTime(), mTimer->Counter() - 1);
364 handlePipelineStop();
369 handlePipelineEndOfStream(ec);
375 finaliseCCDBTPC(matcher, obj);
377 finaliseCCDBITS(matcher, obj);
381 mGRPGeomUpdated =
true;
386template <
class D,
class E,
class F,
class G,
class H,
class I,
class J,
class K>
387void GPURecoWorkflowSpec::processInputs(
ProcessingContext& pc, D& tpcZSmeta, E& inputZS, F& tpcZS, G& tpcZSonTheFlySizes,
bool& debugTFDump, H& compClustersDummy, I& compClustersFlatDummy, J& pCompClustersFlat, K& tmpEmptyCompClusters)
398 tpcZSmeta.Pointers[
i][
j].clear();
399 tpcZSmeta.Sizes[
i][
j].clear();
404 tpcZSonTheFlySizes = {0};
407 bool recv =
false, recvsizes =
false;
410 throw std::runtime_error(
"Received multiple ZSSIZES data");
412 tpcZSonTheFlySizes = pc.
inputs().
get<std::array<uint32_t, NEndpoints * NSectors>>(
ref);
419 throw std::runtime_error(
"Received multiple TPCZS data");
421 inputZS = pc.
inputs().
get<gsl::span<o2::tpc::ZeroSuppressedContainer8kb>>(
ref);
424 if (!recv || !recvsizes) {
425 throw std::runtime_error(
"TPC ZS on the fly data not received");
430 uint32_t pageSector = 0;
431 for (uint32_t
j = 0;
j < NEndpoints;
j++) {
432 pageSector += tpcZSonTheFlySizes[
i * NEndpoints +
j];
433 offset += tpcZSonTheFlySizes[
i * NEndpoints +
j];
435 if (mVerbosity >= 1) {
436 LOG(info) <<
"GOT ZS on the fly pages FOR SECTOR " <<
i <<
" -> pages: " << pageSector;
442 auto isSameRdh = [](
const char*
left,
const char*
right) ->
bool {
443 return o2::raw::RDHUtils::getFEEID(
left) == o2::raw::RDHUtils::getFEEID(
right) && o2::raw::RDHUtils::getDetectorField(
left) == o2::raw::RDHUtils::getDetectorField(
right);
445 auto checkForZSData = [](
const char*
ptr, uint32_t subSpec) ->
bool {
446 const auto rdhLink = o2::raw::RDHUtils::getLinkID(
ptr);
447 const auto detField = o2::raw::RDHUtils::getDetectorField(
ptr);
448 const auto feeID = o2::raw::RDHUtils::getFEEID(
ptr);
449 const auto feeLinkID = o2::tpc::rdh_utils::getLink(feeID);
451 return detField ==
o2::tpc::raw_data_types::ZS && ((feeLinkID == o2::tpc::rdh_utils::UserLogicLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == 0)) ||
452 (feeLinkID == o2::tpc::rdh_utils::ILBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::ILBZSLinkID || rdhLink == 0)) ||
453 (feeLinkID == o2::tpc::rdh_utils::DLBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::DLBZSLinkID || rdhLink == 0)));
455 auto insertPages = [&tpcZSmeta, checkForZSData](
const char*
ptr,
size_t count, uint32_t subSpec) ->
void {
456 if (checkForZSData(
ptr, subSpec)) {
457 int32_t rawcru = o2::tpc::rdh_utils::getCRU(
ptr);
458 int32_t rawendpoint = o2::tpc::rdh_utils::getEndPoint(
ptr);
459 tpcZSmeta.Pointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(
ptr);
460 tpcZSmeta.Sizes[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(
count);
465 static uint32_t nErrors = 0;
467 if (nErrors == 1 || (nErrors < 100 && nErrors % 10 == 0) || nErrors % 1000 == 0 || mNTFs % 1000 == 0) {
468 LOG(error) <<
"DPLRawPageSequencer failed to process TPC raw data - data most likely not padded correctly - Using slow page scan instead (this alarm is downscaled from now on, so far " << nErrors <<
" of " << mNTFs <<
" TFs affected)";
472 int32_t totalCount = 0;
475 tpcZSmeta.Pointers2[
i][
j] = tpcZSmeta.Pointers[
i][
j].data();
476 tpcZSmeta.Sizes2[
i][
j] = tpcZSmeta.Sizes[
i][
j].data();
477 tpcZS.sector[
i].zsPtr[
j] = tpcZSmeta.Pointers2[
i][
j];
478 tpcZS.sector[
i].nZSPtr[
j] = tpcZSmeta.Sizes2[
i][
j];
479 tpcZS.sector[
i].count[
j] = tpcZSmeta.Pointers[
i][
j].size();
480 totalCount += tpcZSmeta.Pointers[
i][
j].size();
486 compClustersFlatDummy.setForward(&compClustersDummy);
487 pCompClustersFlat = &compClustersFlatDummy;
491 if (pCompClustersFlat ==
nullptr) {
498 LOGF(info,
"running tracking for sector(s) 0x%09x", mTPCSectorMask);
506 if (mConfParam->dump < 2) {
507 retVal = mGPUReco->RunTracking(ptrs, outputRegions, threadIndex, inputUpdateCallback);
510 retVal = runITSTracking(*pc);
515 mGPUReco->Clear(
false, threadIndex);
520void GPURecoWorkflowSpec::cleanOldCalibsTPCPtrs(calibObjectStruct& oldCalibObjects)
522 if (mOldCalibObjects.size() > 0) {
523 mOldCalibObjects.pop();
525 mOldCalibObjects.emplace(std::move(oldCalibObjects));
533 auto cput = mTimer->CpuTime();
534 auto realt = mTimer->RealTime();
535 mTimer->Start(
false);
538 std::vector<gsl::span<const char>>
inputs;
546 std::array<uint32_t, NEndpoints * NSectors> tpcZSonTheFlySizes;
547 gsl::span<const o2::tpc::ZeroSuppressedContainer8kb> inputZS;
548 std::unique_ptr<char[]> tmpEmptyCompClusters;
550 bool getWorkflowTPCInput_clusters =
false, getWorkflowTPCInput_mc =
false, getWorkflowTPCInput_digits =
false;
551 bool debugTFDump =
false;
554 getWorkflowTPCInput_mc =
true;
557 getWorkflowTPCInput_clusters =
true;
560 getWorkflowTPCInput_digits =
true;
565 auto lockDecodeInput = std::make_unique<std::lock_guard<std::mutex>>(mPipeline->mutexDecodeInput);
573 LOG(fatal) <<
"configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
578 processInputs(pc, tpcZSmeta, inputZS, tpcZS, tpcZSonTheFlySizes, debugTFDump, compClustersDummy, compClustersFlatDummy, pCompClustersFlat, tmpEmptyCompClusters);
579 const auto& inputsClustersDigits = o2::tpc::getWorkflowTPCInput(pc, mVerbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, mTPCSectorMask, getWorkflowTPCInput_digits);
582 mTFSettings->tfStartOrbit = tinfo.firstTForbit;
583 mTFSettings->hasTfStartOrbit = 1;
584 mTFSettings->hasNHBFPerTF = 1;
586 mTFSettings->hasRunStartOrbit = 0;
591 LOG(info) <<
"TF firstTForbit " << mTFSettings->tfStartOrbit <<
" nHBF " << mTFSettings->nHBFPerTF <<
" runStartOrbit " << mTFSettings->runStartOrbit <<
" simStartOrbit " << mTFSettings->simStartOrbit;
593 if (mConfParam->checkFirstTfOrbit) {
594 static uint32_t lastFirstTFOrbit = -1;
595 static uint32_t lastTFCounter = -1;
596 if (lastFirstTFOrbit != -1 && lastTFCounter != -1) {
597 int32_t diffOrbit = tinfo.firstTForbit - lastFirstTFOrbit;
598 int32_t diffCounter = tinfo.tfCounter - lastTFCounter;
599 if (diffOrbit != diffCounter * mTFSettings->nHBFPerTF) {
600 LOG(error) <<
"Time frame has mismatching firstTfOrbit - Last orbit/counter: " << lastFirstTFOrbit <<
" " << lastTFCounter <<
" - Current: " << tinfo.firstTForbit <<
" " << tinfo.tfCounter;
603 lastFirstTFOrbit = tinfo.firstTForbit;
604 lastTFCounter = tinfo.tfCounter;
617 void* ptrEp[NSectors * NEndpoints] = {};
618 bool doInputDigits =
false, doInputDigitsMC =
false;
622 const uint64_t*
buffer =
reinterpret_cast<const uint64_t*
>(&inputZS[0]);
625 doInputDigits = doInputDigitsMC = mSpecConfig.
processMC;
629 throw std::runtime_error(
"Cannot process MC information, none available");
632 doInputDigits =
true;
638 if (mTPCSectorMask != 0xFFFFFFFFF) {
640 for (uint32_t
i = 0;
i < NSectors;
i++) {
641 if (!(mTPCSectorMask & (1ul <<
i))) {
657 if (doInputDigitsMC) {
660 for (uint32_t
i = 0;
i < NSectors;
i++) {
661 tpcDigitsMap.
tpcDigits[
i] = inputsClustersDigits->inputDigits[
i].data();
662 tpcDigitsMap.
nTPCDigits[
i] = inputsClustersDigits->inputDigits[
i].size();
663 if (doInputDigitsMC) {
664 tpcDigitsMapMC.
v[
i] = inputsClustersDigits->inputDigitsMCPtrs[
i];
670 if (mClusterOutputIds.size() > 0) {
671 clusterOutputSectorHeader.
sectorBits = mTPCSectorMask;
673 clusterOutputSectorHeader.activeSectors = mTPCSectorMask;
678 std::unique_ptr<GPURecoWorkflow_QueueObject> pipelineContext;
680 if (handlePipeline(pc, ptrs, tpcZSmeta, tpcZS, pipelineContext)) {
688 using outputDataType =
char;
690 using outputBufferType = std::pair<std::optional<std::reference_wrapper<outputBufferUninitializedVector>>, outputDataType*>;
692 std::unordered_set<std::string> outputsCreated;
694 auto setOutputAllocator = [
this, &outputBuffers, &outputRegions, &pc, &outputsCreated](
const char*
name,
bool condition,
GPUOutputControl& region,
auto&& outputSpec,
size_t offset = 0) {
697 if (mConfParam->allocateOutputOnTheFly) {
698 region.allocator = [
this,
name, &
buffer, &pc, outputSpec = std::move(outputSpec),
offset, &outputsCreated](
size_t size) ->
void* {
701 LOG(info) <<
"ALLOCATING " <<
size <<
" bytes for " <<
name <<
": " << std::get<DataOrigin>(outputSpec).template as<std::string>() <<
"/" << std::get<DataDescription>(outputSpec).template as<std::string>() <<
"/" << std::get<2>(outputSpec);
703 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
705 start = std::chrono::high_resolution_clock::now();
708 outputsCreated.insert(
name);
710 end = std::chrono::high_resolution_clock::now();
711 std::chrono::duration<double> elapsed_seconds =
end -
start;
712 LOG(info) <<
"Allocation time for " <<
name <<
" (" <<
size <<
" bytes)"
713 <<
": " << elapsed_seconds.count() <<
"s";
721 outputsCreated.insert(
name);
726 auto downSizeBuffer = [](outputBufferType&
buffer,
size_t size) {
731 throw std::runtime_error(
"Invalid buffer size requested");
735 throw std::runtime_error(
"Inconsistent buffer address after downsize");
744 auto downSizeBufferToSpan = [&outputBuffers, &outputRegions, &downSizeBuffer](
GPUOutputControl& region,
auto span) {
749 if (span.size() &&
buffer.second != (
char*)span.data()) {
750 throw std::runtime_error(
"Buffer does not match span");
752 downSizeBuffer(
buffer, span.size() *
sizeof(*span.data()));
771 throw std::runtime_error(
"Invalid input for gpu tracking");
776 calibObjectStruct oldCalibObjects;
777 doCalibUpdates(pc, oldCalibObjects);
779 lockDecodeInput.reset();
781 uint32_t threadIndex;
782 if (mConfParam->dump) {
784 while (pipelineContext->jobThreadIndex == -1) {
786 threadIndex = pipelineContext->jobThreadIndex;
791 std::string dir =
"";
792 if (mConfParam->dumpFolder !=
"") {
793 dir = std::regex_replace(mConfParam->dumpFolder, std::regex(
"\\[P\\]"),
std::to_string(getpid()));
795 mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
800 mGPUReco->DumpSettings(threadIndex, dir.c_str());
802 if (tinfo.tfCounter >= mConfParam->dumpFirst && (mConfParam->dumpLast == -1 || tinfo.tfCounter <= mConfParam->dumpLast)) {
803 mGPUReco->DumpEvent(mNTFDumps, &ptrs, threadIndex, dir.c_str());
807 std::unique_ptr<GPUTrackingInOutPointers> ptrsDump;
808 if (mConfParam->dumpBadTFMode == 2) {
810 memcpy((
void*)ptrsDump.get(), (
const void*)&ptrs,
sizeof(ptrs));
815 if (!pipelineContext->jobSubmitted) {
816 enqueuePipelinedJob(&ptrs, &outputRegions, pipelineContext.get(),
true);
818 finalizeInputPipelinedJob(&ptrs, &outputRegions, pipelineContext.get());
820 std::unique_lock lk(pipelineContext->jobFinishedMutex);
821 pipelineContext->jobFinishedNotify.wait(lk, [context = pipelineContext.get()]() { return context->jobFinished; });
822 retVal = pipelineContext->jobReturnValue;
823 threadIndex = pipelineContext->jobThreadIndex;
826 threadIndex = mNextThreadIndex;
827 if (mConfig->configProcessing.doublePipeline) {
828 mNextThreadIndex = (mNextThreadIndex + 1) % 2;
831 retVal = runMain(&pc, &ptrs, &outputRegions, threadIndex);
836 cleanOldCalibsTPCPtrs(oldCalibObjects);
838 o2::utils::DebugStreamer::instance()->flush();
840 if (debugTFDump && mNDebugDumps < mConfParam->dumpBadTFs) {
842 if (mConfParam->dumpBadTFMode <= 1) {
844 FILE* fp = fopen(
filename.c_str(),
"w+b");
848 if (mConfParam->dumpBadTFMode == 1) {
852 fwrite(
data.data(), 1,
data.size(), fp);
855 }
else if (mConfParam->dumpBadTFMode == 2) {
856 mGPUReco->DumpEvent(mNDebugDumps - 1, ptrsDump.get(), threadIndex);
860 if (mConfParam->dump == 2) {
866 if (mConfig->configProcessing.tpcWriteClustersAfterRejection) {
869 bool createEmptyOutput =
false;
871 if (
retVal == 3 && mConfig->configProcessing.ignoreNonFatalGPUErrors) {
872 if (mConfig->configProcessing.throttleAlarms) {
873 LOG(warning) <<
"GPU Reconstruction aborted with non fatal error code, ignoring";
875 LOG(alarm) <<
"GPU Reconstruction aborted with non fatal error code, ignoring";
877 createEmptyOutput = !mConfParam->partialOutputForNonFatalErrors;
879 LOG(fatal) <<
"GPU Reconstruction aborted with error code " <<
retVal <<
" - errors are not ignored - terminating";
883 std::unique_ptr<o2::tpc::ClusterNativeAccess> tmpEmptyClNative;
884 if (createEmptyOutput) {
885 memset(&ptrs, 0,
sizeof(ptrs));
886 for (uint32_t
i = 0;
i < outputRegions.
count();
i++) {
887 if (outputBuffers[
i].
first) {
894 outputBuffers[
i].first->get().resize(toSize);
895 outputBuffers[
i].second = outputBuffers[
i].first->get().data();
897 memset(outputBuffers[
i].second, 0, toSize);
901 tmpEmptyClNative = std::make_unique<o2::tpc::ClusterNativeAccess>();
902 memset(tmpEmptyClNative.get(), 0,
sizeof(*tmpEmptyClNative));
907 clustersMCBuffer.second = clustersMCBuffer.first;
908 tmpEmptyClNative->clustersMCTruth = &clustersMCBuffer.second;
914 if (!mConfParam->allocateOutputOnTheFly) {
915 for (uint32_t
i = 0;
i < outputRegions.
count();
i++) {
918 throw std::runtime_error(
"Preallocated buffer size exceeded");
921 downSizeBuffer(outputBuffers[
i], (
char*)outputRegions.
asArray()[
i].
ptrCurrent - (
char*)outputBuffers[
i].second);
925 downSizeBufferToSpan(outputRegions.
tpcTracksO2, spanOutputTracks);
931 doTrackTuneTPC(ptrs, outputBuffers[outputRegions.
getIndex(outputRegions.
tpcTracksO2)].first->get().data());
935 throw std::runtime_error(
"cluster native output ptrs out of sync");
948 if (mClusterOutputIds.size() > 0) {
952 for (uint32_t
i = 0;
i < NSectors;
i++) {
953 if (mTPCSectorMask & (1ul <<
i)) {
955 clusterOutputSectorHeader.sectorBits = (1ul <<
i);
958 memset(outIndex, 0,
sizeof(*outIndex));
991 auto getoutput = [sendQAOutput](
auto ptr) {
return sendQAOutput &&
ptr ? *
ptr : std::decay_t<decltype(*ptr)>(); };
992 std::vector<TH1F> copy1 = getoutput(outputRegions.
qa.
hist1);
993 std::vector<TH2F> copy2 = getoutput(outputRegions.
qa.
hist2);
994 std::vector<TH1D> copy3 = getoutput(outputRegions.
qa.
hist3);
995 std::vector<TGraphAsymmErrors> copy4 = getoutput(outputRegions.
qa.
hist4);
997 mQA->postprocessExternal(copy1, copy2, copy3, copy4, out, mQATaskMask ? mQATaskMask : -1);
1015 LOG(info) <<
"GPU Reconstruction time for this TF " << mTimer->CpuTime() - cput <<
" s (cpu), " << mTimer->RealTime() - realt <<
" s (wall)";
1023 bool needCalibUpdate =
false;
1024 if (mGRPGeomUpdated) {
1025 mGRPGeomUpdated =
false;
1026 needCalibUpdate =
true;
1031 mITSGeometryCreated =
true;
1034 if (mAutoSolenoidBz) {
1040 if (mAutoContinuousMaxTimeBin) {
1043 LOG(info) <<
"Updating max time bin " << newCalibValues.
continuousMaxTimeBin <<
" (" << mTFSettings->nHBFPerTF <<
" orbits)";
1046 if (!mPropagatorInstanceCreated) {
1048 if (mConfig->configProcessing.o2PropagatorUseGPUField) {
1051 mPropagatorInstanceCreated =
true;
1054 if (!mMatLUTCreated) {
1055 if (mConfParam->matLUTFile.size() == 0) {
1057 LOG(info) <<
"Loaded material budget lookup table";
1059 mMatLUTCreated =
true;
1063 gm->createPadPlaneArray();
1064 gm->createClusterMatrixArray();
1065 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>(*gm);
1066 newCalibObjects.
trdGeometry = mConfig->configCalib.trdGeometry = mTRDGeometry.get();
1067 LOG(info) <<
"Loaded TRD geometry";
1068 mTRDGeometryCreated =
true;
1071 needCalibUpdate = fetchCalibsCCDBTPC(pc, newCalibObjects, oldCalibObjects) || needCalibUpdate;
1073 needCalibUpdate = fetchCalibsCCDBITS(pc) || needCalibUpdate;
1075 if (mTPCCutAtTimeBin != mConfig->configGRP.tpcCutTimeBin) {
1077 newCalibValues.
tpcTimeBinCut = mConfig->configGRP.tpcCutTimeBin = mTPCCutAtTimeBin;
1078 needCalibUpdate =
true;
1082 std::ofstream out(
path, std::ios::binary | std::ios::trunc);
1083 if (!out.is_open()) {
1084 throw std::runtime_error(
"Failed to open output file: " +
path);
1087 out.write(
buffer,
static_cast<std::streamsize
>(validSize));
1089 throw std::runtime_error(
"Failed while writing data to: " +
path);
1092 for (
int i = 0;
i < 3;
i++) {
1097 LOG(info) <<
"Dumped TPC clusterizer NN " <<
i <<
" to file " <<
path;
1101 if (needCalibUpdate) {
1102 LOG(info) <<
"Updating GPUReconstruction calibration objects";
1103 mGPUReco->UpdateCalibration(newCalibObjects, newCalibValues);
1112 char* o2jobid = getenv(
"O2JOBID");
1113 char* numaid = getenv(
"NUMAID");
1114 int32_t chanid = o2jobid ? atoi(o2jobid) : (numaid ? atoi(numaid) : 0);
1115 std::string chan = std::string(
"name=gpu-prepare-channel,type=") + (send ?
"push" :
"pull") +
",method=" + (send ?
"connect" :
"bind") +
",address=ipc://@gpu-prepare-channel-" +
std::to_string(chanid) +
"-{timeslice0},transport=shmem,rateLogging=0";
1135 inputs.emplace_back(
"stdDist",
"FLP",
"DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
1140 LOG(fatal) <<
"Double pipeline mode can only work with zsraw input";
1144 inputs.emplace_back(
"pipelineprepare",
gDataOriginGPU,
"PIPELINEPREPARE", 0, Lifetime::Timeframe);
1154 if (mapSources != 0) {
1175 mCalibObjects.mFastTransformHelper->requestCCDBInputs(
inputs, optsDummy, gloOpts);
1213 inputs.emplace_back(
"compClusters",
"ITS",
"COMPCLUSTERS", 0, Lifetime::Timeframe);
1214 inputs.emplace_back(
"patterns",
"ITS",
"PATTERNS", 0, Lifetime::Timeframe);
1215 inputs.emplace_back(
"ROframes",
"ITS",
"CLUSTERSROF", 0, Lifetime::Timeframe);
1217 inputs.emplace_back(
"phystrig",
"ITS",
"PHYSTRIG", 0, Lifetime::Timeframe);
1219 inputs.emplace_back(
"phystrig",
"TRD",
"TRKTRGRD", 0, Lifetime::Timeframe);
1222 if (mSpecConfig.
isITS3) {
1223 inputs.emplace_back(
"cldict",
"IT3",
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(
"IT3/Calib/ClusterDictionary"));
1224 inputs.emplace_back(
"alppar",
"ITS",
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Config/AlpideParam"));
1226 inputs.emplace_back(
"itscldict",
"ITS",
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Calib/ClusterDictionary"));
1227 inputs.emplace_back(
"itsalppar",
"ITS",
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Config/AlpideParam"));
1230 inputs.emplace_back(
"meanvtx",
"GLO",
"MEANVERTEX", 0, Lifetime::Condition,
ccdbParamSpec(
"GLO/Calib/MeanVertex", {}, 1));
1234 inputs.emplace_back(
"itsmclabels",
"ITS",
"CLUSTERSMCTR", 0, Lifetime::Timeframe);
1235 inputs.emplace_back(
"ITSMC2ROframes",
"ITS",
"CLUSTERSMC2ROF", 0, Lifetime::Timeframe);
1240 *mConfParam = mConfig->ReadConfigurableParam();
1241 if (mConfig->configProcessing.nn.nnLoadFromCCDB) {
1243 LOG(info) <<
"(NN CLUS) Enabling fetching of TPC NN clusterizer from CCDB";
1245 mSpecConfig.
nnDumpToFile = mConfig->configProcessing.nn.nnCCDBDumpToFile;
1246 GPUSettingsProcessingNNclusterizer& nnClusterizerSettings = mConfig->configProcessing.nn;
1248 std::map<std::string, std::string> metadata;
1249 metadata[
"inputDType"] = nnClusterizerSettings.nnInferenceInputDType;
1250 metadata[
"outputDType"] = nnClusterizerSettings.nnInferenceOutputDType;
1251 metadata[
"nnCCDBWithMomentum"] = nnClusterizerSettings.nnCCDBWithMomentum;
1252 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBClassificationLayerType;
1253 metadata[
"nnCCDBInteractionRate"] = nnClusterizerSettings.nnCCDBInteractionRate;
1254 metadata[
"nnCCDBBeamType"] = nnClusterizerSettings.nnCCDBBeamType;
1256 auto convert_map_to_metadata = [](
const std::map<std::string, std::string>& inputMap, std::vector<o2::framework::CCDBMetadata>& outputMetadata) {
1257 for (
const auto& [
key,
value] : inputMap) {
1259 outputMetadata.push_back({
key,
value});
1265 std::vector<o2::framework::CCDBMetadata> ccdb_metadata;
1267 if (mConfParam->printSettings) {
1268 auto printSettings = [](
const std::map<std::string, std::string>& settings) {
1269 LOG(info) <<
"(NN CLUS) NN Clusterizer CCDB settings:";
1270 for (
const auto& [
key,
value] : settings) {
1274 printSettings(metadata);
1278 metadata[
"nnCCDBEvalType"] =
"classification_c1";
1279 convert_map_to_metadata(metadata, ccdb_metadata);
1280 inputs.emplace_back(
"nn_classification_c1",
gDataOriginTPC,
"NNCLUSTERIZER_C1", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1281 }
else if (mSpecConfig.
nnEvalMode[0] ==
"c2") {
1282 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1283 metadata[
"nnCCDBEvalType"] =
"classification_c2";
1284 convert_map_to_metadata(metadata, ccdb_metadata);
1285 inputs.emplace_back(
"nn_classification_c2",
gDataOriginTPC,
"NNCLUSTERIZER_C2", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1288 metadata[
"nnCCDBEvalType"] =
"regression_c1";
1289 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1290 convert_map_to_metadata(metadata, ccdb_metadata);
1291 inputs.emplace_back(
"nn_regression_c1",
gDataOriginTPC,
"NNCLUSTERIZER_R1", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1294 metadata[
"nnCCDBEvalType"] =
"regression_c2";
1295 convert_map_to_metadata(metadata, ccdb_metadata);
1296 inputs.emplace_back(
"nn_regression_c2",
gDataOriginTPC,
"NNCLUSTERIZER_R2", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1306 std::vector<OutputSpec> outputSpecs;
1308 outputSpecs.emplace_back(
gDataOriginGPU,
"PIPELINEPREPARE", 0, Lifetime::Timeframe);
1319 outputSpecs.emplace_back(
gDataOriginTPC,
"COMPCLUSTERS", 0, Lifetime::Timeframe);
1322 outputSpecs.emplace_back(
gDataOriginTPC,
"COMPCLUSTERSFLAT", 0, Lifetime::Timeframe);
1325 for (
auto const& sector : mTPCSectors) {
1326 mClusterOutputIds.emplace_back(sector);
1329 outputSpecs.emplace_back(
gDataOriginTPC,
"CLUSTERNATIVETMP", NSectors, Lifetime::Timeframe);
1330 for (
const auto sector : mTPCSectors) {
1338 for (
const auto sector : mTPCSectors) {
1347 outputSpecs.emplace_back(
gDataOriginTPC,
"CLSHAREDMAP", 0, Lifetime::Timeframe);
1348 outputSpecs.emplace_back(
gDataOriginTPC,
"TPCOCCUPANCYMAP", 0, Lifetime::Timeframe);
1351 outputSpecs.emplace_back(
gDataOriginTPC,
"TRIGGERWORDS", 0, Lifetime::Timeframe);
1354 outputSpecs.emplace_back(
gDataOriginTPC,
"TRACKINGQA", 0, Lifetime::Timeframe);
1357 outputSpecs.emplace_back(
gDataOriginGPU,
"ERRORQA", 0, Lifetime::Timeframe);
1361 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKS", 0, Lifetime::Timeframe);
1362 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKCLSID", 0, Lifetime::Timeframe);
1363 outputSpecs.emplace_back(
gDataOriginITS,
"ITSTrackROF", 0, Lifetime::Timeframe);
1364 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICES", 0, Lifetime::Timeframe);
1365 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESROF", 0, Lifetime::Timeframe);
1366 outputSpecs.emplace_back(
gDataOriginITS,
"IRFRAMES", 0, Lifetime::Timeframe);
1369 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESMCTR", 0, Lifetime::Timeframe);
1370 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESMCPUR", 0, Lifetime::Timeframe);
1371 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKSMCTR", 0, Lifetime::Timeframe);
1372 outputSpecs.emplace_back(
gDataOriginITS,
"ITSTrackMC2ROF", 0, Lifetime::Timeframe);
1383 mDisplayFrontend.reset(
nullptr);
1384 mGPUReco.reset(
nullptr);
std::vector< std::string > labels
Simple interface to the CDB manager.
Definition of container class for dE/dx corrections.
void dumpToFile(std::string fileName, const CathodeSegmentation &seg, const std::vector< Point > &points)
Class of a TPC cluster in TPC-native coordinates (row, time)
Container to store compressed TPC cluster data.
A const (read-only) version of MCTruthContainer.
Helper class to access correction maps.
Helper class to load maps from CCDB.
A parser and sequencer utility for raw pages within DPL input.
A raw page parser for DPL input.
Definition of class for writing debug information.
Definition of the GeometryManager class.
Helper for geometry and GRP related CCDB requests.
Definition of the GeometryTGeo class.
Declarations for the wrapper for the set of cylindrical material layers.
Definition of the Names Generator class.
Class to serialize ONNX objects for ROOT snapshots of CCDB objects at runtime.
Utilities for parsing of data sequences.
Type wrappers for enforcing a specific serialization method.
Wrapper class for TPC CA Tracker algorithm.
Configurable params for ad-hoc track tuning.
Helper class to extract VDrift from different sources.
Helper class to obtain TPC clusters / digits / labels from DPL.
Definitions of TPC Zero Suppression Data Headers.
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static MatLayerCylSet * loadFromFile(const std::string &inpFName="matbud.root")
GPUd() value_type estimateLTFast(o2 static GPUd() float estimateLTIncrement(const o2 PropagatorImpl * Instance(bool uninitialized=false)
static const HBFUtils & Instance()
This utility handles transparently the DPL inputs and triggers a customizable action on sequences of ...
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static GPUDisplayFrontendInterface * getFrontend(const char *type)
static uint32_t getTpcMaxTimeBinFromNHbf(uint32_t nHbf)
static float getNominalGPUBz(T &src)
static void ApplySyncSettings(GPUSettingsProcessing &proc, GPUSettingsRec &rec, gpudatatypes::RecoStepField &steps, bool syncMode, int32_t dEdxMode=-2)
o2::framework::Outputs outputs()
std::vector< framework::InputSpec > CompletionPolicyData
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
o2::framework::Inputs inputs()
void run(o2::framework::ProcessingContext &pc) final
void stop() final
This is invoked on stop.
~GPURecoWorkflowSpec() override
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
GPURecoWorkflowSpec(CompletionPolicyData *policyData, Config const &specconfig, std::vector< int32_t > const &tpcsectors, uint64_t tpcSectorMask, std::shared_ptr< o2::base::GRPGeomRequest > &ggr, std::function< bool(o2::framework::DataProcessingHeader::StartTime)> **gPolicyOrder=nullptr)
o2::framework::Options options()
static void RunZSEncoderCreateMeta(const uint64_t *buffer, const uint32_t *sizes, void **ptrs, GPUTrackingInOutZS *out)
static GeometryTGeo * Instance()
void fillMatrixCache(int mask) override
ClusterNativeAccess::ConstMCLabelContainerViewWithBuffer ConstMCLabelContainerViewWithBuffer
static void addOptions(std::vector< o2::framework::ConfigParamSpec > &options)
static constexpr int MAXSECTOR
static precheckModifiedData runPrecheck(o2::gpu::GPUTrackingInOutPointers *ptrs, o2::gpu::GPUO2InterfaceConfiguration *config)
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
static Geometry * instance()
GLuint const GLchar * name
GLsizei const GLfloat * value
GLuint GLsizei const GLchar * label
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
GLsizei const GLchar *const * path
constexpr o2::header::DataOrigin gDataOriginTPC
constexpr o2::header::DataOrigin gDataOriginTRD
constexpr o2::header::DataOrigin gDataOriginITS
constexpr o2::header::DataOrigin gDataOriginGPU
Defining PrimaryVertex explicitly as messageable.
o2::header::DataDescription DataDescription
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
constexpr int MAXGLOBALPADROW
@ ZS
final Zero Suppression (can be ILBZS, DLBZS)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
@ FEEConfig
use fee config
@ IDCPadStatus
use idc pad status map
@ CalIDCPadStatusMapA
Status map of the pads (dead etc. obtained from CalIDC0)
@ CalPadGainFull
Full pad gain calibration.
@ CalPadGainResidual
Residual pad gain calibration (e.g. from tracks)
@ CalTimeGain
Gain variation over time.
@ CalTimeGainMC
Gain variation over time for MC.
@ AltroSyncSignal
timing of Altro chip sync. signal
auto getRecoInputContainer(o2::framework::ProcessingContext &pc, o2::gpu::GPUTrackingInOutPointers *ptrs, const o2::globaltracking::RecoContainer *inputTracks, bool mc=false)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
size_t inputTimesliceId
The time pipelining id of this particular device.
void requestTracks(o2::dataformats::GlobalTrackID::mask_t src, bool mc)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
S< o2::trd::GeometryFlat >::type * trdGeometry
S< o2::base::PropagatorImpl< float > >::type * o2Propagator
S< o2::base::MatLayerCylSet >::type * matLUT
S< o2::tpc::ORTRootSerializer >::type * nnClusterizerNetworks[3]
const std::vector< TH1F > * hist1
const std::vector< TGraphAsymmErrors > * hist4
const std::vector< TH1D > * hist3
const std::vector< TH2F > * hist2
bool newContinuousMaxTimeBin
uint32_t continuousMaxTimeBin
GPUSettingsProcessing configProcessing
std::function< void *(size_t)> allocator
std::vector< std::string > nnEvalMode
int32_t tpcDeadMapSources
bool decompressTPCFromROOT
bool outputCompClustersRoot
bool sendClustersPerSector
int32_t enableDoublePipeline
bool outputSharedClusterMap
bool useFilteredOutputSpecs
bool outputCompClustersFlat
const o2::tpc::Digit * tpcDigits[NSECTORS]
size_t nTPCDigits[NSECTORS]
const GPUTPCDigitsMCInput * tpcDigitsMC
const o2::tpc::ClusterNativeAccess * clustersNative
const o2::tpc::CompressedClustersFlat * tpcCompressedClusters
const uint32_t * outputClusRefsTPCO2
const GPUSettingsTF * settingsTF
const GPUTrackingInOutZS * tpcZS
const o2::MCCompLabel * outputTracksTPCO2MC
uint32_t nOutputTracksTPCO2
const o2::tpc::ClusterNativeAccess * clustersNativeReduced
uint32_t nOutputClusRefsTPCO2
const o2::tpc::TrackTPC * outputTracksTPCO2
const GPUTrackingInOutDigits * tpcPackedDigits
const void *const * zsPtr[NENDPOINTS]
uint32_t count[NENDPOINTS]
const uint32_t * nZSPtr[NENDPOINTS]
GPUTrackingInOutZSSector sector[NSECTORS]
static constexpr uint32_t NSECTORS
static constexpr uint32_t NENDPOINTS
GPUOutputControl tpcTracksO2
GPUOutputControl clustersNative
GPUOutputControl tpcOccupancyMap
GPUOutputControl * asArray()
GPUOutputControl tpcTracksO2Labels
GPUOutputControl tpcTracksO2ClusRefs
size_t getIndex(const GPUOutputControl &v)
static constexpr size_t count()
GPUOutputControl sharedClusterMap
GPUOutputControl compressedClusters
GPUOutputControl clusterLabels
GPUOutputControl tpcTriggerWords
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClustersSector[constants::MAXSECTOR]
const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > * clustersMCTruth
const ClusterNative * clusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int clusterOffset[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
const ClusterNative * clustersLinear
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"