85#include <TStopwatch.h>
90#include <TGraphAsymmErrors.h>
102#include <unordered_set>
114GPURecoWorkflowSpec::GPURecoWorkflowSpec(
GPURecoWorkflowSpec::CompletionPolicyData* policyData,
Config const& specconfig, std::vector<int32_t>
const& tpcsectors, uint64_t tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<
bool(
o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) :
o2::
framework::
Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
117 throw std::runtime_error(
"inconsistent configuration: cluster output is only possible if CA clusterer or CompCluster decompression is activated");
121 mConfParam.reset(
new GPUSettingsO2);
123 mTimer.reset(
new TStopwatch);
127 *gPolicyOrder = &mPolicyOrder;
139 mConfig->configGRP.solenoidBzNominalGPU = 0;
140 mTFSettings->hasSimStartOrbit = 1;
142 mTFSettings->simStartOrbit = hbfu.getFirstIRofTF(
o2::InteractionRecord(0, hbfu.orbitFirstSampled)).orbit;
144 *mConfParam = mConfig->ReadConfigurableParam();
146 if (mConfParam->display) {
148 mConfig->configProcessing.eventDisplay = mDisplayFrontend.get();
149 if (mConfig->configProcessing.eventDisplay !=
nullptr) {
150 LOG(info) <<
"Event display enabled";
152 throw std::runtime_error(
"GPU Event Display frontend could not be created!");
156 mConfig->configProcessing.doublePipeline = 1;
159 mAutoSolenoidBz = mConfParam->solenoidBzNominalGPU == -1e6f;
160 mAutoContinuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin < 0;
161 if (mAutoContinuousMaxTimeBin) {
164 if (mConfig->configProcessing.deviceNum == -2) {
167 mConfig->configProcessing.deviceNum = myId;
168 LOG(info) <<
"GPU device number selected from pipeline id: " << myId <<
" / " << idMax;
170 if (mConfig->configProcessing.debugLevel >= 3 && mVerbosity == 0) {
173 mConfig->configProcessing.runMC = mSpecConfig.
processMC;
175 if (!mSpecConfig.
processMC && !mConfig->configQA.clusterRejectionHistograms) {
176 throw std::runtime_error(
"Need MC information to create QA plots");
179 mConfig->configQA.noMC =
true;
181 mConfig->configQA.shipToQC =
true;
182 if (!mConfig->configProcessing.runQA) {
183 mConfig->configQA.enableLocalOutput =
false;
185 mConfig->configProcessing.runQA = -mQATaskMask;
188 mConfig->configInterface.outputToExternalBuffers =
true;
198 GPUO2Interface::ApplySyncSettings(mConfig->configProcessing, mConfig->configReconstruction, mConfig->configWorkflow.steps, mConfParam->synchronousProcessing, runTracking ? mConfParam->rundEdx : -2);
216 if (mTPCSectorMask != 0xFFFFFFFFF) {
217 throw std::invalid_argument(
"Cannot run TPC decompression with a sector mask");
230 mConfig->configProcessing.outputSharedClusterMap =
true;
233 mConfig->configProcessing.createO2Output = 0;
237 if (mConfParam->transformationFile.size() || mConfParam->transformationSCFile.size()) {
238 LOG(fatal) <<
"Deprecated configurable param options GPU_global.transformationFile or transformationSCFile used\n"
239 <<
"Instead, link the corresponding file as <somedir>/TPC/Calib/CorrectionMap/snapshot.root and use it via\n"
240 <<
"--condition-remap file://<somdir>=TPC/Calib/CorrectionMap option";
246 LOG(fatal) <<
"GPU two-threaded pipeline works only with TPC-only processing, and with ZS input";
250 mGPUReco = std::make_unique<GPUO2Interface>();
253 initFunctionTPCCalib(ic);
255 mConfig->configCalib.fastTransform = mCalibObjects.mFastTransformHelper->getCorrMap();
256 mConfig->configCalib.fastTransformRef = mCalibObjects.mFastTransformHelper->getCorrMapRef();
257 mConfig->configCalib.fastTransformMShape = mCalibObjects.mFastTransformHelper->getCorrMapMShape();
258 mConfig->configCalib.fastTransformHelper = mCalibObjects.mFastTransformHelper.get();
259 if (mConfig->configCalib.fastTransform ==
nullptr) {
260 throw std::invalid_argument(
"GPU workflow: initialization of the TPC transformation failed");
263 if (mConfParam->matLUTFile.size()) {
264 LOGP(info,
"Loading matlut file {}", mConfParam->matLUTFile.c_str());
266 if (mConfig->configCalib.matLUT ==
nullptr) {
267 LOGF(fatal,
"Error loading matlut file");
270 mConfig->configProcessing.lateO2MatLutProvisioningSize = 50 * 1024 * 1024;
274 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>();
275 mConfig->configCalib.trdGeometry = mTRDGeometry.get();
278 mConfig->configProcessing.willProvideO2PropagatorLate =
true;
279 mConfig->configProcessing.o2PropagatorUseGPUField =
true;
282 mConfig->configProcessing.printSettings =
true;
283 if (mConfParam->printSettings > 1) {
284 mConfig->PrintParam();
289 if (mGPUReco->Initialize(config) != 0) {
290 throw std::invalid_argument(
"GPU Reconstruction initialization failed");
293 mQA = std::make_unique<GPUO2InterfaceQA>(mConfig.get());
296 mGPUReco->setErrorCodeOutput(&mErrorQA);
307 if (mConfParam->dump >= 2) {
308 LOG(fatal) <<
"Cannot use dump-only mode with multi-threaded pipeline";
313 callbacks.
set<CallbackService::Id::RegionInfoCallback>([
this](fair::mq::RegionInfo
const& info) {
314 if (info.size == 0) {
318 mRegionInfos.emplace_back(info);
323 if (mConfParam->registerSelectedSegmentIds != -1 && info.managed && info.id != (uint32_t)mConfParam->registerSelectedSegmentIds) {
327 if (mConfParam->mutexMemReg) {
328 mode_t
mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
329 fd = open(
"/tmp/o2_gpu_memlock_mutex.lock", O_RDWR | O_CREAT | O_CLOEXEC,
mask);
331 throw std::runtime_error(
"Error opening memlock mutex lock file");
334 if (lockf(fd, F_LOCK, 0)) {
335 throw std::runtime_error(
"Error locking memlock mutex file");
338 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
339 if (mConfParam->benchmarkMemoryRegistration) {
340 start = std::chrono::high_resolution_clock::now();
342 if (mGPUReco->registerMemoryForGPU(info.ptr, info.size)) {
343 throw std::runtime_error(
"Error registering memory for GPU");
345 if (mConfParam->benchmarkMemoryRegistration) {
346 end = std::chrono::high_resolution_clock::now();
347 std::chrono::duration<double> elapsed_seconds =
end -
start;
348 LOG(info) <<
"Memory registration time (0x" << info.ptr <<
", " << info.size <<
" bytes): " << elapsed_seconds.count() <<
" s";
350 if (mConfParam->mutexMemReg) {
351 if (lockf(fd, F_ULOCK, 0)) {
352 throw std::runtime_error(
"Error unlocking memlock mutex file");
364 LOGF(info,
"GPU Reconstruction total timing: Cpu: %.3e Real: %.3e s in %d slots", mTimer->CpuTime(), mTimer->RealTime(), mTimer->Counter() - 1);
365 handlePipelineStop();
370 handlePipelineEndOfStream(ec);
376 finaliseCCDBTPC(matcher, obj);
378 finaliseCCDBITS(matcher, obj);
382 mGRPGeomUpdated =
true;
387template <
class D,
class E,
class F,
class G,
class H,
class I,
class J,
class K>
388void GPURecoWorkflowSpec::processInputs(
ProcessingContext& pc, D& tpcZSmeta, E& inputZS, F& tpcZS, G& tpcZSonTheFlySizes,
bool& debugTFDump, H& compClustersDummy, I& compClustersFlatDummy, J& pCompClustersFlat, K& tmpEmptyCompClusters)
399 tpcZSmeta.Pointers[
i][
j].clear();
400 tpcZSmeta.Sizes[
i][
j].clear();
405 tpcZSonTheFlySizes = {0};
408 bool recv =
false, recvsizes =
false;
411 throw std::runtime_error(
"Received multiple ZSSIZES data");
413 tpcZSonTheFlySizes = pc.
inputs().
get<std::array<uint32_t, NEndpoints * NSectors>>(
ref);
420 throw std::runtime_error(
"Received multiple TPCZS data");
422 inputZS = pc.
inputs().
get<gsl::span<o2::tpc::ZeroSuppressedContainer8kb>>(
ref);
425 if (!recv || !recvsizes) {
426 throw std::runtime_error(
"TPC ZS on the fly data not received");
431 uint32_t pageSector = 0;
432 for (uint32_t
j = 0;
j < NEndpoints;
j++) {
433 pageSector += tpcZSonTheFlySizes[
i * NEndpoints +
j];
434 offset += tpcZSonTheFlySizes[
i * NEndpoints +
j];
436 if (mVerbosity >= 1) {
437 LOG(info) <<
"GOT ZS on the fly pages FOR SECTOR " <<
i <<
" -> pages: " << pageSector;
443 auto isSameRdh = [](
const char*
left,
const char*
right) ->
bool {
444 return o2::raw::RDHUtils::getFEEID(
left) == o2::raw::RDHUtils::getFEEID(
right) && o2::raw::RDHUtils::getDetectorField(
left) == o2::raw::RDHUtils::getDetectorField(
right);
446 auto checkForZSData = [](
const char*
ptr, uint32_t subSpec) ->
bool {
447 const auto rdhLink = o2::raw::RDHUtils::getLinkID(
ptr);
448 const auto detField = o2::raw::RDHUtils::getDetectorField(
ptr);
449 const auto feeID = o2::raw::RDHUtils::getFEEID(
ptr);
450 const auto feeLinkID = o2::tpc::rdh_utils::getLink(feeID);
452 return detField ==
o2::tpc::raw_data_types::ZS && ((feeLinkID == o2::tpc::rdh_utils::UserLogicLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == 0)) ||
453 (feeLinkID == o2::tpc::rdh_utils::ILBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::ILBZSLinkID || rdhLink == 0)) ||
454 (feeLinkID == o2::tpc::rdh_utils::DLBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::DLBZSLinkID || rdhLink == 0)));
456 auto insertPages = [&tpcZSmeta, checkForZSData](
const char*
ptr,
size_t count, uint32_t subSpec) ->
void {
457 if (checkForZSData(
ptr, subSpec)) {
458 int32_t rawcru = o2::tpc::rdh_utils::getCRU(
ptr);
459 int32_t rawendpoint = o2::tpc::rdh_utils::getEndPoint(
ptr);
460 tpcZSmeta.Pointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(
ptr);
461 tpcZSmeta.Sizes[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(
count);
466 static uint32_t nErrors = 0;
468 if (nErrors == 1 || (nErrors < 100 && nErrors % 10 == 0) || nErrors % 1000 == 0 || mNTFs % 1000 == 0) {
469 LOG(error) <<
"DPLRawPageSequencer failed to process TPC raw data - data most likely not padded correctly - Using slow page scan instead (this alarm is downscaled from now on, so far " << nErrors <<
" of " << mNTFs <<
" TFs affected)";
473 int32_t totalCount = 0;
476 tpcZSmeta.Pointers2[
i][
j] = tpcZSmeta.Pointers[
i][
j].data();
477 tpcZSmeta.Sizes2[
i][
j] = tpcZSmeta.Sizes[
i][
j].data();
478 tpcZS.sector[
i].zsPtr[
j] = tpcZSmeta.Pointers2[
i][
j];
479 tpcZS.sector[
i].nZSPtr[
j] = tpcZSmeta.Sizes2[
i][
j];
480 tpcZS.sector[
i].count[
j] = tpcZSmeta.Pointers[
i][
j].size();
481 totalCount += tpcZSmeta.Pointers[
i][
j].size();
487 compClustersFlatDummy.setForward(&compClustersDummy);
488 pCompClustersFlat = &compClustersFlatDummy;
492 if (pCompClustersFlat ==
nullptr) {
499 LOGF(info,
"running tracking for sector(s) 0x%09x", mTPCSectorMask);
507 if (mConfParam->dump < 2) {
508 retVal = mGPUReco->RunTracking(ptrs, outputRegions, threadIndex, inputUpdateCallback);
511 retVal = runITSTracking(*pc);
516 mGPUReco->Clear(
false, threadIndex);
521void GPURecoWorkflowSpec::cleanOldCalibsTPCPtrs(calibObjectStruct& oldCalibObjects)
523 if (mOldCalibObjects.size() > 0) {
524 mOldCalibObjects.pop();
526 mOldCalibObjects.emplace(std::move(oldCalibObjects));
534 auto cput = mTimer->CpuTime();
535 auto realt = mTimer->RealTime();
536 mTimer->Start(
false);
539 std::vector<gsl::span<const char>>
inputs;
547 std::array<uint32_t, NEndpoints * NSectors> tpcZSonTheFlySizes;
548 gsl::span<const o2::tpc::ZeroSuppressedContainer8kb> inputZS;
549 std::unique_ptr<char[]> tmpEmptyCompClusters;
551 bool getWorkflowTPCInput_clusters =
false, getWorkflowTPCInput_mc =
false, getWorkflowTPCInput_digits =
false;
552 bool debugTFDump =
false;
555 getWorkflowTPCInput_mc =
true;
558 getWorkflowTPCInput_clusters =
true;
561 getWorkflowTPCInput_digits =
true;
566 auto lockDecodeInput = std::make_unique<std::lock_guard<std::mutex>>(mPipeline->mutexDecodeInput);
574 LOG(fatal) <<
"configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
579 processInputs(pc, tpcZSmeta, inputZS, tpcZS, tpcZSonTheFlySizes, debugTFDump, compClustersDummy, compClustersFlatDummy, pCompClustersFlat, tmpEmptyCompClusters);
580 const auto& inputsClustersDigits = o2::tpc::getWorkflowTPCInput(pc, mVerbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, mTPCSectorMask, getWorkflowTPCInput_digits);
583 mTFSettings->tfStartOrbit = tinfo.firstTForbit;
584 mTFSettings->hasTfStartOrbit = 1;
585 mTFSettings->hasNHBFPerTF = 1;
587 mTFSettings->hasRunStartOrbit = 0;
592 LOG(info) <<
"TF firstTForbit " << mTFSettings->tfStartOrbit <<
" nHBF " << mTFSettings->nHBFPerTF <<
" runStartOrbit " << mTFSettings->runStartOrbit <<
" simStartOrbit " << mTFSettings->simStartOrbit;
594 if (mConfParam->checkFirstTfOrbit) {
595 static uint32_t lastFirstTFOrbit = -1;
596 static uint32_t lastTFCounter = -1;
597 if (lastFirstTFOrbit != -1 && lastTFCounter != -1) {
598 int32_t diffOrbit = tinfo.firstTForbit - lastFirstTFOrbit;
599 int32_t diffCounter = tinfo.tfCounter - lastTFCounter;
600 if (diffOrbit != diffCounter * mTFSettings->nHBFPerTF) {
601 LOG(error) <<
"Time frame has mismatching firstTfOrbit - Last orbit/counter: " << lastFirstTFOrbit <<
" " << lastTFCounter <<
" - Current: " << tinfo.firstTForbit <<
" " << tinfo.tfCounter;
604 lastFirstTFOrbit = tinfo.firstTForbit;
605 lastTFCounter = tinfo.tfCounter;
618 void* ptrEp[NSectors * NEndpoints] = {};
619 bool doInputDigits =
false, doInputDigitsMC =
false;
623 const uint64_t*
buffer =
reinterpret_cast<const uint64_t*
>(&inputZS[0]);
626 doInputDigits = doInputDigitsMC = mSpecConfig.
processMC;
630 throw std::runtime_error(
"Cannot process MC information, none available");
633 doInputDigits =
true;
639 if (mTPCSectorMask != 0xFFFFFFFFF) {
641 for (uint32_t
i = 0;
i < NSectors;
i++) {
642 if (!(mTPCSectorMask & (1ul <<
i))) {
658 if (doInputDigitsMC) {
661 for (uint32_t
i = 0;
i < NSectors;
i++) {
662 tpcDigitsMap.
tpcDigits[
i] = inputsClustersDigits->inputDigits[
i].data();
663 tpcDigitsMap.
nTPCDigits[
i] = inputsClustersDigits->inputDigits[
i].size();
664 if (doInputDigitsMC) {
665 tpcDigitsMapMC.
v[
i] = inputsClustersDigits->inputDigitsMCPtrs[
i];
671 if (mClusterOutputIds.size() > 0) {
672 clusterOutputSectorHeader.
sectorBits = mTPCSectorMask;
674 clusterOutputSectorHeader.activeSectors = mTPCSectorMask;
679 std::unique_ptr<GPURecoWorkflow_QueueObject> pipelineContext;
681 if (handlePipeline(pc, ptrs, tpcZSmeta, tpcZS, pipelineContext)) {
689 using outputDataType =
char;
691 using outputBufferType = std::pair<std::optional<std::reference_wrapper<outputBufferUninitializedVector>>, outputDataType*>;
693 std::unordered_set<std::string> outputsCreated;
695 auto setOutputAllocator = [
this, &outputBuffers, &outputRegions, &pc, &outputsCreated](
const char*
name,
bool condition,
GPUOutputControl& region,
auto&& outputSpec,
size_t offset = 0) {
698 if (mConfParam->allocateOutputOnTheFly) {
699 region.allocator = [
this,
name, &
buffer, &pc, outputSpec = std::move(outputSpec),
offset, &outputsCreated](
size_t size) ->
void* {
702 LOG(info) <<
"ALLOCATING " <<
size <<
" bytes for " <<
name <<
": " << std::get<DataOrigin>(outputSpec).template as<std::string>() <<
"/" << std::get<DataDescription>(outputSpec).template as<std::string>() <<
"/" << std::get<2>(outputSpec);
704 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
706 start = std::chrono::high_resolution_clock::now();
709 outputsCreated.insert(
name);
711 end = std::chrono::high_resolution_clock::now();
712 std::chrono::duration<double> elapsed_seconds =
end -
start;
713 LOG(info) <<
"Allocation time for " <<
name <<
" (" <<
size <<
" bytes)"
714 <<
": " << elapsed_seconds.count() <<
"s";
722 outputsCreated.insert(
name);
727 auto downSizeBuffer = [](outputBufferType&
buffer,
size_t size) {
732 throw std::runtime_error(
"Invalid buffer size requested");
736 throw std::runtime_error(
"Inconsistent buffer address after downsize");
745 auto downSizeBufferToSpan = [&outputBuffers, &outputRegions, &downSizeBuffer](
GPUOutputControl& region,
auto span) {
750 if (span.size() &&
buffer.second != (
char*)span.data()) {
751 throw std::runtime_error(
"Buffer does not match span");
753 downSizeBuffer(
buffer, span.size() *
sizeof(*span.data()));
772 throw std::runtime_error(
"Invalid input for gpu tracking");
777 calibObjectStruct oldCalibObjects;
778 doCalibUpdates(pc, oldCalibObjects);
780 lockDecodeInput.reset();
782 uint32_t threadIndex;
783 if (mConfParam->dump) {
785 while (pipelineContext->jobThreadIndex == -1) {
787 threadIndex = pipelineContext->jobThreadIndex;
792 std::string dir =
"";
793 if (mConfParam->dumpFolder !=
"") {
794 dir = std::regex_replace(mConfParam->dumpFolder, std::regex(
"\\[P\\]"),
std::to_string(getpid()));
796 mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
801 mGPUReco->DumpSettings(threadIndex, dir.c_str());
803 if (tinfo.tfCounter >= mConfParam->dumpFirst && (mConfParam->dumpLast == -1 || tinfo.tfCounter <= mConfParam->dumpLast)) {
804 mGPUReco->DumpEvent(mNTFDumps, &ptrs, threadIndex, dir.c_str());
808 std::unique_ptr<GPUTrackingInOutPointers> ptrsDump;
809 if (mConfParam->dumpBadTFMode == 2) {
811 memcpy((
void*)ptrsDump.get(), (
const void*)&ptrs,
sizeof(ptrs));
816 if (!pipelineContext->jobSubmitted) {
817 enqueuePipelinedJob(&ptrs, &outputRegions, pipelineContext.get(),
true);
819 finalizeInputPipelinedJob(&ptrs, &outputRegions, pipelineContext.get());
821 std::unique_lock lk(pipelineContext->jobFinishedMutex);
822 pipelineContext->jobFinishedNotify.wait(lk, [context = pipelineContext.get()]() { return context->jobFinished; });
823 retVal = pipelineContext->jobReturnValue;
824 threadIndex = pipelineContext->jobThreadIndex;
827 threadIndex = mNextThreadIndex;
828 if (mConfig->configProcessing.doublePipeline) {
829 mNextThreadIndex = (mNextThreadIndex + 1) % 2;
832 retVal = runMain(&pc, &ptrs, &outputRegions, threadIndex);
837 cleanOldCalibsTPCPtrs(oldCalibObjects);
839 o2::utils::DebugStreamer::instance()->flush();
841 if (debugTFDump && mNDebugDumps < mConfParam->dumpBadTFs) {
843 if (mConfParam->dumpBadTFMode <= 1) {
845 FILE* fp = fopen(
filename.c_str(),
"w+b");
849 if (mConfParam->dumpBadTFMode == 1) {
853 fwrite(
data.data(), 1,
data.size(), fp);
856 }
else if (mConfParam->dumpBadTFMode == 2) {
857 mGPUReco->DumpEvent(mNDebugDumps - 1, ptrsDump.get(), threadIndex);
861 if (mConfParam->dump == 2) {
867 if (mConfig->configProcessing.tpcWriteClustersAfterRejection) {
870 bool createEmptyOutput =
false;
872 if (
retVal == 3 && mConfig->configProcessing.ignoreNonFatalGPUErrors) {
873 if (mConfig->configProcessing.throttleAlarms) {
874 LOG(warning) <<
"GPU Reconstruction aborted with non fatal error code, ignoring";
876 LOG(alarm) <<
"GPU Reconstruction aborted with non fatal error code, ignoring";
878 createEmptyOutput = !mConfParam->partialOutputForNonFatalErrors;
880 LOG(fatal) <<
"GPU Reconstruction aborted with error code " <<
retVal <<
" - errors are not ignored - terminating";
884 std::unique_ptr<o2::tpc::ClusterNativeAccess> tmpEmptyClNative;
885 if (createEmptyOutput) {
886 memset(&ptrs, 0,
sizeof(ptrs));
887 for (uint32_t
i = 0;
i < outputRegions.
count();
i++) {
888 if (outputBuffers[
i].
first) {
895 outputBuffers[
i].first->get().resize(toSize);
896 outputBuffers[
i].second = outputBuffers[
i].first->get().data();
898 memset(outputBuffers[
i].second, 0, toSize);
902 tmpEmptyClNative = std::make_unique<o2::tpc::ClusterNativeAccess>();
903 memset(tmpEmptyClNative.get(), 0,
sizeof(*tmpEmptyClNative));
908 clustersMCBuffer.second = clustersMCBuffer.first;
909 tmpEmptyClNative->clustersMCTruth = &clustersMCBuffer.second;
915 if (!mConfParam->allocateOutputOnTheFly) {
916 for (uint32_t
i = 0;
i < outputRegions.
count();
i++) {
919 throw std::runtime_error(
"Preallocated buffer size exceeded");
922 downSizeBuffer(outputBuffers[
i], (
char*)outputRegions.
asArray()[
i].
ptrCurrent - (
char*)outputBuffers[
i].second);
926 downSizeBufferToSpan(outputRegions.
tpcTracksO2, spanOutputTracks);
932 doTrackTuneTPC(ptrs, outputBuffers[outputRegions.
getIndex(outputRegions.
tpcTracksO2)].first->get().data());
936 throw std::runtime_error(
"cluster native output ptrs out of sync");
949 if (mClusterOutputIds.size() > 0) {
953 for (uint32_t
i = 0;
i < NSectors;
i++) {
954 if (mTPCSectorMask & (1ul <<
i)) {
956 clusterOutputSectorHeader.sectorBits = (1ul <<
i);
959 memset(outIndex, 0,
sizeof(*outIndex));
992 auto getoutput = [sendQAOutput](
auto ptr) {
return sendQAOutput &&
ptr ? *
ptr : std::decay_t<decltype(*ptr)>(); };
993 std::vector<TH1F> copy1 = getoutput(outputRegions.
qa.
hist1);
994 std::vector<TH2F> copy2 = getoutput(outputRegions.
qa.
hist2);
995 std::vector<TH1D> copy3 = getoutput(outputRegions.
qa.
hist3);
996 std::vector<TGraphAsymmErrors> copy4 = getoutput(outputRegions.
qa.
hist4);
998 mQA->postprocessExternal(copy1, copy2, copy3, copy4, out, mQATaskMask ? mQATaskMask : -1);
1016 LOG(info) <<
"GPU Reconstruction time for this TF " << mTimer->CpuTime() - cput <<
" s (cpu), " << mTimer->RealTime() - realt <<
" s (wall)";
1024 bool needCalibUpdate =
false;
1025 if (mGRPGeomUpdated) {
1026 mGRPGeomUpdated =
false;
1027 needCalibUpdate =
true;
1032 mITSGeometryCreated =
true;
1035 if (mAutoSolenoidBz) {
1041 if (mAutoContinuousMaxTimeBin) {
1044 LOG(info) <<
"Updating max time bin " << newCalibValues.
continuousMaxTimeBin <<
" (" << mTFSettings->nHBFPerTF <<
" orbits)";
1047 if (!mPropagatorInstanceCreated) {
1049 if (mConfig->configProcessing.o2PropagatorUseGPUField) {
1052 mPropagatorInstanceCreated =
true;
1055 if (!mMatLUTCreated) {
1056 if (mConfParam->matLUTFile.size() == 0) {
1058 LOG(info) <<
"Loaded material budget lookup table";
1060 mMatLUTCreated =
true;
1064 gm->createPadPlaneArray();
1065 gm->createClusterMatrixArray();
1066 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>(*gm);
1067 newCalibObjects.
trdGeometry = mConfig->configCalib.trdGeometry = mTRDGeometry.get();
1068 LOG(info) <<
"Loaded TRD geometry";
1069 mTRDGeometryCreated =
true;
1072 needCalibUpdate = fetchCalibsCCDBTPC(pc, newCalibObjects, oldCalibObjects) || needCalibUpdate;
1074 needCalibUpdate = fetchCalibsCCDBITS(pc) || needCalibUpdate;
1076 if (mTPCCutAtTimeBin != mConfig->configGRP.tpcCutTimeBin) {
1078 newCalibValues.
tpcTimeBinCut = mConfig->configGRP.tpcCutTimeBin = mTPCCutAtTimeBin;
1079 needCalibUpdate =
true;
1083 std::ofstream out(
path, std::ios::binary | std::ios::trunc);
1084 if (!out.is_open()) {
1085 throw std::runtime_error(
"Failed to open output file: " +
path);
1088 out.write(
buffer,
static_cast<std::streamsize
>(validSize));
1090 throw std::runtime_error(
"Failed while writing data to: " +
path);
1093 for (
int i = 0;
i < 3;
i++) {
1098 LOG(info) <<
"Dumped TPC clusterizer NN " <<
i <<
" to file " <<
path;
1102 if (needCalibUpdate) {
1103 LOG(info) <<
"Updating GPUReconstruction calibration objects";
1104 mGPUReco->UpdateCalibration(newCalibObjects, newCalibValues);
1113 char* o2jobid = getenv(
"O2JOBID");
1114 char* numaid = getenv(
"NUMAID");
1115 int32_t chanid = o2jobid ? atoi(o2jobid) : (numaid ? atoi(numaid) : 0);
1116 std::string chan = std::string(
"name=gpu-prepare-channel,type=") + (send ?
"push" :
"pull") +
",method=" + (send ?
"connect" :
"bind") +
",address=ipc://@gpu-prepare-channel-" +
std::to_string(chanid) +
"-{timeslice0},transport=shmem,rateLogging=0";
1136 inputs.emplace_back(
"stdDist",
"FLP",
"DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
1141 LOG(fatal) <<
"Double pipeline mode can only work with zsraw input";
1145 inputs.emplace_back(
"pipelineprepare",
gDataOriginGPU,
"PIPELINEPREPARE", 0, Lifetime::Timeframe);
1155 if (mapSources != 0) {
1176 mCalibObjects.mFastTransformHelper->requestCCDBInputs(
inputs, optsDummy, gloOpts);
1214 inputs.emplace_back(
"compClusters",
"ITS",
"COMPCLUSTERS", 0, Lifetime::Timeframe);
1215 inputs.emplace_back(
"patterns",
"ITS",
"PATTERNS", 0, Lifetime::Timeframe);
1216 inputs.emplace_back(
"ROframes",
"ITS",
"CLUSTERSROF", 0, Lifetime::Timeframe);
1218 inputs.emplace_back(
"phystrig",
"ITS",
"PHYSTRIG", 0, Lifetime::Timeframe);
1220 inputs.emplace_back(
"phystrig",
"TRD",
"TRKTRGRD", 0, Lifetime::Timeframe);
1223 if (mSpecConfig.
isITS3) {
1224 inputs.emplace_back(
"cldict",
"IT3",
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(
"IT3/Calib/ClusterDictionary"));
1225 inputs.emplace_back(
"alppar",
"ITS",
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Config/AlpideParam"));
1227 inputs.emplace_back(
"itscldict",
"ITS",
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Calib/ClusterDictionary"));
1228 inputs.emplace_back(
"itsalppar",
"ITS",
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Config/AlpideParam"));
1231 inputs.emplace_back(
"meanvtx",
"GLO",
"MEANVERTEX", 0, Lifetime::Condition,
ccdbParamSpec(
"GLO/Calib/MeanVertex", {}, 1));
1235 inputs.emplace_back(
"itsmclabels",
"ITS",
"CLUSTERSMCTR", 0, Lifetime::Timeframe);
1236 inputs.emplace_back(
"ITSMC2ROframes",
"ITS",
"CLUSTERSMC2ROF", 0, Lifetime::Timeframe);
1241 *mConfParam = mConfig->ReadConfigurableParam();
1242 if (mConfig->configProcessing.nn.nnLoadFromCCDB) {
1244 LOG(info) <<
"(NN CLUS) Enabling fetching of TPC NN clusterizer from CCDB";
1246 mSpecConfig.
nnDumpToFile = mConfig->configProcessing.nn.nnCCDBDumpToFile;
1247 GPUSettingsProcessingNNclusterizer& nnClusterizerSettings = mConfig->configProcessing.nn;
1249 std::map<std::string, std::string> metadata;
1250 metadata[
"inputDType"] = nnClusterizerSettings.nnInferenceInputDType;
1251 metadata[
"outputDType"] = nnClusterizerSettings.nnInferenceOutputDType;
1252 metadata[
"nnCCDBWithMomentum"] = nnClusterizerSettings.nnCCDBWithMomentum;
1253 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBClassificationLayerType;
1254 metadata[
"nnCCDBInteractionRate"] = nnClusterizerSettings.nnCCDBInteractionRate;
1255 metadata[
"nnCCDBBeamType"] = nnClusterizerSettings.nnCCDBBeamType;
1257 auto convert_map_to_metadata = [](
const std::map<std::string, std::string>& inputMap, std::vector<o2::framework::CCDBMetadata>& outputMetadata) {
1258 for (
const auto& [
key,
value] : inputMap) {
1260 outputMetadata.push_back({
key,
value});
1266 std::vector<o2::framework::CCDBMetadata> ccdb_metadata;
1268 if (mConfParam->printSettings) {
1269 auto printSettings = [](
const std::map<std::string, std::string>& settings) {
1270 LOG(info) <<
"(NN CLUS) NN Clusterizer CCDB settings:";
1271 for (
const auto& [
key,
value] : settings) {
1275 printSettings(metadata);
1279 metadata[
"nnCCDBEvalType"] =
"classification_c1";
1280 convert_map_to_metadata(metadata, ccdb_metadata);
1281 inputs.emplace_back(
"nn_classification_c1",
gDataOriginTPC,
"NNCLUSTERIZER_C1", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1282 }
else if (mSpecConfig.
nnEvalMode[0] ==
"c2") {
1283 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1284 metadata[
"nnCCDBEvalType"] =
"classification_c2";
1285 convert_map_to_metadata(metadata, ccdb_metadata);
1286 inputs.emplace_back(
"nn_classification_c2",
gDataOriginTPC,
"NNCLUSTERIZER_C2", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1289 metadata[
"nnCCDBEvalType"] =
"regression_c1";
1290 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1291 convert_map_to_metadata(metadata, ccdb_metadata);
1292 inputs.emplace_back(
"nn_regression_c1",
gDataOriginTPC,
"NNCLUSTERIZER_R1", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1295 metadata[
"nnCCDBEvalType"] =
"regression_c2";
1296 convert_map_to_metadata(metadata, ccdb_metadata);
1297 inputs.emplace_back(
"nn_regression_c2",
gDataOriginTPC,
"NNCLUSTERIZER_R2", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1307 std::vector<OutputSpec> outputSpecs;
1309 outputSpecs.emplace_back(
gDataOriginGPU,
"PIPELINEPREPARE", 0, Lifetime::Timeframe);
1320 outputSpecs.emplace_back(
gDataOriginTPC,
"COMPCLUSTERS", 0, Lifetime::Timeframe);
1323 outputSpecs.emplace_back(
gDataOriginTPC,
"COMPCLUSTERSFLAT", 0, Lifetime::Timeframe);
1326 for (
auto const& sector : mTPCSectors) {
1327 mClusterOutputIds.emplace_back(sector);
1330 outputSpecs.emplace_back(
gDataOriginTPC,
"CLUSTERNATIVETMP", NSectors, Lifetime::Timeframe);
1331 for (
const auto sector : mTPCSectors) {
1339 for (
const auto sector : mTPCSectors) {
1348 outputSpecs.emplace_back(
gDataOriginTPC,
"CLSHAREDMAP", 0, Lifetime::Timeframe);
1349 outputSpecs.emplace_back(
gDataOriginTPC,
"TPCOCCUPANCYMAP", 0, Lifetime::Timeframe);
1352 outputSpecs.emplace_back(
gDataOriginTPC,
"TRIGGERWORDS", 0, Lifetime::Timeframe);
1355 outputSpecs.emplace_back(
gDataOriginTPC,
"TRACKINGQA", 0, Lifetime::Timeframe);
1358 outputSpecs.emplace_back(
gDataOriginGPU,
"ERRORQA", 0, Lifetime::Timeframe);
1362 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKS", 0, Lifetime::Timeframe);
1363 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKCLSID", 0, Lifetime::Timeframe);
1364 outputSpecs.emplace_back(
gDataOriginITS,
"ITSTrackROF", 0, Lifetime::Timeframe);
1365 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICES", 0, Lifetime::Timeframe);
1366 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESROF", 0, Lifetime::Timeframe);
1367 outputSpecs.emplace_back(
gDataOriginITS,
"IRFRAMES", 0, Lifetime::Timeframe);
1370 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESMCTR", 0, Lifetime::Timeframe);
1371 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESMCPUR", 0, Lifetime::Timeframe);
1372 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKSMCTR", 0, Lifetime::Timeframe);
1373 outputSpecs.emplace_back(
gDataOriginITS,
"ITSTrackMC2ROF", 0, Lifetime::Timeframe);
1384 mDisplayFrontend.reset(
nullptr);
1385 mGPUReco.reset(
nullptr);
std::vector< std::string > labels
Simple interface to the CDB manager.
Definition of container class for dE/dx corrections.
void dumpToFile(std::string fileName, const CathodeSegmentation &seg, const std::vector< Point > &points)
Class of a TPC cluster in TPC-native coordinates (row, time)
Container to store compressed TPC cluster data.
A const (ready only) version of MCTruthContainer.
Helper class to access correction maps.
Helper class to access load maps from CCDB.
A parser and sequencer utility for raw pages within DPL input.
A raw page parser for DPL input.
Definition of class for writing debug informations.
Definition of the GeometryManager class.
Helper for geometry and GRP related CCDB requests.
Definition of the GeometryTGeo class.
Declarations for the wrapper for the set of cylindrical material layers.
Definition of the Names Generator class.
Class to serialize ONNX objects for ROOT snapshots of CCDB objects at runtime.
Utilities for parsing of data sequences.
Type wrappers for enfording a specific serialization method.
Wrapper class for TPC CA Tracker algorithm.
Configurable params for tracks ad hoc tuning.
Helper class to extract VDrift from different sources.
Helper class to obtain TPC clusters / digits / labels from DPL.
Definitions of TPC Zero Suppression Data Headers.
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static MatLayerCylSet * loadFromFile(const std::string &inpFName="matbud.root")
GPUd() value_type estimateLTFast(o2 static GPUd() float estimateLTIncrement(const o2 PropagatorImpl * Instance(bool uninitialized=false)
static const HBFUtils & Instance()
This utility handles transparently the DPL inputs and triggers a customizable action on sequences of ...
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static GPUDisplayFrontendInterface * getFrontend(const char *type)
static uint32_t getTpcMaxTimeBinFromNHbf(uint32_t nHbf)
static float getNominalGPUBz(T &src)
static void ApplySyncSettings(GPUSettingsProcessing &proc, GPUSettingsRec &rec, gpudatatypes::RecoStepField &steps, bool syncMode, int32_t dEdxMode=-2)
o2::framework::Outputs outputs()
std::vector< framework::InputSpec > CompletionPolicyData
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
o2::framework::Inputs inputs()
void run(o2::framework::ProcessingContext &pc) final
void stop() final
This is invoked on stop.
~GPURecoWorkflowSpec() override
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
GPURecoWorkflowSpec(CompletionPolicyData *policyData, Config const &specconfig, std::vector< int32_t > const &tpcsectors, uint64_t tpcSectorMask, std::shared_ptr< o2::base::GRPGeomRequest > &ggr, std::function< bool(o2::framework::DataProcessingHeader::StartTime)> **gPolicyOrder=nullptr)
o2::framework::Options options()
static void RunZSEncoderCreateMeta(const uint64_t *buffer, const uint32_t *sizes, void **ptrs, GPUTrackingInOutZS *out)
static GeometryTGeo * Instance()
void fillMatrixCache(int mask) override
ClusterNativeAccess::ConstMCLabelContainerViewWithBuffer ConstMCLabelContainerViewWithBuffer
static void addOptions(std::vector< o2::framework::ConfigParamSpec > &options)
static constexpr int MAXSECTOR
static precheckModifiedData runPrecheck(o2::gpu::GPUTrackingInOutPointers *ptrs, o2::gpu::GPUO2InterfaceConfiguration *config)
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
static Geometry * instance()
GLuint const GLchar * name
GLsizei const GLfloat * value
GLuint GLsizei const GLchar * label
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
GLsizei const GLchar *const * path
constexpr o2::header::DataOrigin gDataOriginTPC
constexpr o2::header::DataOrigin gDataOriginTRD
constexpr o2::header::DataOrigin gDataOriginITS
constexpr o2::header::DataOrigin gDataOriginGPU
Defining PrimaryVertex explicitly as messageable.
o2::header::DataDescription DataDescription
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
constexpr int MAXGLOBALPADROW
@ ZS
Final zero suppression (can be ILBZS or DLBZS)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
@ FEEConfig
use fee config
@ IDCPadStatus
use idc pad status map
@ CalIDCPadStatusMapA
Status map of the pads (dead etc., obtained from CalIDC0)
@ CalPadGainFull
Full pad gain calibration.
@ CalPadGainResidual
Residual pad gain calibration (e.g. from tracks)
@ CalTimeGain
Gain variation over time.
@ CalTimeGainMC
Gain variation over time for MC.
@ AltroSyncSignal
timing of Altro chip sync. signal
auto getRecoInputContainer(o2::framework::ProcessingContext &pc, o2::gpu::GPUTrackingInOutPointers *ptrs, const o2::globaltracking::RecoContainer *inputTracks, bool mc=false)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
size_t inputTimesliceId
The time pipelining id of this particular device.
void requestTracks(o2::dataformats::GlobalTrackID::mask_t src, bool mc)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
S< o2::trd::GeometryFlat >::type * trdGeometry
S< o2::base::PropagatorImpl< float > >::type * o2Propagator
S< o2::base::MatLayerCylSet >::type * matLUT
S< o2::tpc::ORTRootSerializer >::type * nnClusterizerNetworks[3]
const std::vector< TH1F > * hist1
const std::vector< TGraphAsymmErrors > * hist4
const std::vector< TH1D > * hist3
const std::vector< TH2F > * hist2
bool newContinuousMaxTimeBin
uint32_t continuousMaxTimeBin
GPUSettingsProcessing configProcessing
std::function< void *(size_t)> allocator
std::vector< std::string > nnEvalMode
int32_t tpcDeadMapSources
bool decompressTPCFromROOT
bool outputCompClustersRoot
bool sendClustersPerSector
int32_t enableDoublePipeline
bool outputSharedClusterMap
bool useFilteredOutputSpecs
bool outputCompClustersFlat
const o2::tpc::Digit * tpcDigits[NSECTORS]
size_t nTPCDigits[NSECTORS]
const GPUTPCDigitsMCInput * tpcDigitsMC
const o2::tpc::ClusterNativeAccess * clustersNative
const o2::tpc::CompressedClustersFlat * tpcCompressedClusters
const uint32_t * outputClusRefsTPCO2
const GPUSettingsTF * settingsTF
const GPUTrackingInOutZS * tpcZS
const o2::MCCompLabel * outputTracksTPCO2MC
uint32_t nOutputTracksTPCO2
const o2::tpc::ClusterNativeAccess * clustersNativeReduced
uint32_t nOutputClusRefsTPCO2
const o2::tpc::TrackTPC * outputTracksTPCO2
const GPUTrackingInOutDigits * tpcPackedDigits
const void *const * zsPtr[NENDPOINTS]
uint32_t count[NENDPOINTS]
const uint32_t * nZSPtr[NENDPOINTS]
GPUTrackingInOutZSSector sector[NSECTORS]
static constexpr uint32_t NSECTORS
static constexpr uint32_t NENDPOINTS
GPUOutputControl tpcTracksO2
GPUOutputControl clustersNative
GPUOutputControl tpcOccupancyMap
GPUOutputControl * asArray()
GPUOutputControl tpcTracksO2Labels
GPUOutputControl tpcTracksO2ClusRefs
size_t getIndex(const GPUOutputControl &v)
static constexpr size_t count()
GPUOutputControl sharedClusterMap
GPUOutputControl compressedClusters
GPUOutputControl clusterLabels
GPUOutputControl tpcTriggerWords
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClustersSector[constants::MAXSECTOR]
const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > * clustersMCTruth
const ClusterNative * clusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int clusterOffset[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
const ClusterNative * clustersLinear
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"