86#include <TStopwatch.h>
91#include <TGraphAsymmErrors.h>
103#include <unordered_set>
115GPURecoWorkflowSpec::GPURecoWorkflowSpec(
GPURecoWorkflowSpec::CompletionPolicyData* policyData,
Config const& specconfig, std::vector<int32_t>
const& tpcsectors, uint64_t tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<
bool(
o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) :
o2::
framework::
Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
118 throw std::runtime_error(
"inconsistent configuration: cluster output is only possible if CA clusterer or CompCluster decompression is activated");
122 mConfParam.reset(
new GPUSettingsO2);
124 mTimer.reset(
new TStopwatch);
128 *gPolicyOrder = &mPolicyOrder;
140 mConfig->configGRP.solenoidBzNominalGPU = 0;
141 mTFSettings->hasSimStartOrbit = 1;
143 mTFSettings->simStartOrbit = hbfu.getFirstIRofTF(
o2::InteractionRecord(0, hbfu.orbitFirstSampled)).orbit;
145 *mConfParam = mConfig->ReadConfigurableParam();
147 if (mConfParam->display) {
149 mConfig->configProcessing.eventDisplay = mDisplayFrontend.get();
150 if (mConfig->configProcessing.eventDisplay !=
nullptr) {
151 LOG(info) <<
"Event display enabled";
153 throw std::runtime_error(
"GPU Event Display frontend could not be created!");
157 mConfig->configProcessing.doublePipeline = 1;
160 mAutoSolenoidBz = mConfParam->solenoidBzNominalGPU == -1e6f;
161 mAutoContinuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin < 0;
162 if (mAutoContinuousMaxTimeBin) {
165 if (mConfig->configProcessing.deviceNum == -2) {
168 mConfig->configProcessing.deviceNum = myId;
169 LOG(info) <<
"GPU device number selected from pipeline id: " << myId <<
" / " << idMax;
171 if (mConfig->configProcessing.debugLevel >= 3 && mVerbosity == 0) {
174 mConfig->configProcessing.runMC = mSpecConfig.
processMC;
176 if (!mSpecConfig.
processMC && !mConfig->configQA.clusterRejectionHistograms) {
177 throw std::runtime_error(
"Need MC information to create QA plots");
180 mConfig->configQA.noMC =
true;
182 mConfig->configQA.shipToQC =
true;
183 if (!mConfig->configProcessing.runQA) {
184 mConfig->configQA.enableLocalOutput =
false;
186 mConfig->configProcessing.runQA = -mQATaskMask;
189 mConfig->configInterface.outputToExternalBuffers =
true;
199 GPUO2Interface::ApplySyncSettings(mConfig->configProcessing, mConfig->configReconstruction, mConfig->configWorkflow.steps, mConfParam->synchronousProcessing, runTracking ? mConfParam->rundEdx : -2);
217 if (mTPCSectorMask != 0xFFFFFFFFF) {
218 throw std::invalid_argument(
"Cannot run TPC decompression with a sector mask");
231 mConfig->configProcessing.outputSharedClusterMap =
true;
234 mConfig->configProcessing.createO2Output = 0;
238 if (mConfParam->transformationFile.size() || mConfParam->transformationSCFile.size()) {
239 LOG(fatal) <<
"Deprecated configurable param options GPU_global.transformationFile or transformationSCFile used\n"
240 <<
"Instead, link the corresponding file as <somedir>/TPC/Calib/CorrectionMap/snapshot.root and use it via\n"
241 <<
"--condition-remap file://<somdir>=TPC/Calib/CorrectionMap option";
247 LOG(fatal) <<
"GPU two-threaded pipeline works only with TPC-only processing, and with ZS input";
251 mGPUReco = std::make_unique<GPUO2Interface>();
254 initFunctionTPCCalib(ic);
256 mConfig->configCalib.fastTransform = mCalibObjects.mFastTransformHelper->getCorrMap();
257 mConfig->configCalib.fastTransformRef = mCalibObjects.mFastTransformHelper->getCorrMapRef();
258 mConfig->configCalib.fastTransformMShape = mCalibObjects.mFastTransformHelper->getCorrMapMShape();
259 mConfig->configCalib.fastTransformHelper = mCalibObjects.mFastTransformHelper.get();
260 if (mConfig->configCalib.fastTransform ==
nullptr) {
261 throw std::invalid_argument(
"GPU workflow: initialization of the TPC transformation failed");
264 if (mConfParam->matLUTFile.size()) {
265 LOGP(info,
"Loading matlut file {}", mConfParam->matLUTFile.c_str());
267 if (mConfig->configCalib.matLUT ==
nullptr) {
268 LOGF(fatal,
"Error loading matlut file");
271 mConfig->configProcessing.lateO2MatLutProvisioningSize = 50 * 1024 * 1024;
275 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>();
276 mConfig->configCalib.trdGeometry = mTRDGeometry.get();
278 mTRDRecoParam = std::make_unique<GPUTRDRecoParam>();
279 mConfig->configCalib.trdRecoParam = mTRDRecoParam.get();
282 mConfig->configProcessing.willProvideO2PropagatorLate =
true;
283 mConfig->configProcessing.o2PropagatorUseGPUField =
true;
286 mConfig->configProcessing.printSettings =
true;
287 if (mConfParam->printSettings > 1) {
288 mConfig->PrintParam();
293 if (mGPUReco->Initialize(config) != 0) {
294 throw std::invalid_argument(
"GPU Reconstruction initialization failed");
297 mQA = std::make_unique<GPUO2InterfaceQA>(mConfig.get());
300 mGPUReco->setErrorCodeOutput(&mErrorQA);
311 if (mConfParam->dump >= 2) {
312 LOG(fatal) <<
"Cannot use dump-only mode with multi-threaded pipeline";
317 callbacks.
set<CallbackService::Id::RegionInfoCallback>([
this](fair::mq::RegionInfo
const& info) {
318 if (info.size == 0) {
322 mRegionInfos.emplace_back(info);
327 if (mConfParam->registerSelectedSegmentIds != -1 && info.managed && info.id != (uint32_t)mConfParam->registerSelectedSegmentIds) {
331 if (mConfParam->mutexMemReg) {
332 mode_t
mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
333 fd = open(
"/tmp/o2_gpu_memlock_mutex.lock", O_RDWR | O_CREAT | O_CLOEXEC,
mask);
335 throw std::runtime_error(
"Error opening memlock mutex lock file");
338 if (lockf(fd, F_LOCK, 0)) {
339 throw std::runtime_error(
"Error locking memlock mutex file");
342 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
343 if (mConfParam->benchmarkMemoryRegistration) {
344 start = std::chrono::high_resolution_clock::now();
346 if (mGPUReco->registerMemoryForGPU(info.ptr, info.size)) {
347 throw std::runtime_error(
"Error registering memory for GPU");
349 if (mConfParam->benchmarkMemoryRegistration) {
350 end = std::chrono::high_resolution_clock::now();
351 std::chrono::duration<double> elapsed_seconds =
end -
start;
352 LOG(info) <<
"Memory registration time (0x" << info.ptr <<
", " << info.size <<
" bytes): " << elapsed_seconds.count() <<
" s";
354 if (mConfParam->mutexMemReg) {
355 if (lockf(fd, F_ULOCK, 0)) {
356 throw std::runtime_error(
"Error unlocking memlock mutex file");
368 LOGF(info,
"GPU Reconstruction total timing: Cpu: %.3e Real: %.3e s in %d slots", mTimer->CpuTime(), mTimer->RealTime(), mTimer->Counter() - 1);
369 handlePipelineStop();
374 handlePipelineEndOfStream(ec);
380 finaliseCCDBTPC(matcher, obj);
382 finaliseCCDBITS(matcher, obj);
386 mGRPGeomUpdated =
true;
391template <
class D,
class E,
class F,
class G,
class H,
class I,
class J,
class K>
392void GPURecoWorkflowSpec::processInputs(
ProcessingContext& pc, D& tpcZSmeta, E& inputZS, F& tpcZS, G& tpcZSonTheFlySizes,
bool& debugTFDump, H& compClustersDummy, I& compClustersFlatDummy, J& pCompClustersFlat, K& tmpEmptyCompClusters)
403 tpcZSmeta.Pointers[
i][
j].clear();
404 tpcZSmeta.Sizes[
i][
j].clear();
409 tpcZSonTheFlySizes = {0};
412 bool recv =
false, recvsizes =
false;
415 throw std::runtime_error(
"Received multiple ZSSIZES data");
417 tpcZSonTheFlySizes = pc.
inputs().
get<std::array<uint32_t, NEndpoints * NSectors>>(
ref);
424 throw std::runtime_error(
"Received multiple TPCZS data");
426 inputZS = pc.
inputs().
get<gsl::span<o2::tpc::ZeroSuppressedContainer8kb>>(
ref);
429 if (!recv || !recvsizes) {
430 throw std::runtime_error(
"TPC ZS on the fly data not received");
435 uint32_t pageSector = 0;
436 for (uint32_t
j = 0;
j < NEndpoints;
j++) {
437 pageSector += tpcZSonTheFlySizes[
i * NEndpoints +
j];
438 offset += tpcZSonTheFlySizes[
i * NEndpoints +
j];
440 if (mVerbosity >= 1) {
441 LOG(info) <<
"GOT ZS on the fly pages FOR SECTOR " <<
i <<
" -> pages: " << pageSector;
447 auto isSameRdh = [](
const char*
left,
const char*
right) ->
bool {
448 return o2::raw::RDHUtils::getFEEID(
left) == o2::raw::RDHUtils::getFEEID(
right) && o2::raw::RDHUtils::getDetectorField(
left) == o2::raw::RDHUtils::getDetectorField(
right);
450 auto checkForZSData = [](
const char*
ptr, uint32_t subSpec) ->
bool {
451 const auto rdhLink = o2::raw::RDHUtils::getLinkID(
ptr);
452 const auto detField = o2::raw::RDHUtils::getDetectorField(
ptr);
453 const auto feeID = o2::raw::RDHUtils::getFEEID(
ptr);
454 const auto feeLinkID = o2::tpc::rdh_utils::getLink(feeID);
456 return detField ==
o2::tpc::raw_data_types::ZS && ((feeLinkID == o2::tpc::rdh_utils::UserLogicLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == 0)) ||
457 (feeLinkID == o2::tpc::rdh_utils::ILBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::ILBZSLinkID || rdhLink == 0)) ||
458 (feeLinkID == o2::tpc::rdh_utils::DLBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::DLBZSLinkID || rdhLink == 0)));
460 auto insertPages = [&tpcZSmeta, checkForZSData](
const char*
ptr,
size_t count, uint32_t subSpec) ->
void {
461 if (checkForZSData(
ptr, subSpec)) {
462 int32_t rawcru = o2::tpc::rdh_utils::getCRU(
ptr);
463 int32_t rawendpoint = o2::tpc::rdh_utils::getEndPoint(
ptr);
464 tpcZSmeta.Pointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(
ptr);
465 tpcZSmeta.Sizes[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(
count);
470 static uint32_t nErrors = 0;
472 if (nErrors == 1 || (nErrors < 100 && nErrors % 10 == 0) || nErrors % 1000 == 0 || mNTFs % 1000 == 0) {
473 LOG(error) <<
"DPLRawPageSequencer failed to process TPC raw data - data most likely not padded correctly - Using slow page scan instead (this alarm is downscaled from now on, so far " << nErrors <<
" of " << mNTFs <<
" TFs affected)";
477 int32_t totalCount = 0;
480 tpcZSmeta.Pointers2[
i][
j] = tpcZSmeta.Pointers[
i][
j].data();
481 tpcZSmeta.Sizes2[
i][
j] = tpcZSmeta.Sizes[
i][
j].data();
482 tpcZS.sector[
i].zsPtr[
j] = tpcZSmeta.Pointers2[
i][
j];
483 tpcZS.sector[
i].nZSPtr[
j] = tpcZSmeta.Sizes2[
i][
j];
484 tpcZS.sector[
i].count[
j] = tpcZSmeta.Pointers[
i][
j].size();
485 totalCount += tpcZSmeta.Pointers[
i][
j].size();
491 compClustersFlatDummy.setForward(&compClustersDummy);
492 pCompClustersFlat = &compClustersFlatDummy;
496 if (pCompClustersFlat ==
nullptr) {
503 LOGF(info,
"running tracking for sector(s) 0x%09x", mTPCSectorMask);
511 if (mConfParam->dump < 2) {
512 retVal = mGPUReco->RunTracking(ptrs, outputRegions, threadIndex, inputUpdateCallback);
515 retVal = runITSTracking(*pc);
520 mGPUReco->Clear(
false, threadIndex);
525void GPURecoWorkflowSpec::cleanOldCalibsTPCPtrs(calibObjectStruct& oldCalibObjects)
527 if (mOldCalibObjects.size() > 0) {
528 mOldCalibObjects.pop();
530 mOldCalibObjects.emplace(std::move(oldCalibObjects));
538 auto cput = mTimer->CpuTime();
539 auto realt = mTimer->RealTime();
540 mTimer->Start(
false);
543 std::vector<gsl::span<const char>>
inputs;
551 std::array<uint32_t, NEndpoints * NSectors> tpcZSonTheFlySizes;
552 gsl::span<const o2::tpc::ZeroSuppressedContainer8kb> inputZS;
553 std::unique_ptr<char[]> tmpEmptyCompClusters;
555 bool getWorkflowTPCInput_clusters =
false, getWorkflowTPCInput_mc =
false, getWorkflowTPCInput_digits =
false;
556 bool debugTFDump =
false;
559 getWorkflowTPCInput_mc =
true;
562 getWorkflowTPCInput_clusters =
true;
565 getWorkflowTPCInput_digits =
true;
570 auto lockDecodeInput = std::make_unique<std::lock_guard<std::mutex>>(mPipeline->mutexDecodeInput);
578 LOG(fatal) <<
"configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
583 processInputs(pc, tpcZSmeta, inputZS, tpcZS, tpcZSonTheFlySizes, debugTFDump, compClustersDummy, compClustersFlatDummy, pCompClustersFlat, tmpEmptyCompClusters);
584 const auto& inputsClustersDigits = o2::tpc::getWorkflowTPCInput(pc, mVerbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, mTPCSectorMask, getWorkflowTPCInput_digits);
587 mTFSettings->tfStartOrbit = tinfo.firstTForbit;
588 mTFSettings->hasTfStartOrbit = 1;
589 mTFSettings->hasNHBFPerTF = 1;
591 mTFSettings->hasRunStartOrbit = 0;
596 LOG(info) <<
"TF firstTForbit " << mTFSettings->tfStartOrbit <<
" nHBF " << mTFSettings->nHBFPerTF <<
" runStartOrbit " << mTFSettings->runStartOrbit <<
" simStartOrbit " << mTFSettings->simStartOrbit;
598 if (mConfParam->checkFirstTfOrbit) {
599 static uint32_t lastFirstTFOrbit = -1;
600 static uint32_t lastTFCounter = -1;
601 if (lastFirstTFOrbit != -1 && lastTFCounter != -1) {
602 int32_t diffOrbit = tinfo.firstTForbit - lastFirstTFOrbit;
603 int32_t diffCounter = tinfo.tfCounter - lastTFCounter;
604 if (diffOrbit != diffCounter * mTFSettings->nHBFPerTF) {
605 LOG(error) <<
"Time frame has mismatching firstTfOrbit - Last orbit/counter: " << lastFirstTFOrbit <<
" " << lastTFCounter <<
" - Current: " << tinfo.firstTForbit <<
" " << tinfo.tfCounter;
608 lastFirstTFOrbit = tinfo.firstTForbit;
609 lastTFCounter = tinfo.tfCounter;
622 void* ptrEp[NSectors * NEndpoints] = {};
623 bool doInputDigits =
false, doInputDigitsMC =
false;
627 const uint64_t*
buffer =
reinterpret_cast<const uint64_t*
>(&inputZS[0]);
630 doInputDigits = doInputDigitsMC = mSpecConfig.
processMC;
634 throw std::runtime_error(
"Cannot process MC information, none available");
637 doInputDigits =
true;
643 if (mTPCSectorMask != 0xFFFFFFFFF) {
645 for (uint32_t
i = 0;
i < NSectors;
i++) {
646 if (!(mTPCSectorMask & (1ul <<
i))) {
662 if (doInputDigitsMC) {
665 for (uint32_t
i = 0;
i < NSectors;
i++) {
666 tpcDigitsMap.
tpcDigits[
i] = inputsClustersDigits->inputDigits[
i].data();
667 tpcDigitsMap.
nTPCDigits[
i] = inputsClustersDigits->inputDigits[
i].size();
668 if (doInputDigitsMC) {
669 tpcDigitsMapMC.
v[
i] = inputsClustersDigits->inputDigitsMCPtrs[
i];
675 if (mClusterOutputIds.size() > 0) {
676 clusterOutputSectorHeader.
sectorBits = mTPCSectorMask;
678 clusterOutputSectorHeader.activeSectors = mTPCSectorMask;
683 std::unique_ptr<GPURecoWorkflow_QueueObject> pipelineContext;
685 if (handlePipeline(pc, ptrs, tpcZSmeta, tpcZS, pipelineContext)) {
693 using outputDataType =
char;
695 using outputBufferType = std::pair<std::optional<std::reference_wrapper<outputBufferUninitializedVector>>, outputDataType*>;
697 std::unordered_set<std::string> outputsCreated;
699 auto setOutputAllocator = [
this, &outputBuffers, &outputRegions, &pc, &outputsCreated](
const char*
name,
bool condition,
GPUOutputControl& region,
auto&& outputSpec,
size_t offset = 0) {
702 if (mConfParam->allocateOutputOnTheFly) {
703 region.allocator = [
this,
name, &
buffer, &pc, outputSpec = std::move(outputSpec),
offset, &outputsCreated](
size_t size) ->
void* {
706 LOG(info) <<
"ALLOCATING " <<
size <<
" bytes for " <<
name <<
": " << std::get<DataOrigin>(outputSpec).template as<std::string>() <<
"/" << std::get<DataDescription>(outputSpec).template as<std::string>() <<
"/" << std::get<2>(outputSpec);
708 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
710 start = std::chrono::high_resolution_clock::now();
713 outputsCreated.insert(
name);
715 end = std::chrono::high_resolution_clock::now();
716 std::chrono::duration<double> elapsed_seconds =
end -
start;
717 LOG(info) <<
"Allocation time for " <<
name <<
" (" <<
size <<
" bytes)"
718 <<
": " << elapsed_seconds.count() <<
"s";
726 outputsCreated.insert(
name);
731 auto downSizeBuffer = [](outputBufferType&
buffer,
size_t size) {
736 throw std::runtime_error(
"Invalid buffer size requested");
740 throw std::runtime_error(
"Inconsistent buffer address after downsize");
749 auto downSizeBufferToSpan = [&outputBuffers, &outputRegions, &downSizeBuffer](
GPUOutputControl& region,
auto span) {
754 if (span.size() &&
buffer.second != (
char*)span.data()) {
755 throw std::runtime_error(
"Buffer does not match span");
757 downSizeBuffer(
buffer, span.size() *
sizeof(*span.data()));
776 throw std::runtime_error(
"Invalid input for gpu tracking");
781 calibObjectStruct oldCalibObjects;
782 doCalibUpdates(pc, oldCalibObjects);
784 lockDecodeInput.reset();
786 uint32_t threadIndex;
787 if (mConfParam->dump) {
789 while (pipelineContext->jobThreadIndex == -1) {
791 threadIndex = pipelineContext->jobThreadIndex;
796 std::string dir =
"";
797 if (mConfParam->dumpFolder !=
"") {
798 dir = std::regex_replace(mConfParam->dumpFolder, std::regex(
"\\[P\\]"),
std::to_string(getpid()));
800 mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
805 mGPUReco->DumpSettings(threadIndex, dir.c_str());
807 if (tinfo.tfCounter >= mConfParam->dumpFirst && (mConfParam->dumpLast == -1 || tinfo.tfCounter <= mConfParam->dumpLast)) {
808 mGPUReco->DumpEvent(mNTFDumps, &ptrs, threadIndex, dir.c_str());
812 std::unique_ptr<GPUTrackingInOutPointers> ptrsDump;
813 if (mConfParam->dumpBadTFMode == 2) {
815 memcpy((
void*)ptrsDump.get(), (
const void*)&ptrs,
sizeof(ptrs));
820 if (!pipelineContext->jobSubmitted) {
821 enqueuePipelinedJob(&ptrs, &outputRegions, pipelineContext.get(),
true);
823 finalizeInputPipelinedJob(&ptrs, &outputRegions, pipelineContext.get());
825 std::unique_lock lk(pipelineContext->jobFinishedMutex);
826 pipelineContext->jobFinishedNotify.wait(lk, [context = pipelineContext.get()]() { return context->jobFinished; });
827 retVal = pipelineContext->jobReturnValue;
828 threadIndex = pipelineContext->jobThreadIndex;
831 threadIndex = mNextThreadIndex;
832 if (mConfig->configProcessing.doublePipeline) {
833 mNextThreadIndex = (mNextThreadIndex + 1) % 2;
836 retVal = runMain(&pc, &ptrs, &outputRegions, threadIndex);
841 cleanOldCalibsTPCPtrs(oldCalibObjects);
843 o2::utils::DebugStreamer::instance()->flush();
845 if (debugTFDump && mNDebugDumps < mConfParam->dumpBadTFs) {
847 if (mConfParam->dumpBadTFMode <= 1) {
849 FILE* fp = fopen(
filename.c_str(),
"w+b");
853 if (mConfParam->dumpBadTFMode == 1) {
857 fwrite(
data.data(), 1,
data.size(), fp);
860 }
else if (mConfParam->dumpBadTFMode == 2) {
861 mGPUReco->DumpEvent(mNDebugDumps - 1, ptrsDump.get(), threadIndex);
865 if (mConfParam->dump == 2) {
871 if (mConfig->configProcessing.tpcWriteClustersAfterRejection) {
874 bool createEmptyOutput =
false;
876 if (
retVal == 3 && mConfig->configProcessing.ignoreNonFatalGPUErrors) {
877 if (mConfig->configProcessing.throttleAlarms) {
878 LOG(warning) <<
"GPU Reconstruction aborted with non fatal error code, ignoring";
880 LOG(alarm) <<
"GPU Reconstruction aborted with non fatal error code, ignoring";
882 createEmptyOutput = !mConfParam->partialOutputForNonFatalErrors;
884 LOG(fatal) <<
"GPU Reconstruction aborted with error code " <<
retVal <<
" - errors are not ignored - terminating";
888 std::unique_ptr<o2::tpc::ClusterNativeAccess> tmpEmptyClNative;
889 if (createEmptyOutput) {
890 memset(&ptrs, 0,
sizeof(ptrs));
891 for (uint32_t
i = 0;
i < outputRegions.
count();
i++) {
892 if (outputBuffers[
i].
first) {
899 outputBuffers[
i].first->get().resize(toSize);
900 outputBuffers[
i].second = outputBuffers[
i].first->get().data();
902 memset(outputBuffers[
i].second, 0, toSize);
906 tmpEmptyClNative = std::make_unique<o2::tpc::ClusterNativeAccess>();
907 memset(tmpEmptyClNative.get(), 0,
sizeof(*tmpEmptyClNative));
912 clustersMCBuffer.second = clustersMCBuffer.first;
913 tmpEmptyClNative->clustersMCTruth = &clustersMCBuffer.second;
919 if (!mConfParam->allocateOutputOnTheFly) {
920 for (uint32_t
i = 0;
i < outputRegions.
count();
i++) {
923 throw std::runtime_error(
"Preallocated buffer size exceeded");
926 downSizeBuffer(outputBuffers[
i], (
char*)outputRegions.
asArray()[
i].
ptrCurrent - (
char*)outputBuffers[
i].second);
930 downSizeBufferToSpan(outputRegions.
tpcTracksO2, spanOutputTracks);
936 doTrackTuneTPC(ptrs, outputBuffers[outputRegions.
getIndex(outputRegions.
tpcTracksO2)].first->get().data());
940 throw std::runtime_error(
"cluster native output ptrs out of sync");
953 if (mClusterOutputIds.size() > 0) {
957 for (uint32_t
i = 0;
i < NSectors;
i++) {
958 if (mTPCSectorMask & (1ul <<
i)) {
960 clusterOutputSectorHeader.sectorBits = (1ul <<
i);
963 memset(outIndex, 0,
sizeof(*outIndex));
996 auto getoutput = [sendQAOutput](
auto ptr) {
return sendQAOutput &&
ptr ? *
ptr : std::decay_t<decltype(*ptr)>(); };
997 std::vector<TH1F> copy1 = getoutput(outputRegions.
qa.
hist1);
998 std::vector<TH2F> copy2 = getoutput(outputRegions.
qa.
hist2);
999 std::vector<TH1D> copy3 = getoutput(outputRegions.
qa.
hist3);
1000 std::vector<TGraphAsymmErrors> copy4 = getoutput(outputRegions.
qa.
hist4);
1002 mQA->postprocessExternal(copy1, copy2, copy3, copy4, out, mQATaskMask ? mQATaskMask : -1);
1020 LOG(info) <<
"GPU Reconstruction time for this TF " << mTimer->CpuTime() - cput <<
" s (cpu), " << mTimer->RealTime() - realt <<
" s (wall)";
1028 bool needCalibUpdate =
false;
1029 if (mGRPGeomUpdated) {
1030 mGRPGeomUpdated =
false;
1031 needCalibUpdate =
true;
1036 mITSGeometryCreated =
true;
1039 if (mAutoSolenoidBz) {
1045 if (mAutoContinuousMaxTimeBin) {
1048 LOG(info) <<
"Updating max time bin " << newCalibValues.
continuousMaxTimeBin <<
" (" << mTFSettings->nHBFPerTF <<
" orbits)";
1051 if (!mPropagatorInstanceCreated) {
1053 if (mConfig->configProcessing.o2PropagatorUseGPUField) {
1056 mPropagatorInstanceCreated =
true;
1059 if (!mMatLUTCreated) {
1060 if (mConfParam->matLUTFile.size() == 0) {
1062 LOG(info) <<
"Loaded material budget lookup table";
1064 mMatLUTCreated =
true;
1067 if (!mTRDGeometryCreated) {
1069 gm->createPadPlaneArray();
1070 gm->createClusterMatrixArray();
1071 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>(*gm);
1072 newCalibObjects.
trdGeometry = mConfig->configCalib.trdGeometry = mTRDGeometry.get();
1073 LOG(info) <<
"Loaded TRD geometry";
1074 mTRDGeometryCreated =
true;
1076 if (!mTRDRecoParamCreated) {
1077 mTRDRecoParam = std::make_unique<GPUTRDRecoParam>();
1078 newCalibObjects.
trdRecoParam = mConfig->configCalib.trdRecoParam = mTRDRecoParam.get();
1079 mTRDRecoParamCreated =
true;
1083 needCalibUpdate = fetchCalibsCCDBTPC(pc, newCalibObjects, oldCalibObjects) || needCalibUpdate;
1085 needCalibUpdate = fetchCalibsCCDBITS(pc) || needCalibUpdate;
1087 if (mTPCCutAtTimeBin != mConfig->configGRP.tpcCutTimeBin) {
1089 newCalibValues.
tpcTimeBinCut = mConfig->configGRP.tpcCutTimeBin = mTPCCutAtTimeBin;
1090 needCalibUpdate =
true;
1094 std::ofstream out(
path, std::ios::binary | std::ios::trunc);
1095 if (!out.is_open()) {
1096 throw std::runtime_error(
"Failed to open output file: " +
path);
1099 out.write(
buffer,
static_cast<std::streamsize
>(validSize));
1101 throw std::runtime_error(
"Failed while writing data to: " +
path);
1104 for (
int i = 0;
i < 3;
i++) {
1109 LOG(info) <<
"Dumped TPC clusterizer NN " <<
i <<
" to file " <<
path;
1113 if (needCalibUpdate) {
1114 LOG(info) <<
"Updating GPUReconstruction calibration objects";
1115 mGPUReco->UpdateCalibration(newCalibObjects, newCalibValues);
1124 char* o2jobid = getenv(
"O2JOBID");
1125 char* numaid = getenv(
"NUMAID");
1126 int32_t chanid = o2jobid ? atoi(o2jobid) : (numaid ? atoi(numaid) : 0);
1127 std::string chan = std::string(
"name=gpu-prepare-channel,type=") + (send ?
"push" :
"pull") +
",method=" + (send ?
"connect" :
"bind") +
",address=ipc://@gpu-prepare-channel-" +
std::to_string(chanid) +
"-{timeslice0},transport=shmem,rateLogging=0";
1147 inputs.emplace_back(
"stdDist",
"FLP",
"DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
1152 LOG(fatal) <<
"Double pipeline mode can only work with zsraw input";
1156 inputs.emplace_back(
"pipelineprepare",
gDataOriginGPU,
"PIPELINEPREPARE", 0, Lifetime::Timeframe);
1166 if (mapSources != 0) {
1187 mCalibObjects.mFastTransformHelper->requestCCDBInputs(
inputs, optsDummy, gloOpts);
1225 inputs.emplace_back(
"compClusters",
"ITS",
"COMPCLUSTERS", 0, Lifetime::Timeframe);
1226 inputs.emplace_back(
"patterns",
"ITS",
"PATTERNS", 0, Lifetime::Timeframe);
1227 inputs.emplace_back(
"ROframes",
"ITS",
"CLUSTERSROF", 0, Lifetime::Timeframe);
1229 inputs.emplace_back(
"phystrig",
"ITS",
"PHYSTRIG", 0, Lifetime::Timeframe);
1231 inputs.emplace_back(
"phystrig",
"TRD",
"TRKTRGRD", 0, Lifetime::Timeframe);
1234 if (mSpecConfig.
isITS3) {
1235 inputs.emplace_back(
"cldict",
"IT3",
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(
"IT3/Calib/ClusterDictionary"));
1236 inputs.emplace_back(
"alppar",
"ITS",
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Config/AlpideParam"));
1238 inputs.emplace_back(
"itscldict",
"ITS",
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Calib/ClusterDictionary"));
1239 inputs.emplace_back(
"itsalppar",
"ITS",
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Config/AlpideParam"));
1242 inputs.emplace_back(
"meanvtx",
"GLO",
"MEANVERTEX", 0, Lifetime::Condition,
ccdbParamSpec(
"GLO/Calib/MeanVertex", {}, 1));
1246 inputs.emplace_back(
"itsmclabels",
"ITS",
"CLUSTERSMCTR", 0, Lifetime::Timeframe);
1247 inputs.emplace_back(
"ITSMC2ROframes",
"ITS",
"CLUSTERSMC2ROF", 0, Lifetime::Timeframe);
1252 *mConfParam = mConfig->ReadConfigurableParam();
1253 if (mConfig->configProcessing.nn.nnLoadFromCCDB) {
1255 LOG(info) <<
"(NN CLUS) Enabling fetching of TPC NN clusterizer from CCDB";
1257 mSpecConfig.
nnDumpToFile = mConfig->configProcessing.nn.nnCCDBDumpToFile;
1258 GPUSettingsProcessingNNclusterizer& nnClusterizerSettings = mConfig->configProcessing.nn;
1260 std::map<std::string, std::string> metadata;
1261 metadata[
"inputDType"] = nnClusterizerSettings.nnInferenceInputDType;
1262 metadata[
"outputDType"] = nnClusterizerSettings.nnInferenceOutputDType;
1263 metadata[
"nnCCDBWithMomentum"] = nnClusterizerSettings.nnCCDBWithMomentum;
1264 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBClassificationLayerType;
1265 metadata[
"nnCCDBInteractionRate"] = nnClusterizerSettings.nnCCDBInteractionRate;
1266 metadata[
"nnCCDBBeamType"] = nnClusterizerSettings.nnCCDBBeamType;
1268 auto convert_map_to_metadata = [](
const std::map<std::string, std::string>& inputMap, std::vector<o2::framework::CCDBMetadata>& outputMetadata) {
1269 for (
const auto& [
key,
value] : inputMap) {
1271 outputMetadata.push_back({
key,
value});
1277 std::vector<o2::framework::CCDBMetadata> ccdb_metadata;
1279 if (mConfParam->printSettings) {
1280 auto printSettings = [](
const std::map<std::string, std::string>& settings) {
1281 LOG(info) <<
"(NN CLUS) NN Clusterizer CCDB settings:";
1282 for (
const auto& [
key,
value] : settings) {
1286 printSettings(metadata);
1290 metadata[
"nnCCDBEvalType"] =
"classification_c1";
1291 convert_map_to_metadata(metadata, ccdb_metadata);
1292 inputs.emplace_back(
"nn_classification_c1",
gDataOriginTPC,
"NNCLUSTERIZER_C1", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1293 }
else if (mSpecConfig.
nnEvalMode[0] ==
"c2") {
1294 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1295 metadata[
"nnCCDBEvalType"] =
"classification_c2";
1296 convert_map_to_metadata(metadata, ccdb_metadata);
1297 inputs.emplace_back(
"nn_classification_c2",
gDataOriginTPC,
"NNCLUSTERIZER_C2", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1300 metadata[
"nnCCDBEvalType"] =
"regression_c1";
1301 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1302 convert_map_to_metadata(metadata, ccdb_metadata);
1303 inputs.emplace_back(
"nn_regression_c1",
gDataOriginTPC,
"NNCLUSTERIZER_R1", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1306 metadata[
"nnCCDBEvalType"] =
"regression_c2";
1307 convert_map_to_metadata(metadata, ccdb_metadata);
1308 inputs.emplace_back(
"nn_regression_c2",
gDataOriginTPC,
"NNCLUSTERIZER_R2", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1318 std::vector<OutputSpec> outputSpecs;
1320 outputSpecs.emplace_back(
gDataOriginGPU,
"PIPELINEPREPARE", 0, Lifetime::Timeframe);
1331 outputSpecs.emplace_back(
gDataOriginTPC,
"COMPCLUSTERS", 0, Lifetime::Timeframe);
1334 outputSpecs.emplace_back(
gDataOriginTPC,
"COMPCLUSTERSFLAT", 0, Lifetime::Timeframe);
1337 for (
auto const& sector : mTPCSectors) {
1338 mClusterOutputIds.emplace_back(sector);
1341 outputSpecs.emplace_back(
gDataOriginTPC,
"CLUSTERNATIVETMP", NSectors, Lifetime::Timeframe);
1342 for (
const auto sector : mTPCSectors) {
1350 for (
const auto sector : mTPCSectors) {
1359 outputSpecs.emplace_back(
gDataOriginTPC,
"CLSHAREDMAP", 0, Lifetime::Timeframe);
1360 outputSpecs.emplace_back(
gDataOriginTPC,
"TPCOCCUPANCYMAP", 0, Lifetime::Timeframe);
1363 outputSpecs.emplace_back(
gDataOriginTPC,
"TRIGGERWORDS", 0, Lifetime::Timeframe);
1366 outputSpecs.emplace_back(
gDataOriginTPC,
"TRACKINGQA", 0, Lifetime::Timeframe);
1369 outputSpecs.emplace_back(
gDataOriginGPU,
"ERRORQA", 0, Lifetime::Timeframe);
1373 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKS", 0, Lifetime::Timeframe);
1374 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKCLSID", 0, Lifetime::Timeframe);
1375 outputSpecs.emplace_back(
gDataOriginITS,
"ITSTrackROF", 0, Lifetime::Timeframe);
1376 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICES", 0, Lifetime::Timeframe);
1377 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESROF", 0, Lifetime::Timeframe);
1378 outputSpecs.emplace_back(
gDataOriginITS,
"IRFRAMES", 0, Lifetime::Timeframe);
1381 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESMCTR", 0, Lifetime::Timeframe);
1382 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESMCPUR", 0, Lifetime::Timeframe);
1383 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKSMCTR", 0, Lifetime::Timeframe);
1384 outputSpecs.emplace_back(
gDataOriginITS,
"ITSTrackMC2ROF", 0, Lifetime::Timeframe);
1395 mDisplayFrontend.reset(
nullptr);
1396 mGPUReco.reset(
nullptr);
std::vector< std::string > labels
Simple interface to the CDB manager.
Definition of container class for dE/dx corrections.
void dumpToFile(std::string fileName, const CathodeSegmentation &seg, const std::vector< Point > &points)
Class of a TPC cluster in TPC-native coordinates (row, time)
Container to store compressed TPC cluster data.
A const (ready only) version of MCTruthContainer.
Helper class to access correction maps.
Helper class to access load maps from CCDB.
A parser and sequencer utility for raw pages within DPL input.
A raw page parser for DPL input.
Definition of class for writing debug informations.
Definition of the GeometryManager class.
Helper for geometry and GRP related CCDB requests.
Definition of the GeometryTGeo class.
Declarations for the wrapper for the set of cylindrical material layers.
Definition of the Names Generator class.
Class to serialize ONNX objects for ROOT snapshots of CCDB objects at runtime.
Utilities for parsing of data sequences.
Type wrappers for enfording a specific serialization method.
Wrapper class for TPC CA Tracker algorithm.
Configurable params for tracks ad hoc tuning.
Helper class to extract VDrift from different sources.
Helper class to obtain TPC clusters / digits / labels from DPL.
Definitions of TPC Zero Suppression Data Headers.
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static MatLayerCylSet * loadFromFile(const std::string &inpFName="matbud.root")
GPUd() value_type estimateLTFast(o2 static GPUd() float estimateLTIncrement(const o2 PropagatorImpl * Instance(bool uninitialized=false)
static const HBFUtils & Instance()
This utility handles transparently the DPL inputs and triggers a customizable action on sequences of ...
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static GPUDisplayFrontendInterface * getFrontend(const char *type)
static uint32_t getTpcMaxTimeBinFromNHbf(uint32_t nHbf)
static float getNominalGPUBz(T &src)
static void ApplySyncSettings(GPUSettingsProcessing &proc, GPUSettingsRec &rec, gpudatatypes::RecoStepField &steps, bool syncMode, int32_t dEdxMode=-2)
o2::framework::Outputs outputs()
std::vector< framework::InputSpec > CompletionPolicyData
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
o2::framework::Inputs inputs()
void run(o2::framework::ProcessingContext &pc) final
void stop() final
This is invoked on stop.
~GPURecoWorkflowSpec() override
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
GPURecoWorkflowSpec(CompletionPolicyData *policyData, Config const &specconfig, std::vector< int32_t > const &tpcsectors, uint64_t tpcSectorMask, std::shared_ptr< o2::base::GRPGeomRequest > &ggr, std::function< bool(o2::framework::DataProcessingHeader::StartTime)> **gPolicyOrder=nullptr)
o2::framework::Options options()
static void RunZSEncoderCreateMeta(const uint64_t *buffer, const uint32_t *sizes, void **ptrs, GPUTrackingInOutZS *out)
static GeometryTGeo * Instance()
void fillMatrixCache(int mask) override
ClusterNativeAccess::ConstMCLabelContainerViewWithBuffer ConstMCLabelContainerViewWithBuffer
static void addOptions(std::vector< o2::framework::ConfigParamSpec > &options)
static constexpr int MAXSECTOR
static precheckModifiedData runPrecheck(o2::gpu::GPUTrackingInOutPointers *ptrs, o2::gpu::GPUO2InterfaceConfiguration *config)
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
static Geometry * instance()
GLuint const GLchar * name
GLsizei const GLfloat * value
GLuint GLsizei const GLchar * label
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
GLsizei const GLchar *const * path
constexpr o2::header::DataOrigin gDataOriginTPC
constexpr o2::header::DataOrigin gDataOriginTRD
constexpr o2::header::DataOrigin gDataOriginITS
constexpr o2::header::DataOrigin gDataOriginGPU
Defining PrimaryVertex explicitly as messageable.
o2::header::DataDescription DataDescription
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
constexpr int MAXGLOBALPADROW
@ ZS
final Zero Suppression (can be ILBZS, DLBZS)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
@ FEEConfig
use fee config
@ IDCPadStatus
use idc pad status map
@ CalIDCPadStatusMapA
Status map of the pads (dead etc. obtained from CalIDC0)
@ CalPadGainFull
Full pad gain calibration.
@ CalPadGainResidual
Residual pad gain calibration (e.g. from tracks)
@ CalTimeGain
Gain variation over time.
@ CalTimeGainMC
Gain variation over time for MC.
@ AltroSyncSignal
timing of Altro chip sync. signal
auto getRecoInputContainer(o2::framework::ProcessingContext &pc, o2::gpu::GPUTrackingInOutPointers *ptrs, const o2::globaltracking::RecoContainer *inputTracks, bool mc=false)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
size_t inputTimesliceId
The time pipelining id of this particular device.
void requestTracks(o2::dataformats::GlobalTrackID::mask_t src, bool mc)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
S< o2::trd::GeometryFlat >::type * trdGeometry
S< GPUTRDRecoParam >::type * trdRecoParam
S< o2::base::PropagatorImpl< float > >::type * o2Propagator
S< o2::base::MatLayerCylSet >::type * matLUT
S< o2::tpc::ORTRootSerializer >::type * nnClusterizerNetworks[3]
const std::vector< TH1F > * hist1
const std::vector< TGraphAsymmErrors > * hist4
const std::vector< TH1D > * hist3
const std::vector< TH2F > * hist2
bool newContinuousMaxTimeBin
uint32_t continuousMaxTimeBin
GPUSettingsProcessing configProcessing
std::function< void *(size_t)> allocator
std::vector< std::string > nnEvalMode
int32_t tpcDeadMapSources
bool decompressTPCFromROOT
bool outputCompClustersRoot
bool sendClustersPerSector
int32_t enableDoublePipeline
bool outputSharedClusterMap
bool useFilteredOutputSpecs
bool outputCompClustersFlat
const o2::tpc::Digit * tpcDigits[NSECTORS]
size_t nTPCDigits[NSECTORS]
const GPUTPCDigitsMCInput * tpcDigitsMC
const o2::tpc::ClusterNativeAccess * clustersNative
const o2::tpc::CompressedClustersFlat * tpcCompressedClusters
const uint32_t * outputClusRefsTPCO2
const GPUSettingsTF * settingsTF
const GPUTrackingInOutZS * tpcZS
const o2::MCCompLabel * outputTracksTPCO2MC
uint32_t nOutputTracksTPCO2
const o2::tpc::ClusterNativeAccess * clustersNativeReduced
uint32_t nOutputClusRefsTPCO2
const o2::tpc::TrackTPC * outputTracksTPCO2
const GPUTrackingInOutDigits * tpcPackedDigits
const void *const * zsPtr[NENDPOINTS]
uint32_t count[NENDPOINTS]
const uint32_t * nZSPtr[NENDPOINTS]
GPUTrackingInOutZSSector sector[NSECTORS]
static constexpr uint32_t NSECTORS
static constexpr uint32_t NENDPOINTS
GPUOutputControl tpcTracksO2
GPUOutputControl clustersNative
GPUOutputControl tpcOccupancyMap
GPUOutputControl * asArray()
GPUOutputControl tpcTracksO2Labels
GPUOutputControl tpcTracksO2ClusRefs
size_t getIndex(const GPUOutputControl &v)
static constexpr size_t count()
GPUOutputControl sharedClusterMap
GPUOutputControl compressedClusters
GPUOutputControl clusterLabels
GPUOutputControl tpcTriggerWords
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClustersSector[constants::MAXSECTOR]
const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > * clustersMCTruth
const ClusterNative * clusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int clusterOffset[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
const ClusterNative * clustersLinear
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"