86#include <TStopwatch.h>
91#include <TGraphAsymmErrors.h>
103#include <unordered_set>
115GPURecoWorkflowSpec::GPURecoWorkflowSpec(
GPURecoWorkflowSpec::CompletionPolicyData* policyData,
Config const& specconfig, std::vector<int32_t>
const& tpcsectors, uint64_t tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<
bool(
o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) :
o2::
framework::
Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
118 throw std::runtime_error(
"inconsistent configuration: cluster output is only possible if CA clusterer or CompCluster decompression is activated");
122 mConfParam.reset(
new GPUSettingsO2);
124 mTimer.reset(
new TStopwatch);
128 *gPolicyOrder = &mPolicyOrder;
140 mConfig->configGRP.solenoidBzNominalGPU = 0;
141 mTFSettings->hasSimStartOrbit = 1;
143 mTFSettings->simStartOrbit = hbfu.getFirstIRofTF(
o2::InteractionRecord(0, hbfu.orbitFirstSampled)).orbit;
145 *mConfParam = mConfig->ReadConfigurableParam();
147 if (mConfParam->display) {
149 mConfig->configProcessing.eventDisplay = mDisplayFrontend.get();
150 if (mConfig->configProcessing.eventDisplay !=
nullptr) {
151 LOG(info) <<
"Event display enabled";
153 throw std::runtime_error(
"GPU Event Display frontend could not be created!");
157 mConfig->configProcessing.doublePipeline = 1;
160 mAutoSolenoidBz = mConfParam->solenoidBzNominalGPU == -1e6f;
161 mAutoContinuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin < 0;
162 if (mAutoContinuousMaxTimeBin) {
165 if (mConfig->configProcessing.deviceNum == -2) {
168 mConfig->configProcessing.deviceNum = myId;
169 LOG(info) <<
"GPU device number selected from pipeline id: " << myId <<
" / " << idMax;
171 if (mConfig->configProcessing.debugLevel >= 3 && mVerbosity == 0) {
174 mConfig->configProcessing.runMC = mSpecConfig.
processMC;
176 if (!mSpecConfig.
processMC && !mConfig->configQA.clusterRejectionHistograms) {
177 throw std::runtime_error(
"Need MC information to create QA plots");
180 mConfig->configQA.noMC =
true;
182 mConfig->configQA.shipToQC =
true;
183 if (!mConfig->configProcessing.runQA) {
184 mConfig->configQA.enableLocalOutput =
false;
186 mConfig->configProcessing.runQA = -mQATaskMask;
189 mConfig->configInterface.outputToExternalBuffers =
true;
199 GPUO2Interface::ApplySyncSettings(mConfig->configProcessing, mConfig->configReconstruction, mConfig->configWorkflow.steps, mConfParam->synchronousProcessing, runTracking ? mConfParam->rundEdx : -2);
217 if (mTPCSectorMask != 0xFFFFFFFFF) {
218 throw std::invalid_argument(
"Cannot run TPC decompression with a sector mask");
231 mConfig->configProcessing.outputSharedClusterMap =
true;
234 mConfig->configProcessing.createO2Output = 0;
238 if (mConfParam->transformationFile.size() || mConfParam->transformationSCFile.size()) {
239 LOG(fatal) <<
"Deprecated configurable param options GPU_global.transformationFile or transformationSCFile used\n"
240 <<
"Instead, link the corresponding file as <somedir>/TPC/Calib/CorrectionMap/snapshot.root and use it via\n"
241 <<
"--condition-remap file://<somdir>=TPC/Calib/CorrectionMap option";
247 LOG(fatal) <<
"GPU two-threaded pipeline works only with TPC-only processing, and with ZS input";
251 mGPUReco = std::make_unique<GPUO2Interface>();
254 initFunctionTPCCalib(ic);
256 mConfig->configCalib.fastTransform = mCalibObjects.mFastTransformHelper->getCorrMap();
257 mConfig->configCalib.fastTransformRef = mCalibObjects.mFastTransformHelper->getCorrMapRef();
258 mConfig->configCalib.fastTransformMShape = mCalibObjects.mFastTransformHelper->getCorrMapMShape();
259 mConfig->configCalib.fastTransformHelper = mCalibObjects.mFastTransformHelper.get();
260 if (mConfig->configCalib.fastTransform ==
nullptr) {
261 throw std::invalid_argument(
"GPU workflow: initialization of the TPC transformation failed");
264 if (mConfParam->matLUTFile.size()) {
265 LOGP(info,
"Loading matlut file {}", mConfParam->matLUTFile.c_str());
267 if (mConfig->configCalib.matLUT ==
nullptr) {
268 LOGF(fatal,
"Error loading matlut file");
271 mConfig->configProcessing.lateO2MatLutProvisioningSize = 50 * 1024 * 1024;
275 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>();
276 mConfig->configCalib.trdGeometry = mTRDGeometry.get();
278 mTRDRecoParam = std::make_unique<GPUTRDRecoParam>();
279 mConfig->configCalib.trdRecoParam = mTRDRecoParam.get();
282 mConfig->configProcessing.willProvideO2PropagatorLate =
true;
283 mConfig->configProcessing.o2PropagatorUseGPUField =
true;
286 mConfig->configProcessing.printSettings =
true;
287 if (mConfParam->printSettings > 1) {
288 mConfig->PrintParam();
293 if (mGPUReco->Initialize(config) != 0) {
294 throw std::invalid_argument(
"GPU Reconstruction initialization failed");
297 mQA = std::make_unique<GPUO2InterfaceQA>(mConfig.get());
300 mGPUReco->setErrorCodeOutput(&mErrorQA);
311 if (mConfParam->dump >= 2) {
312 LOG(fatal) <<
"Cannot use dump-only mode with multi-threaded pipeline";
317 callbacks.
set<CallbackService::Id::RegionInfoCallback>([
this](fair::mq::RegionInfo
const& info) {
318 if (info.size == 0) {
322 mRegionInfos.emplace_back(info);
327 if (mConfParam->registerSelectedSegmentIds != -1 && info.managed && info.id != (uint32_t)mConfParam->registerSelectedSegmentIds) {
331 if (mConfParam->mutexMemReg) {
332 mode_t
mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
333 fd = open(
"/tmp/o2_gpu_memlock_mutex.lock", O_RDWR | O_CREAT | O_CLOEXEC,
mask);
335 throw std::runtime_error(
"Error opening memlock mutex lock file");
338 if (lockf(fd, F_LOCK, 0)) {
339 throw std::runtime_error(
"Error locking memlock mutex file");
342 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
343 if (mConfParam->benchmarkMemoryRegistration) {
344 start = std::chrono::high_resolution_clock::now();
346 if (mGPUReco->registerMemoryForGPU(info.ptr, info.size)) {
347 throw std::runtime_error(
"Error registering memory for GPU");
349 if (mConfParam->benchmarkMemoryRegistration) {
350 end = std::chrono::high_resolution_clock::now();
351 std::chrono::duration<double> elapsed_seconds =
end -
start;
352 LOG(info) <<
"Memory registration time (0x" << info.ptr <<
", " << info.size <<
" bytes): " << elapsed_seconds.count() <<
" s";
354 if (mConfParam->mutexMemReg) {
355 if (lockf(fd, F_ULOCK, 0)) {
356 throw std::runtime_error(
"Error unlocking memlock mutex file");
368 LOGF(info,
"GPU Reconstruction total timing: Cpu: %.3e Real: %.3e s in %d slots", mTimer->CpuTime(), mTimer->RealTime(), mTimer->Counter() - 1);
369 handlePipelineStop();
374 handlePipelineEndOfStream(ec);
380 finaliseCCDBTPC(matcher, obj);
382 finaliseCCDBITS(matcher, obj);
386 mGRPGeomUpdated =
true;
391template <
class D,
class E,
class F,
class G,
class H,
class I,
class J,
class K>
392void GPURecoWorkflowSpec::processInputs(
ProcessingContext& pc, D& tpcZSmeta, E& inputZS, F& tpcZS, G& tpcZSonTheFlySizes,
bool& debugTFDump, H& compClustersDummy, I& compClustersFlatDummy, J& pCompClustersFlat, K& tmpEmptyCompClusters)
403 tpcZSmeta.Pointers[
i][
j].clear();
404 tpcZSmeta.Sizes[
i][
j].clear();
409 tpcZSonTheFlySizes = {0};
412 bool recv =
false, recvsizes =
false;
415 throw std::runtime_error(
"Received multiple ZSSIZES data");
417 tpcZSonTheFlySizes = pc.
inputs().
get<std::array<uint32_t, NEndpoints * NSectors>>(
ref);
424 throw std::runtime_error(
"Received multiple TPCZS data");
426 inputZS = pc.
inputs().
get<gsl::span<o2::tpc::ZeroSuppressedContainer8kb>>(
ref);
429 if (!recv || !recvsizes) {
430 throw std::runtime_error(
"TPC ZS on the fly data not received");
435 uint32_t pageSector = 0;
436 for (uint32_t
j = 0;
j < NEndpoints;
j++) {
437 pageSector += tpcZSonTheFlySizes[
i * NEndpoints +
j];
438 offset += tpcZSonTheFlySizes[
i * NEndpoints +
j];
440 if (mVerbosity >= 1) {
441 LOG(info) <<
"GOT ZS on the fly pages FOR SECTOR " <<
i <<
" -> pages: " << pageSector;
447 auto isSameRdh = [](
const char*
left,
const char*
right) ->
bool {
448 return o2::raw::RDHUtils::getFEEID(
left) == o2::raw::RDHUtils::getFEEID(
right) && o2::raw::RDHUtils::getDetectorField(
left) == o2::raw::RDHUtils::getDetectorField(
right);
450 auto checkForZSData = [](
const char*
ptr, uint32_t subSpec) ->
bool {
451 const auto rdhLink = o2::raw::RDHUtils::getLinkID(
ptr);
452 const auto detField = o2::raw::RDHUtils::getDetectorField(
ptr);
453 const auto feeID = o2::raw::RDHUtils::getFEEID(
ptr);
454 const auto feeLinkID = o2::tpc::rdh_utils::getLink(feeID);
456 return detField ==
o2::tpc::raw_data_types::ZS && ((feeLinkID == o2::tpc::rdh_utils::UserLogicLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == 0)) ||
457 (feeLinkID == o2::tpc::rdh_utils::ILBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::ILBZSLinkID || rdhLink == 0)) ||
458 (feeLinkID == o2::tpc::rdh_utils::DLBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::DLBZSLinkID || rdhLink == 0)));
460 auto insertPages = [&tpcZSmeta, checkForZSData](
const char*
ptr,
size_t count, uint32_t subSpec) ->
void {
461 if (checkForZSData(
ptr, subSpec)) {
462 int32_t rawcru = o2::tpc::rdh_utils::getCRU(
ptr);
463 int32_t rawendpoint = o2::tpc::rdh_utils::getEndPoint(
ptr);
464 tpcZSmeta.Pointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(
ptr);
465 tpcZSmeta.Sizes[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(
count);
470 static uint32_t nErrors = 0;
472 if (nErrors == 1 || (nErrors < 100 && nErrors % 10 == 0) || nErrors % 1000 == 0 || mNTFs % 1000 == 0) {
473 LOG(error) <<
"DPLRawPageSequencer failed to process TPC raw data - data most likely not padded correctly - Using slow page scan instead (this alarm is downscaled from now on, so far " << nErrors <<
" of " << mNTFs <<
" TFs affected)";
477 int32_t totalCount = 0;
480 tpcZSmeta.Pointers2[
i][
j] = tpcZSmeta.Pointers[
i][
j].data();
481 tpcZSmeta.Sizes2[
i][
j] = tpcZSmeta.Sizes[
i][
j].data();
482 tpcZS.sector[
i].zsPtr[
j] = tpcZSmeta.Pointers2[
i][
j];
483 tpcZS.sector[
i].nZSPtr[
j] = tpcZSmeta.Sizes2[
i][
j];
484 tpcZS.sector[
i].count[
j] = tpcZSmeta.Pointers[
i][
j].size();
485 totalCount += tpcZSmeta.Pointers[
i][
j].size();
491 compClustersFlatDummy.setForward(&compClustersDummy);
492 pCompClustersFlat = &compClustersFlatDummy;
496 if (pCompClustersFlat ==
nullptr) {
503 LOGF(info,
"running tracking for sector(s) 0x%09x", mTPCSectorMask);
511 if (mConfParam->dump < 2) {
512 retVal = mGPUReco->RunTracking(ptrs, outputRegions, threadIndex, inputUpdateCallback);
515 retVal = runITSTracking(*pc);
517 static bool first =
true;
527 mGPUReco->Clear(
false, threadIndex);
532void GPURecoWorkflowSpec::cleanOldCalibsTPCPtrs(calibObjectStruct& oldCalibObjects)
534 if (mOldCalibObjects.size() > 0) {
535 mOldCalibObjects.pop();
537 mOldCalibObjects.emplace(std::move(oldCalibObjects));
545 auto cput = mTimer->CpuTime();
546 auto realt = mTimer->RealTime();
547 mTimer->Start(
false);
550 std::vector<gsl::span<const char>>
inputs;
558 std::array<uint32_t, NEndpoints * NSectors> tpcZSonTheFlySizes;
559 gsl::span<const o2::tpc::ZeroSuppressedContainer8kb> inputZS;
560 std::unique_ptr<char[]> tmpEmptyCompClusters;
562 bool getWorkflowTPCInput_clusters =
false, getWorkflowTPCInput_mc =
false, getWorkflowTPCInput_digits =
false;
563 bool debugTFDump =
false;
566 getWorkflowTPCInput_mc =
true;
569 getWorkflowTPCInput_clusters =
true;
572 getWorkflowTPCInput_digits =
true;
577 auto lockDecodeInput = std::make_unique<std::lock_guard<std::mutex>>(mPipeline->mutexDecodeInput);
585 LOG(fatal) <<
"configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
590 processInputs(pc, tpcZSmeta, inputZS, tpcZS, tpcZSonTheFlySizes, debugTFDump, compClustersDummy, compClustersFlatDummy, pCompClustersFlat, tmpEmptyCompClusters);
591 const auto& inputsClustersDigits = o2::tpc::getWorkflowTPCInput(pc, mVerbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, mTPCSectorMask, getWorkflowTPCInput_digits);
594 mTFSettings->tfStartOrbit = tinfo.firstTForbit;
595 mTFSettings->hasTfStartOrbit = 1;
596 mTFSettings->hasNHBFPerTF = 1;
598 mTFSettings->hasRunStartOrbit = 0;
603 LOG(info) <<
"TF firstTForbit " << mTFSettings->tfStartOrbit <<
" nHBF " << mTFSettings->nHBFPerTF <<
" runStartOrbit " << mTFSettings->runStartOrbit <<
" simStartOrbit " << mTFSettings->simStartOrbit;
605 if (mConfParam->checkFirstTfOrbit) {
606 static uint32_t lastFirstTFOrbit = -1;
607 static uint32_t lastTFCounter = -1;
608 if (lastFirstTFOrbit != -1 && lastTFCounter != -1) {
609 int32_t diffOrbit = tinfo.firstTForbit - lastFirstTFOrbit;
610 int32_t diffCounter = tinfo.tfCounter - lastTFCounter;
611 if (diffOrbit != diffCounter * mTFSettings->nHBFPerTF) {
612 LOG(error) <<
"Time frame has mismatching firstTfOrbit - Last orbit/counter: " << lastFirstTFOrbit <<
" " << lastTFCounter <<
" - Current: " << tinfo.firstTForbit <<
" " << tinfo.tfCounter;
615 lastFirstTFOrbit = tinfo.firstTForbit;
616 lastTFCounter = tinfo.tfCounter;
629 void* ptrEp[NSectors * NEndpoints] = {};
630 bool doInputDigits =
false, doInputDigitsMC =
false;
634 const uint64_t*
buffer =
reinterpret_cast<const uint64_t*
>(&inputZS[0]);
637 doInputDigits = doInputDigitsMC = mSpecConfig.
processMC;
641 throw std::runtime_error(
"Cannot process MC information, none available");
644 doInputDigits =
true;
650 if (mTPCSectorMask != 0xFFFFFFFFF) {
652 for (uint32_t
i = 0;
i < NSectors;
i++) {
653 if (!(mTPCSectorMask & (1ul <<
i))) {
669 if (doInputDigitsMC) {
672 for (uint32_t
i = 0;
i < NSectors;
i++) {
673 tpcDigitsMap.
tpcDigits[
i] = inputsClustersDigits->inputDigits[
i].data();
674 tpcDigitsMap.
nTPCDigits[
i] = inputsClustersDigits->inputDigits[
i].size();
675 if (doInputDigitsMC) {
676 tpcDigitsMapMC.
v[
i] = inputsClustersDigits->inputDigitsMCPtrs[
i];
682 if (mClusterOutputIds.size() > 0) {
683 clusterOutputSectorHeader.
sectorBits = mTPCSectorMask;
685 clusterOutputSectorHeader.activeSectors = mTPCSectorMask;
690 std::unique_ptr<GPURecoWorkflow_QueueObject> pipelineContext;
692 if (handlePipeline(pc, ptrs, tpcZSmeta, tpcZS, pipelineContext)) {
700 using outputDataType =
char;
702 using outputBufferType = std::pair<std::optional<std::reference_wrapper<outputBufferUninitializedVector>>, outputDataType*>;
704 std::unordered_set<std::string> outputsCreated;
706 auto setOutputAllocator = [
this, &outputBuffers, &outputRegions, &pc, &outputsCreated](
const char*
name,
bool condition,
GPUOutputControl& region,
auto&& outputSpec,
size_t offset = 0) {
709 if (mConfParam->allocateOutputOnTheFly) {
710 region.allocator = [
this,
name, &
buffer, &pc, outputSpec = std::move(outputSpec),
offset, &outputsCreated](
size_t size) ->
void* {
713 LOG(info) <<
"ALLOCATING " <<
size <<
" bytes for " <<
name <<
": " << std::get<DataOrigin>(outputSpec).template as<std::string>() <<
"/" << std::get<DataDescription>(outputSpec).template as<std::string>() <<
"/" << std::get<2>(outputSpec);
715 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
717 start = std::chrono::high_resolution_clock::now();
720 outputsCreated.insert(
name);
722 end = std::chrono::high_resolution_clock::now();
723 std::chrono::duration<double> elapsed_seconds =
end -
start;
724 LOG(info) <<
"Allocation time for " <<
name <<
" (" <<
size <<
" bytes)"
725 <<
": " << elapsed_seconds.count() <<
"s";
733 outputsCreated.insert(
name);
738 auto downSizeBuffer = [](outputBufferType&
buffer,
size_t size) {
743 throw std::runtime_error(
"Invalid buffer size requested");
747 throw std::runtime_error(
"Inconsistent buffer address after downsize");
756 auto downSizeBufferToSpan = [&outputBuffers, &outputRegions, &downSizeBuffer](
GPUOutputControl& region,
auto span) {
761 if (span.size() &&
buffer.second != (
char*)span.data()) {
762 throw std::runtime_error(
"Buffer does not match span");
764 downSizeBuffer(
buffer, span.size() *
sizeof(*span.data()));
783 throw std::runtime_error(
"Invalid input for gpu tracking");
788 calibObjectStruct oldCalibObjects;
789 doCalibUpdates(pc, oldCalibObjects);
791 lockDecodeInput.reset();
793 uint32_t threadIndex;
794 if (mConfParam->dump) {
796 while (pipelineContext->jobThreadIndex == -1) {
798 threadIndex = pipelineContext->jobThreadIndex;
803 std::string dir =
"";
804 if (mConfParam->dumpFolder !=
"") {
805 dir = std::regex_replace(mConfParam->dumpFolder, std::regex(
"\\[P\\]"),
std::to_string(getpid()));
807 mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
812 mGPUReco->DumpSettings(threadIndex, dir.c_str());
814 if (tinfo.tfCounter >= mConfParam->dumpFirst && (mConfParam->dumpLast == -1 || tinfo.tfCounter <= mConfParam->dumpLast)) {
815 mGPUReco->DumpEvent(mNTFDumps, &ptrs, threadIndex, dir.c_str());
819 std::unique_ptr<GPUTrackingInOutPointers> ptrsDump;
820 if (mConfParam->dumpBadTFMode == 2) {
822 memcpy((
void*)ptrsDump.get(), (
const void*)&ptrs,
sizeof(ptrs));
827 if (!pipelineContext->jobSubmitted) {
828 enqueuePipelinedJob(&ptrs, &outputRegions, pipelineContext.get(),
true);
830 finalizeInputPipelinedJob(&ptrs, &outputRegions, pipelineContext.get());
832 std::unique_lock lk(pipelineContext->jobFinishedMutex);
833 pipelineContext->jobFinishedNotify.wait(lk, [context = pipelineContext.get()]() { return context->jobFinished; });
834 retVal = pipelineContext->jobReturnValue;
835 threadIndex = pipelineContext->jobThreadIndex;
838 threadIndex = mNextThreadIndex;
839 if (mConfig->configProcessing.doublePipeline) {
840 mNextThreadIndex = (mNextThreadIndex + 1) % 2;
843 retVal = runMain(&pc, &ptrs, &outputRegions, threadIndex);
848 cleanOldCalibsTPCPtrs(oldCalibObjects);
850 o2::utils::DebugStreamer::instance()->flush();
852 if (debugTFDump && mNDebugDumps < mConfParam->dumpBadTFs) {
854 if (mConfParam->dumpBadTFMode <= 1) {
856 FILE* fp = fopen(
filename.c_str(),
"w+b");
860 if (mConfParam->dumpBadTFMode == 1) {
864 fwrite(
data.data(), 1,
data.size(), fp);
867 }
else if (mConfParam->dumpBadTFMode == 2) {
868 mGPUReco->DumpEvent(mNDebugDumps - 1, ptrsDump.get(), threadIndex);
872 if (mConfParam->dump == 2) {
878 if (mConfig->configProcessing.tpcWriteClustersAfterRejection) {
881 bool createEmptyOutput =
false;
883 if (
retVal == 3 && mConfig->configProcessing.ignoreNonFatalGPUErrors) {
884 if (mConfig->configProcessing.throttleAlarms) {
885 LOG(warning) <<
"GPU Reconstruction aborted with non fatal error code, ignoring";
887 LOG(alarm) <<
"GPU Reconstruction aborted with non fatal error code, ignoring";
889 createEmptyOutput = !mConfParam->partialOutputForNonFatalErrors;
891 LOG(fatal) <<
"GPU Reconstruction aborted with error code " <<
retVal <<
" - errors are not ignored - terminating";
895 std::unique_ptr<o2::tpc::ClusterNativeAccess> tmpEmptyClNative;
896 if (createEmptyOutput) {
897 memset(&ptrs, 0,
sizeof(ptrs));
898 for (uint32_t
i = 0;
i < outputRegions.
count();
i++) {
899 if (outputBuffers[
i].
first) {
906 outputBuffers[
i].first->get().resize(toSize);
907 outputBuffers[
i].second = outputBuffers[
i].first->get().data();
909 memset(outputBuffers[
i].second, 0, toSize);
913 tmpEmptyClNative = std::make_unique<o2::tpc::ClusterNativeAccess>();
914 memset(tmpEmptyClNative.get(), 0,
sizeof(*tmpEmptyClNative));
919 clustersMCBuffer.second = clustersMCBuffer.first;
920 tmpEmptyClNative->clustersMCTruth = &clustersMCBuffer.second;
926 if (!mConfParam->allocateOutputOnTheFly) {
927 for (uint32_t
i = 0;
i < outputRegions.
count();
i++) {
930 throw std::runtime_error(
"Preallocated buffer size exceeded");
933 downSizeBuffer(outputBuffers[
i], (
char*)outputRegions.
asArray()[
i].
ptrCurrent - (
char*)outputBuffers[
i].second);
937 downSizeBufferToSpan(outputRegions.
tpcTracksO2, spanOutputTracks);
943 doTrackTuneTPC(ptrs, outputBuffers[outputRegions.
getIndex(outputRegions.
tpcTracksO2)].first->get().data());
947 throw std::runtime_error(
"cluster native output ptrs out of sync");
960 if (mClusterOutputIds.size() > 0) {
964 for (uint32_t
i = 0;
i < NSectors;
i++) {
965 if (mTPCSectorMask & (1ul <<
i)) {
967 clusterOutputSectorHeader.sectorBits = (1ul <<
i);
970 memset(outIndex, 0,
sizeof(*outIndex));
1003 auto getoutput = [sendQAOutput](
auto ptr) {
return sendQAOutput &&
ptr ? *
ptr : std::decay_t<decltype(*ptr)>(); };
1004 std::vector<TH1F> copy1 = getoutput(outputRegions.
qa.
hist1);
1005 std::vector<TH2F> copy2 = getoutput(outputRegions.
qa.
hist2);
1006 std::vector<TH1D> copy3 = getoutput(outputRegions.
qa.
hist3);
1007 std::vector<TGraphAsymmErrors> copy4 = getoutput(outputRegions.
qa.
hist4);
1009 mQA->postprocessExternal(copy1, copy2, copy3, copy4, out, mQATaskMask ? mQATaskMask : -1);
1027 LOG(info) <<
"GPU Reconstruction time for this TF " << mTimer->CpuTime() - cput <<
" s (cpu), " << mTimer->RealTime() - realt <<
" s (wall)";
1035 bool needCalibUpdate =
false;
1036 if (mGRPGeomUpdated) {
1037 mGRPGeomUpdated =
false;
1038 needCalibUpdate =
true;
1043 mITSGeometryCreated =
true;
1046 if (mAutoSolenoidBz) {
1052 if (mAutoContinuousMaxTimeBin) {
1055 LOG(info) <<
"Updating max time bin " << newCalibValues.
continuousMaxTimeBin <<
" (" << mTFSettings->nHBFPerTF <<
" orbits)";
1058 if (!mPropagatorInstanceCreated) {
1060 if (mConfig->configProcessing.o2PropagatorUseGPUField) {
1063 mPropagatorInstanceCreated =
true;
1066 if (!mMatLUTCreated) {
1067 if (mConfParam->matLUTFile.size() == 0) {
1069 LOG(info) <<
"Loaded material budget lookup table";
1071 mMatLUTCreated =
true;
1074 if (!mTRDGeometryCreated) {
1076 gm->createPadPlaneArray();
1077 gm->createClusterMatrixArray();
1078 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>(*gm);
1079 newCalibObjects.
trdGeometry = mConfig->configCalib.trdGeometry = mTRDGeometry.get();
1080 LOG(info) <<
"Loaded TRD geometry";
1081 mTRDGeometryCreated =
true;
1083 if (!mTRDRecoParamCreated) {
1084 mTRDRecoParam = std::make_unique<GPUTRDRecoParam>();
1085 newCalibObjects.
trdRecoParam = mConfig->configCalib.trdRecoParam = mTRDRecoParam.get();
1086 mTRDRecoParamCreated =
true;
1090 needCalibUpdate = fetchCalibsCCDBTPC(pc, newCalibObjects, oldCalibObjects) || needCalibUpdate;
1092 needCalibUpdate = fetchCalibsCCDBITS(pc) || needCalibUpdate;
1094 if (mTPCCutAtTimeBin != mConfig->configGRP.tpcCutTimeBin) {
1096 newCalibValues.
tpcTimeBinCut = mConfig->configGRP.tpcCutTimeBin = mTPCCutAtTimeBin;
1097 needCalibUpdate =
true;
1101 std::ofstream out(
path, std::ios::binary | std::ios::trunc);
1102 if (!out.is_open()) {
1103 throw std::runtime_error(
"Failed to open output file: " +
path);
1106 out.write(
buffer,
static_cast<std::streamsize
>(validSize));
1108 throw std::runtime_error(
"Failed while writing data to: " +
path);
1111 for (
int i = 0;
i < 3;
i++) {
1116 LOG(info) <<
"Dumped TPC clusterizer NN " <<
i <<
" to file " <<
path;
1120 if (needCalibUpdate) {
1121 LOG(info) <<
"Updating GPUReconstruction calibration objects";
1122 mGPUReco->UpdateCalibration(newCalibObjects, newCalibValues);
1131 char* o2jobid = getenv(
"O2JOBID");
1132 char* numaid = getenv(
"NUMAID");
1133 int32_t chanid = o2jobid ? atoi(o2jobid) : (numaid ? atoi(numaid) : 0);
1134 std::string chan = std::string(
"name=gpu-prepare-channel,type=") + (send ?
"push" :
"pull") +
",method=" + (send ?
"connect" :
"bind") +
",address=ipc://@gpu-prepare-channel-" +
std::to_string(chanid) +
"-{timeslice0},transport=shmem,rateLogging=0";
1154 inputs.emplace_back(
"stdDist",
"FLP",
"DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
1159 LOG(fatal) <<
"Double pipeline mode can only work with zsraw input";
1163 inputs.emplace_back(
"pipelineprepare",
gDataOriginGPU,
"PIPELINEPREPARE", 0, Lifetime::Timeframe);
1173 if (mapSources != 0) {
1194 mCalibObjects.mFastTransformHelper->requestCCDBInputs(
inputs, optsDummy, gloOpts);
1232 inputs.emplace_back(
"compClusters",
"ITS",
"COMPCLUSTERS", 0, Lifetime::Timeframe);
1233 inputs.emplace_back(
"patterns",
"ITS",
"PATTERNS", 0, Lifetime::Timeframe);
1234 inputs.emplace_back(
"ROframes",
"ITS",
"CLUSTERSROF", 0, Lifetime::Timeframe);
1236 inputs.emplace_back(
"phystrig",
"ITS",
"PHYSTRIG", 0, Lifetime::Timeframe);
1238 inputs.emplace_back(
"phystrig",
"TRD",
"TRKTRGRD", 0, Lifetime::Timeframe);
1241 if (mSpecConfig.
isITS3) {
1242 inputs.emplace_back(
"cldict",
"IT3",
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(
"IT3/Calib/ClusterDictionary"));
1243 inputs.emplace_back(
"alppar",
"ITS",
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Config/AlpideParam"));
1245 inputs.emplace_back(
"itscldict",
"ITS",
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Calib/ClusterDictionary"));
1246 inputs.emplace_back(
"itsalppar",
"ITS",
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Config/AlpideParam"));
1249 inputs.emplace_back(
"meanvtx",
"GLO",
"MEANVERTEX", 0, Lifetime::Condition,
ccdbParamSpec(
"GLO/Calib/MeanVertex", {}, 1));
1253 inputs.emplace_back(
"itsmclabels",
"ITS",
"CLUSTERSMCTR", 0, Lifetime::Timeframe);
1254 inputs.emplace_back(
"ITSMC2ROframes",
"ITS",
"CLUSTERSMC2ROF", 0, Lifetime::Timeframe);
1259 *mConfParam = mConfig->ReadConfigurableParam();
1260 if (mConfig->configProcessing.nn.nnLoadFromCCDB) {
1262 LOG(info) <<
"(NN CLUS) Enabling fetching of TPC NN clusterizer from CCDB";
1264 mSpecConfig.
nnDumpToFile = mConfig->configProcessing.nn.nnCCDBDumpToFile;
1265 GPUSettingsProcessingNNclusterizer& nnClusterizerSettings = mConfig->configProcessing.nn;
1267 std::map<std::string, std::string> metadata;
1268 metadata[
"inputDType"] = nnClusterizerSettings.nnInferenceInputDType;
1269 metadata[
"outputDType"] = nnClusterizerSettings.nnInferenceOutputDType;
1270 metadata[
"nnCCDBWithMomentum"] = nnClusterizerSettings.nnCCDBWithMomentum;
1271 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBClassificationLayerType;
1272 metadata[
"nnCCDBInteractionRate"] = nnClusterizerSettings.nnCCDBInteractionRate;
1273 metadata[
"nnCCDBBeamType"] = nnClusterizerSettings.nnCCDBBeamType;
1275 auto convert_map_to_metadata = [](
const std::map<std::string, std::string>& inputMap, std::vector<o2::framework::CCDBMetadata>& outputMetadata) {
1276 for (
const auto& [
key,
value] : inputMap) {
1278 outputMetadata.push_back({
key,
value});
1284 std::vector<o2::framework::CCDBMetadata> ccdb_metadata;
1286 if (mConfParam->printSettings) {
1287 auto printSettings = [](
const std::map<std::string, std::string>& settings) {
1288 LOG(info) <<
"(NN CLUS) NN Clusterizer CCDB settings:";
1289 for (
const auto& [
key,
value] : settings) {
1293 printSettings(metadata);
1297 metadata[
"nnCCDBEvalType"] =
"classification_c1";
1298 convert_map_to_metadata(metadata, ccdb_metadata);
1299 inputs.emplace_back(
"nn_classification_c1",
gDataOriginTPC,
"NNCLUSTERIZER_C1", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1300 }
else if (mSpecConfig.
nnEvalMode[0] ==
"c2") {
1301 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1302 metadata[
"nnCCDBEvalType"] =
"classification_c2";
1303 convert_map_to_metadata(metadata, ccdb_metadata);
1304 inputs.emplace_back(
"nn_classification_c2",
gDataOriginTPC,
"NNCLUSTERIZER_C2", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1307 metadata[
"nnCCDBEvalType"] =
"regression_c1";
1308 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1309 convert_map_to_metadata(metadata, ccdb_metadata);
1310 inputs.emplace_back(
"nn_regression_c1",
gDataOriginTPC,
"NNCLUSTERIZER_R1", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1313 metadata[
"nnCCDBEvalType"] =
"regression_c2";
1314 convert_map_to_metadata(metadata, ccdb_metadata);
1315 inputs.emplace_back(
"nn_regression_c2",
gDataOriginTPC,
"NNCLUSTERIZER_R2", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1325 std::vector<OutputSpec> outputSpecs;
1327 outputSpecs.emplace_back(
gDataOriginGPU,
"PIPELINEPREPARE", 0, Lifetime::Timeframe);
1338 outputSpecs.emplace_back(
gDataOriginTPC,
"COMPCLUSTERS", 0, Lifetime::Timeframe);
1341 outputSpecs.emplace_back(
gDataOriginTPC,
"COMPCLUSTERSFLAT", 0, Lifetime::Timeframe);
1344 for (
auto const& sector : mTPCSectors) {
1345 mClusterOutputIds.emplace_back(sector);
1348 outputSpecs.emplace_back(
gDataOriginTPC,
"CLUSTERNATIVETMP", NSectors, Lifetime::Timeframe);
1349 for (
const auto sector : mTPCSectors) {
1357 for (
const auto sector : mTPCSectors) {
1366 outputSpecs.emplace_back(
gDataOriginTPC,
"CLSHAREDMAP", 0, Lifetime::Timeframe);
1367 outputSpecs.emplace_back(
gDataOriginTPC,
"TPCOCCUPANCYMAP", 0, Lifetime::Timeframe);
1370 outputSpecs.emplace_back(
gDataOriginTPC,
"TRIGGERWORDS", 0, Lifetime::Timeframe);
1373 outputSpecs.emplace_back(
gDataOriginTPC,
"TRACKINGQA", 0, Lifetime::Timeframe);
1376 outputSpecs.emplace_back(
gDataOriginGPU,
"ERRORQA", 0, Lifetime::Timeframe);
1380 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKS", 0, Lifetime::Timeframe);
1381 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKCLSID", 0, Lifetime::Timeframe);
1382 outputSpecs.emplace_back(
gDataOriginITS,
"ITSTrackROF", 0, Lifetime::Timeframe);
1383 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICES", 0, Lifetime::Timeframe);
1384 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESROF", 0, Lifetime::Timeframe);
1385 outputSpecs.emplace_back(
gDataOriginITS,
"IRFRAMES", 0, Lifetime::Timeframe);
1388 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESMCTR", 0, Lifetime::Timeframe);
1389 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESMCPUR", 0, Lifetime::Timeframe);
1390 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKSMCTR", 0, Lifetime::Timeframe);
1391 outputSpecs.emplace_back(
gDataOriginITS,
"ITSTrackMC2ROF", 0, Lifetime::Timeframe);
1402 mDisplayFrontend.reset(
nullptr);
1403 mGPUReco.reset(
nullptr);
std::vector< std::string > labels
Simple interface to the CDB manager.
Definition of container class for dE/dx corrections.
void dumpToFile(std::string fileName, const CathodeSegmentation &seg, const std::vector< Point > &points)
Class of a TPC cluster in TPC-native coordinates (row, time)
Container to store compressed TPC cluster data.
A const (read only) version of MCTruthContainer.
Helper class to access correction maps.
Helper class to access load maps from CCDB.
A parser and sequencer utility for raw pages within DPL input.
A raw page parser for DPL input.
Definition of class for writing debug information.
Definition of the GeometryManager class.
Helper for geometry and GRP related CCDB requests.
Definition of the GeometryTGeo class.
Declarations for the wrapper for the set of cylindrical material layers.
Definition of the Names Generator class.
Class to serialize ONNX objects for ROOT snapshots of CCDB objects at runtime.
Utilities for parsing of data sequences.
Type wrappers for enforcing a specific serialization method.
Wrapper class for TPC CA Tracker algorithm.
Configurable params for tracks ad hoc tuning.
Helper class to extract VDrift from different sources.
Helper class to obtain TPC clusters / digits / labels from DPL.
Definitions of TPC Zero Suppression Data Headers.
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static MatLayerCylSet * loadFromFile(const std::string &inpFName="matbud.root")
static std::string getConfigOutputFileName(const std::string &procName, const std::string &confName="", bool json=true)
GPUd() value_type estimateLTFast(o2 static GPUd() float estimateLTIncrement(const o2 PropagatorImpl * Instance(bool uninitialized=false)
static const HBFUtils & Instance()
static void write(std::string const &filename, std::string const &keyOnly="")
This utility handles transparently the DPL inputs and triggers a customizable action on sequences of ...
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static GPUDisplayFrontendInterface * getFrontend(const char *type)
static uint32_t getTpcMaxTimeBinFromNHbf(uint32_t nHbf)
static float getNominalGPUBz(T &src)
static void ApplySyncSettings(GPUSettingsProcessing &proc, GPUSettingsRec &rec, gpudatatypes::RecoStepField &steps, bool syncMode, int32_t dEdxMode=-2)
o2::framework::Outputs outputs()
std::vector< framework::InputSpec > CompletionPolicyData
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
o2::framework::Inputs inputs()
void run(o2::framework::ProcessingContext &pc) final
void stop() final
This is invoked on stop.
~GPURecoWorkflowSpec() override
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
GPURecoWorkflowSpec(CompletionPolicyData *policyData, Config const &specconfig, std::vector< int32_t > const &tpcsectors, uint64_t tpcSectorMask, std::shared_ptr< o2::base::GRPGeomRequest > &ggr, std::function< bool(o2::framework::DataProcessingHeader::StartTime)> **gPolicyOrder=nullptr)
o2::framework::Options options()
static void RunZSEncoderCreateMeta(const uint64_t *buffer, const uint32_t *sizes, void **ptrs, GPUTrackingInOutZS *out)
static GeometryTGeo * Instance()
void fillMatrixCache(int mask) override
ClusterNativeAccess::ConstMCLabelContainerViewWithBuffer ConstMCLabelContainerViewWithBuffer
static void addOptions(std::vector< o2::framework::ConfigParamSpec > &options)
static constexpr int MAXSECTOR
static precheckModifiedData runPrecheck(o2::gpu::GPUTrackingInOutPointers *ptrs, o2::gpu::GPUO2InterfaceConfiguration *config)
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
static Geometry * instance()
GLuint const GLchar * name
GLsizei const GLfloat * value
GLuint GLsizei const GLchar * label
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
GLsizei const GLchar *const * path
constexpr o2::header::DataOrigin gDataOriginTPC
constexpr o2::header::DataOrigin gDataOriginTRD
constexpr o2::header::DataOrigin gDataOriginITS
constexpr o2::header::DataOrigin gDataOriginGPU
Defining PrimaryVertex explicitly as messageable.
o2::header::DataDescription DataDescription
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
constexpr int MAXGLOBALPADROW
@ ZS
final Zero Suppression (can be ILBZS, DLBZS)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
@ FEEConfig
use fee config
@ IDCPadStatus
use idc pad status map
@ CalIDCPadStatusMapA
Status map of the pads (dead etc. obtained from CalIDC0)
@ CalPadGainFull
Full pad gain calibration.
@ CalPadGainResidual
Residual pad gain calibration (e.g. from tracks)
@ CalTimeGain
Gain variation over time.
@ CalTimeGainMC
Gain variation over time for MC.
@ AltroSyncSignal
timing of Altro chip sync. signal
auto getRecoInputContainer(o2::framework::ProcessingContext &pc, o2::gpu::GPUTrackingInOutPointers *ptrs, const o2::globaltracking::RecoContainer *inputTracks, bool mc=false)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
std::string name
The name of the associated DataProcessorSpec.
size_t inputTimesliceId
The time pipelining id of this particular device.
void requestTracks(o2::dataformats::GlobalTrackID::mask_t src, bool mc)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
S< o2::trd::GeometryFlat >::type * trdGeometry
S< GPUTRDRecoParam >::type * trdRecoParam
S< o2::base::PropagatorImpl< float > >::type * o2Propagator
S< o2::base::MatLayerCylSet >::type * matLUT
S< o2::tpc::ORTRootSerializer >::type * nnClusterizerNetworks[3]
const std::vector< TH1F > * hist1
const std::vector< TGraphAsymmErrors > * hist4
const std::vector< TH1D > * hist3
const std::vector< TH2F > * hist2
bool newContinuousMaxTimeBin
uint32_t continuousMaxTimeBin
GPUSettingsProcessing configProcessing
std::function< void *(size_t)> allocator
std::vector< std::string > nnEvalMode
int32_t tpcDeadMapSources
bool decompressTPCFromROOT
bool outputCompClustersRoot
bool sendClustersPerSector
int32_t enableDoublePipeline
bool outputSharedClusterMap
bool useFilteredOutputSpecs
bool outputCompClustersFlat
const o2::tpc::Digit * tpcDigits[NSECTORS]
size_t nTPCDigits[NSECTORS]
const GPUTPCDigitsMCInput * tpcDigitsMC
const o2::tpc::ClusterNativeAccess * clustersNative
const o2::tpc::CompressedClustersFlat * tpcCompressedClusters
const uint32_t * outputClusRefsTPCO2
const GPUSettingsTF * settingsTF
const GPUTrackingInOutZS * tpcZS
const o2::MCCompLabel * outputTracksTPCO2MC
uint32_t nOutputTracksTPCO2
const o2::tpc::ClusterNativeAccess * clustersNativeReduced
uint32_t nOutputClusRefsTPCO2
const o2::tpc::TrackTPC * outputTracksTPCO2
const GPUTrackingInOutDigits * tpcPackedDigits
const void *const * zsPtr[NENDPOINTS]
uint32_t count[NENDPOINTS]
const uint32_t * nZSPtr[NENDPOINTS]
GPUTrackingInOutZSSector sector[NSECTORS]
static constexpr uint32_t NSECTORS
static constexpr uint32_t NENDPOINTS
GPUOutputControl tpcTracksO2
GPUOutputControl clustersNative
GPUOutputControl tpcOccupancyMap
GPUOutputControl * asArray()
GPUOutputControl tpcTracksO2Labels
GPUOutputControl tpcTracksO2ClusRefs
size_t getIndex(const GPUOutputControl &v)
static constexpr size_t count()
GPUOutputControl sharedClusterMap
GPUOutputControl compressedClusters
GPUOutputControl clusterLabels
GPUOutputControl tpcTriggerWords
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClustersSector[constants::MAXSECTOR]
const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > * clustersMCTruth
const ClusterNative * clusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int clusterOffset[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
const ClusterNative * clustersLinear
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"