86#include <TStopwatch.h>
91#include <TGraphAsymmErrors.h>
103#include <unordered_set>
115GPURecoWorkflowSpec::GPURecoWorkflowSpec(
GPURecoWorkflowSpec::CompletionPolicyData* policyData,
Config const& specconfig, std::vector<int32_t>
const& tpcsectors, uint64_t tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<
bool(
o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) :
o2::
framework::
Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
118 throw std::runtime_error(
"inconsistent configuration: cluster output is only possible if CA clusterer or CompCluster decompression is activated");
122 mConfParam.reset(
new GPUSettingsO2);
124 mTimer.reset(
new TStopwatch);
128 *gPolicyOrder = &mPolicyOrder;
140 mConfig->configGRP.solenoidBzNominalGPU = 0;
141 mTFSettings->hasSimStartOrbit = 1;
143 mTFSettings->simStartOrbit = hbfu.getFirstIRofTF(
o2::InteractionRecord(0, hbfu.orbitFirstSampled)).orbit;
145 *mConfParam = mConfig->ReadConfigurableParam();
147 if (mConfParam->display) {
149 mConfig->configProcessing.eventDisplay = mDisplayFrontend.get();
150 if (mConfig->configProcessing.eventDisplay !=
nullptr) {
151 LOG(info) <<
"Event display enabled";
153 throw std::runtime_error(
"GPU Event Display frontend could not be created!");
157 mConfig->configProcessing.doublePipeline = 1;
160 mAutoSolenoidBz = mConfParam->solenoidBzNominalGPU == -1e6f;
161 mAutoContinuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin < 0;
162 if (mAutoContinuousMaxTimeBin) {
165 if (mConfig->configProcessing.deviceNum == -2) {
168 mConfig->configProcessing.deviceNum = myId;
169 LOG(info) <<
"GPU device number selected from pipeline id: " << myId <<
" / " << idMax;
171 if (mConfig->configProcessing.debugLevel >= 3 && mVerbosity == 0) {
174 mConfig->configProcessing.runMC = mSpecConfig.
processMC;
176 if (!mSpecConfig.
processMC && !mConfig->configQA.clusterRejectionHistograms) {
177 throw std::runtime_error(
"Need MC information to create QA plots");
180 mConfig->configQA.noMC =
true;
182 mConfig->configQA.shipToQC =
true;
183 if (!mConfig->configProcessing.runQA) {
184 mConfig->configQA.enableLocalOutput =
false;
186 mConfig->configProcessing.runQA = -mQATaskMask;
189 mConfig->configInterface.outputToExternalBuffers =
true;
199 GPUO2Interface::ApplySyncSettings(mConfig->configProcessing, mConfig->configReconstruction, mConfig->configWorkflow.steps, mConfParam->synchronousProcessing, runTracking ? mConfParam->rundEdx : -2);
217 if (mTPCSectorMask != 0xFFFFFFFFF) {
218 throw std::invalid_argument(
"Cannot run TPC decompression with a sector mask");
231 mConfig->configProcessing.outputSharedClusterMap =
true;
234 mConfig->configProcessing.createO2Output = 0;
238 if (mConfParam->transformationFile.size() || mConfParam->transformationSCFile.size()) {
239 LOG(fatal) <<
"Deprecated configurable param options GPU_global.transformationFile or transformationSCFile used\n"
240 <<
"Instead, link the corresponding file as <somedir>/TPC/Calib/CorrectionMap/snapshot.root and use it via\n"
241 <<
"--condition-remap file://<somdir>=TPC/Calib/CorrectionMap option";
247 LOG(fatal) <<
"GPU two-threaded pipeline works only with TPC-only processing, and with ZS input";
251 mGPUReco = std::make_unique<GPUO2Interface>();
254 initFunctionTPCCalib(ic);
256 mConfig->configCalib.fastTransform = mCalibObjects.mFastTransformHelper->getCorrMap();
257 mConfig->configCalib.fastTransformRef = mCalibObjects.mFastTransformHelper->getCorrMapRef();
258 mConfig->configCalib.fastTransformMShape = mCalibObjects.mFastTransformHelper->getCorrMapMShape();
259 mConfig->configCalib.fastTransformHelper = mCalibObjects.mFastTransformHelper.get();
260 if (mConfig->configCalib.fastTransform ==
nullptr) {
261 throw std::invalid_argument(
"GPU workflow: initialization of the TPC transformation failed");
264 if (mConfParam->matLUTFile.size()) {
265 LOGP(info,
"Loading matlut file {}", mConfParam->matLUTFile.c_str());
267 if (mConfig->configCalib.matLUT ==
nullptr) {
268 LOGF(fatal,
"Error loading matlut file");
271 mConfig->configProcessing.lateO2MatLutProvisioningSize = 50 * 1024 * 1024;
275 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>();
276 mConfig->configCalib.trdGeometry = mTRDGeometry.get();
278 mTRDRecoParam = std::make_unique<GPUTRDRecoParam>();
279 mConfig->configCalib.trdRecoParam = mTRDRecoParam.get();
282 mConfig->configProcessing.willProvideO2PropagatorLate =
true;
283 mConfig->configProcessing.o2PropagatorUseGPUField =
true;
284 if (mConfig->configReconstruction.tpc.trackReferenceX == 1000.f) {
285 mConfig->configReconstruction.tpc.trackReferenceX = 83.f;
289 mConfig->configProcessing.printSettings =
true;
290 if (mConfParam->printSettings > 1) {
291 mConfig->PrintParam();
296 if (mGPUReco->Initialize(config) != 0) {
297 throw std::invalid_argument(
"GPU Reconstruction initialization failed");
300 mQA = std::make_unique<GPUO2InterfaceQA>(mConfig.get());
303 mGPUReco->setErrorCodeOutput(&mErrorQA);
314 if (mConfParam->dump >= 2) {
315 LOG(fatal) <<
"Cannot use dump-only mode with multi-threaded pipeline";
320 callbacks.
set<CallbackService::Id::RegionInfoCallback>([
this](fair::mq::RegionInfo
const& info) {
321 if (info.size == 0) {
325 mRegionInfos.emplace_back(info);
330 if (mConfParam->registerSelectedSegmentIds != -1 && info.managed && info.id != (uint32_t)mConfParam->registerSelectedSegmentIds) {
334 if (mConfParam->mutexMemReg) {
335 mode_t
mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
336 fd = open(
"/tmp/o2_gpu_memlock_mutex.lock", O_RDWR | O_CREAT | O_CLOEXEC,
mask);
338 throw std::runtime_error(
"Error opening memlock mutex lock file");
341 if (lockf(fd, F_LOCK, 0)) {
342 throw std::runtime_error(
"Error locking memlock mutex file");
345 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
346 if (mConfParam->benchmarkMemoryRegistration) {
347 start = std::chrono::high_resolution_clock::now();
349 if (mGPUReco->registerMemoryForGPU(info.ptr, info.size)) {
350 throw std::runtime_error(
"Error registering memory for GPU");
352 if (mConfParam->benchmarkMemoryRegistration) {
353 end = std::chrono::high_resolution_clock::now();
354 std::chrono::duration<double> elapsed_seconds =
end -
start;
355 LOG(info) <<
"Memory registration time (0x" << info.ptr <<
", " << info.size <<
" bytes): " << elapsed_seconds.count() <<
" s";
357 if (mConfParam->mutexMemReg) {
358 if (lockf(fd, F_ULOCK, 0)) {
359 throw std::runtime_error(
"Error unlocking memlock mutex file");
371 LOGF(info,
"GPU Reconstruction total timing: Cpu: %.3e Real: %.3e s in %d slots", mTimer->CpuTime(), mTimer->RealTime(), mTimer->Counter() - 1);
372 handlePipelineStop();
377 handlePipelineEndOfStream(ec);
383 finaliseCCDBTPC(matcher, obj);
385 finaliseCCDBITS(matcher, obj);
389 mGRPGeomUpdated =
true;
394template <
class D,
class E,
class F,
class G,
class H,
class I,
class J,
class K>
395void GPURecoWorkflowSpec::processInputs(
ProcessingContext& pc, D& tpcZSmeta, E& inputZS, F& tpcZS, G& tpcZSonTheFlySizes,
bool& debugTFDump, H& compClustersDummy, I& compClustersFlatDummy, J& pCompClustersFlat, K& tmpEmptyCompClusters)
406 tpcZSmeta.Pointers[
i][
j].clear();
407 tpcZSmeta.Sizes[
i][
j].clear();
412 tpcZSonTheFlySizes = {0};
415 bool recv =
false, recvsizes =
false;
418 throw std::runtime_error(
"Received multiple ZSSIZES data");
420 tpcZSonTheFlySizes = pc.
inputs().
get<std::array<uint32_t, NEndpoints * NSectors>>(
ref);
427 throw std::runtime_error(
"Received multiple TPCZS data");
429 inputZS = pc.
inputs().
get<gsl::span<o2::tpc::ZeroSuppressedContainer8kb>>(
ref);
432 if (!recv || !recvsizes) {
433 throw std::runtime_error(
"TPC ZS on the fly data not received");
438 uint32_t pageSector = 0;
439 for (uint32_t
j = 0;
j < NEndpoints;
j++) {
440 pageSector += tpcZSonTheFlySizes[
i * NEndpoints +
j];
441 offset += tpcZSonTheFlySizes[
i * NEndpoints +
j];
443 if (mVerbosity >= 1) {
444 LOG(info) <<
"GOT ZS on the fly pages FOR SECTOR " <<
i <<
" -> pages: " << pageSector;
450 auto isSameRdh = [](
const char*
left,
const char*
right) ->
bool {
451 return o2::raw::RDHUtils::getFEEID(
left) == o2::raw::RDHUtils::getFEEID(
right) && o2::raw::RDHUtils::getDetectorField(
left) == o2::raw::RDHUtils::getDetectorField(
right);
453 auto checkForZSData = [](
const char*
ptr, uint32_t subSpec) ->
bool {
454 const auto rdhLink = o2::raw::RDHUtils::getLinkID(
ptr);
455 const auto detField = o2::raw::RDHUtils::getDetectorField(
ptr);
456 const auto feeID = o2::raw::RDHUtils::getFEEID(
ptr);
457 const auto feeLinkID = o2::tpc::rdh_utils::getLink(feeID);
459 return detField ==
o2::tpc::raw_data_types::ZS && ((feeLinkID == o2::tpc::rdh_utils::UserLogicLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == 0)) ||
460 (feeLinkID == o2::tpc::rdh_utils::ILBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::ILBZSLinkID || rdhLink == 0)) ||
461 (feeLinkID == o2::tpc::rdh_utils::DLBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::DLBZSLinkID || rdhLink == 0)));
463 auto insertPages = [&tpcZSmeta, checkForZSData](
const char*
ptr,
size_t count, uint32_t subSpec) ->
void {
464 if (checkForZSData(
ptr, subSpec)) {
465 int32_t rawcru = o2::tpc::rdh_utils::getCRU(
ptr);
466 int32_t rawendpoint = o2::tpc::rdh_utils::getEndPoint(
ptr);
467 tpcZSmeta.Pointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(
ptr);
468 tpcZSmeta.Sizes[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(
count);
473 static uint32_t nErrors = 0;
475 if (nErrors == 1 || (nErrors < 100 && nErrors % 10 == 0) || nErrors % 1000 == 0 || mNTFs % 1000 == 0) {
476 LOG(error) <<
"DPLRawPageSequencer failed to process TPC raw data - data most likely not padded correctly - Using slow page scan instead (this alarm is downscaled from now on, so far " << nErrors <<
" of " << mNTFs <<
" TFs affected)";
480 int32_t totalCount = 0;
483 tpcZSmeta.Pointers2[
i][
j] = tpcZSmeta.Pointers[
i][
j].data();
484 tpcZSmeta.Sizes2[
i][
j] = tpcZSmeta.Sizes[
i][
j].data();
485 tpcZS.sector[
i].zsPtr[
j] = tpcZSmeta.Pointers2[
i][
j];
486 tpcZS.sector[
i].nZSPtr[
j] = tpcZSmeta.Sizes2[
i][
j];
487 tpcZS.sector[
i].count[
j] = tpcZSmeta.Pointers[
i][
j].size();
488 totalCount += tpcZSmeta.Pointers[
i][
j].size();
494 compClustersFlatDummy.setForward(&compClustersDummy);
495 pCompClustersFlat = &compClustersFlatDummy;
499 if (pCompClustersFlat ==
nullptr) {
506 LOGF(info,
"running tracking for sector(s) 0x%09x", mTPCSectorMask);
514 if (mConfParam->dump < 2) {
515 retVal = mGPUReco->RunTracking(ptrs, outputRegions, threadIndex, inputUpdateCallback);
518 retVal = runITSTracking(*pc);
523 mGPUReco->Clear(
false, threadIndex);
528void GPURecoWorkflowSpec::cleanOldCalibsTPCPtrs(calibObjectStruct& oldCalibObjects)
530 if (mOldCalibObjects.size() > 0) {
531 mOldCalibObjects.pop();
533 mOldCalibObjects.emplace(std::move(oldCalibObjects));
541 auto cput = mTimer->CpuTime();
542 auto realt = mTimer->RealTime();
543 mTimer->Start(
false);
546 std::vector<gsl::span<const char>>
inputs;
554 std::array<uint32_t, NEndpoints * NSectors> tpcZSonTheFlySizes;
555 gsl::span<const o2::tpc::ZeroSuppressedContainer8kb> inputZS;
556 std::unique_ptr<char[]> tmpEmptyCompClusters;
558 bool getWorkflowTPCInput_clusters =
false, getWorkflowTPCInput_mc =
false, getWorkflowTPCInput_digits =
false;
559 bool debugTFDump =
false;
562 getWorkflowTPCInput_mc =
true;
565 getWorkflowTPCInput_clusters =
true;
568 getWorkflowTPCInput_digits =
true;
573 auto lockDecodeInput = std::make_unique<std::lock_guard<std::mutex>>(mPipeline->mutexDecodeInput);
581 LOG(fatal) <<
"configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
586 processInputs(pc, tpcZSmeta, inputZS, tpcZS, tpcZSonTheFlySizes, debugTFDump, compClustersDummy, compClustersFlatDummy, pCompClustersFlat, tmpEmptyCompClusters);
587 const auto& inputsClustersDigits = o2::tpc::getWorkflowTPCInput(pc, mVerbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, mTPCSectorMask, getWorkflowTPCInput_digits);
590 mTFSettings->tfStartOrbit = tinfo.firstTForbit;
591 mTFSettings->hasTfStartOrbit = 1;
592 mTFSettings->hasNHBFPerTF = 1;
594 mTFSettings->hasRunStartOrbit = 0;
599 LOG(info) <<
"TF firstTForbit " << mTFSettings->tfStartOrbit <<
" nHBF " << mTFSettings->nHBFPerTF <<
" runStartOrbit " << mTFSettings->runStartOrbit <<
" simStartOrbit " << mTFSettings->simStartOrbit;
601 if (mConfParam->checkFirstTfOrbit) {
602 static uint32_t lastFirstTFOrbit = -1;
603 static uint32_t lastTFCounter = -1;
604 if (lastFirstTFOrbit != -1 && lastTFCounter != -1) {
605 int32_t diffOrbit = tinfo.firstTForbit - lastFirstTFOrbit;
606 int32_t diffCounter = tinfo.tfCounter - lastTFCounter;
607 if (diffOrbit != diffCounter * mTFSettings->nHBFPerTF) {
608 LOG(error) <<
"Time frame has mismatching firstTfOrbit - Last orbit/counter: " << lastFirstTFOrbit <<
" " << lastTFCounter <<
" - Current: " << tinfo.firstTForbit <<
" " << tinfo.tfCounter;
611 lastFirstTFOrbit = tinfo.firstTForbit;
612 lastTFCounter = tinfo.tfCounter;
625 void* ptrEp[NSectors * NEndpoints] = {};
626 bool doInputDigits =
false, doInputDigitsMC =
false;
630 const uint64_t*
buffer =
reinterpret_cast<const uint64_t*
>(&inputZS[0]);
633 doInputDigits = doInputDigitsMC = mSpecConfig.
processMC;
637 throw std::runtime_error(
"Cannot process MC information, none available");
640 doInputDigits =
true;
646 if (mTPCSectorMask != 0xFFFFFFFFF) {
648 for (uint32_t
i = 0;
i < NSectors;
i++) {
649 if (!(mTPCSectorMask & (1ul <<
i))) {
665 if (doInputDigitsMC) {
668 for (uint32_t
i = 0;
i < NSectors;
i++) {
669 tpcDigitsMap.
tpcDigits[
i] = inputsClustersDigits->inputDigits[
i].data();
670 tpcDigitsMap.
nTPCDigits[
i] = inputsClustersDigits->inputDigits[
i].size();
671 if (doInputDigitsMC) {
672 tpcDigitsMapMC.
v[
i] = inputsClustersDigits->inputDigitsMCPtrs[
i];
678 if (mClusterOutputIds.size() > 0) {
679 clusterOutputSectorHeader.
sectorBits = mTPCSectorMask;
681 clusterOutputSectorHeader.activeSectors = mTPCSectorMask;
686 std::unique_ptr<GPURecoWorkflow_QueueObject> pipelineContext;
688 if (handlePipeline(pc, ptrs, tpcZSmeta, tpcZS, pipelineContext)) {
696 using outputDataType =
char;
698 using outputBufferType = std::pair<std::optional<std::reference_wrapper<outputBufferUninitializedVector>>, outputDataType*>;
700 std::unordered_set<std::string> outputsCreated;
702 auto setOutputAllocator = [
this, &outputBuffers, &outputRegions, &pc, &outputsCreated](
const char*
name,
bool condition,
GPUOutputControl& region,
auto&& outputSpec,
size_t offset = 0) {
705 if (mConfParam->allocateOutputOnTheFly) {
706 region.allocator = [
this,
name, &
buffer, &pc, outputSpec = std::move(outputSpec),
offset, &outputsCreated](
size_t size) ->
void* {
709 LOG(info) <<
"ALLOCATING " <<
size <<
" bytes for " <<
name <<
": " << std::get<DataOrigin>(outputSpec).template as<std::string>() <<
"/" << std::get<DataDescription>(outputSpec).template as<std::string>() <<
"/" << std::get<2>(outputSpec);
711 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
713 start = std::chrono::high_resolution_clock::now();
716 outputsCreated.insert(
name);
718 end = std::chrono::high_resolution_clock::now();
719 std::chrono::duration<double> elapsed_seconds =
end -
start;
720 LOG(info) <<
"Allocation time for " <<
name <<
" (" <<
size <<
" bytes)"
721 <<
": " << elapsed_seconds.count() <<
"s";
729 outputsCreated.insert(
name);
734 auto downSizeBuffer = [](outputBufferType&
buffer,
size_t size) {
739 throw std::runtime_error(
"Invalid buffer size requested");
743 throw std::runtime_error(
"Inconsistent buffer address after downsize");
752 auto downSizeBufferToSpan = [&outputBuffers, &outputRegions, &downSizeBuffer](
GPUOutputControl& region,
auto span) {
757 if (span.size() &&
buffer.second != (
char*)span.data()) {
758 throw std::runtime_error(
"Buffer does not match span");
760 downSizeBuffer(
buffer, span.size() *
sizeof(*span.data()));
779 throw std::runtime_error(
"Invalid input for gpu tracking");
784 calibObjectStruct oldCalibObjects;
785 doCalibUpdates(pc, oldCalibObjects);
787 lockDecodeInput.reset();
789 uint32_t threadIndex;
790 if (mConfParam->dump) {
792 while (pipelineContext->jobThreadIndex == -1) {
794 threadIndex = pipelineContext->jobThreadIndex;
799 std::string dir =
"";
800 if (mConfParam->dumpFolder !=
"") {
801 dir = std::regex_replace(mConfParam->dumpFolder, std::regex(
"\\[P\\]"),
std::to_string(getpid()));
803 mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
808 mGPUReco->DumpSettings(threadIndex, dir.c_str());
810 if (tinfo.tfCounter >= mConfParam->dumpFirst && (mConfParam->dumpLast == -1 || tinfo.tfCounter <= mConfParam->dumpLast)) {
811 mGPUReco->DumpEvent(mNTFDumps, &ptrs, threadIndex, dir.c_str());
819 std::unique_ptr<GPUTrackingInOutPointers> ptrsDump;
820 if (mConfParam->dumpBadTFMode == 2) {
822 memcpy((
void*)ptrsDump.get(), (
const void*)&ptrs,
sizeof(ptrs));
827 if (!pipelineContext->jobSubmitted) {
828 enqueuePipelinedJob(&ptrs, &outputRegions, pipelineContext.get(),
true);
830 finalizeInputPipelinedJob(&ptrs, &outputRegions, pipelineContext.get());
832 std::unique_lock lk(pipelineContext->jobFinishedMutex);
833 pipelineContext->jobFinishedNotify.wait(lk, [context = pipelineContext.get()]() { return context->jobFinished; });
834 retVal = pipelineContext->jobReturnValue;
835 threadIndex = pipelineContext->jobThreadIndex;
838 threadIndex = mNextThreadIndex;
839 if (mConfig->configProcessing.doublePipeline) {
840 mNextThreadIndex = (mNextThreadIndex + 1) % 2;
843 retVal = runMain(&pc, &ptrs, &outputRegions, threadIndex);
848 cleanOldCalibsTPCPtrs(oldCalibObjects);
850 o2::utils::DebugStreamer::instance()->flush();
852 if (debugTFDump && mNDebugDumps < mConfParam->dumpBadTFs) {
854 if (mConfParam->dumpBadTFMode <= 1) {
856 FILE* fp = fopen(
filename.c_str(),
"w+b");
860 if (mConfParam->dumpBadTFMode == 1) {
864 fwrite(
data.data(), 1,
data.size(), fp);
867 }
else if (mConfParam->dumpBadTFMode == 2) {
868 mGPUReco->DumpEvent(mNDebugDumps - 1, ptrsDump.get(), threadIndex);
872 if (mConfParam->dump == 2) {
878 if (mConfig->configProcessing.tpcWriteClustersAfterRejection) {
881 bool createEmptyOutput =
false;
883 if (
retVal == 3 && mConfig->configProcessing.ignoreNonFatalGPUErrors) {
884 if (mConfig->configProcessing.throttleAlarms) {
885 LOG(warning) <<
"GPU Reconstruction aborted with non fatal error code, ignoring";
887 LOG(alarm) <<
"GPU Reconstruction aborted with non fatal error code, ignoring";
889 createEmptyOutput = !mConfParam->partialOutputForNonFatalErrors;
891 LOG(fatal) <<
"GPU Reconstruction aborted with error code " <<
retVal <<
" - errors are not ignored - terminating";
895 std::unique_ptr<o2::tpc::ClusterNativeAccess> tmpEmptyClNative;
896 if (createEmptyOutput) {
897 memset(&ptrs, 0,
sizeof(ptrs));
898 for (uint32_t
i = 0;
i < outputRegions.
count();
i++) {
899 if (outputBuffers[
i].
first) {
906 outputBuffers[
i].first->get().resize(toSize);
907 outputBuffers[
i].second = outputBuffers[
i].first->get().data();
909 memset(outputBuffers[
i].second, 0, toSize);
913 tmpEmptyClNative = std::make_unique<o2::tpc::ClusterNativeAccess>();
914 memset(tmpEmptyClNative.get(), 0,
sizeof(*tmpEmptyClNative));
919 clustersMCBuffer.second = clustersMCBuffer.first;
920 tmpEmptyClNative->clustersMCTruth = &clustersMCBuffer.second;
926 if (!mConfParam->allocateOutputOnTheFly) {
927 for (uint32_t
i = 0;
i < outputRegions.
count();
i++) {
930 throw std::runtime_error(
"Preallocated buffer size exceeded");
933 downSizeBuffer(outputBuffers[
i], (
char*)outputRegions.
asArray()[
i].
ptrCurrent - (
char*)outputBuffers[
i].second);
937 downSizeBufferToSpan(outputRegions.
tpcTracksO2, spanOutputTracks);
943 doTrackTuneTPC(ptrs, outputBuffers[outputRegions.
getIndex(outputRegions.
tpcTracksO2)].first->get().data());
947 throw std::runtime_error(
"cluster native output ptrs out of sync");
960 if (mClusterOutputIds.size() > 0) {
964 for (uint32_t
i = 0;
i < NSectors;
i++) {
965 if (mTPCSectorMask & (1ul <<
i)) {
967 clusterOutputSectorHeader.sectorBits = (1ul <<
i);
970 memset(outIndex, 0,
sizeof(*outIndex));
1003 auto getoutput = [sendQAOutput](
auto ptr) {
return sendQAOutput &&
ptr ? *
ptr : std::decay_t<decltype(*ptr)>(); };
1004 std::vector<TH1F> copy1 = getoutput(outputRegions.
qa.
hist1);
1005 std::vector<TH2F> copy2 = getoutput(outputRegions.
qa.
hist2);
1006 std::vector<TH1D> copy3 = getoutput(outputRegions.
qa.
hist3);
1007 std::vector<TGraphAsymmErrors> copy4 = getoutput(outputRegions.
qa.
hist4);
1009 mQA->postprocessExternal(copy1, copy2, copy3, copy4, out, mQATaskMask ? mQATaskMask : -1);
1027 LOG(info) <<
"GPU Reconstruction time for this TF " << mTimer->CpuTime() - cput <<
" s (cpu), " << mTimer->RealTime() - realt <<
" s (wall)";
1035 bool needCalibUpdate =
false;
1036 if (mGRPGeomUpdated) {
1037 mGRPGeomUpdated =
false;
1038 needCalibUpdate =
true;
1043 mITSGeometryCreated =
true;
1046 if (mAutoSolenoidBz) {
1052 if (mAutoContinuousMaxTimeBin) {
1055 LOG(info) <<
"Updating max time bin " << newCalibValues.
continuousMaxTimeBin <<
" (" << mTFSettings->nHBFPerTF <<
" orbits)";
1058 if (!mPropagatorInstanceCreated) {
1060 if (mConfig->configProcessing.o2PropagatorUseGPUField) {
1063 mPropagatorInstanceCreated =
true;
1066 if (!mMatLUTCreated) {
1067 if (mConfParam->matLUTFile.size() == 0) {
1069 LOG(info) <<
"Loaded material budget lookup table";
1071 mMatLUTCreated =
true;
1074 if (!mTRDGeometryCreated) {
1076 gm->createPadPlaneArray();
1077 gm->createClusterMatrixArray();
1078 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>(*gm);
1079 newCalibObjects.
trdGeometry = mConfig->configCalib.trdGeometry = mTRDGeometry.get();
1080 LOG(info) <<
"Loaded TRD geometry";
1081 mTRDGeometryCreated =
true;
1083 if (!mTRDRecoParamCreated) {
1084 mTRDRecoParam = std::make_unique<GPUTRDRecoParam>();
1085 newCalibObjects.
trdRecoParam = mConfig->configCalib.trdRecoParam = mTRDRecoParam.get();
1086 mTRDRecoParamCreated =
true;
1090 needCalibUpdate = fetchCalibsCCDBTPC(pc, newCalibObjects, oldCalibObjects) || needCalibUpdate;
1092 needCalibUpdate = fetchCalibsCCDBITS(pc) || needCalibUpdate;
1094 if (mTPCCutAtTimeBin != mConfig->configGRP.tpcCutTimeBin) {
1096 newCalibValues.
tpcTimeBinCut = mConfig->configGRP.tpcCutTimeBin = mTPCCutAtTimeBin;
1097 needCalibUpdate =
true;
1101 std::ofstream out(
path, std::ios::binary | std::ios::trunc);
1102 if (!out.is_open()) {
1103 throw std::runtime_error(
"Failed to open output file: " +
path);
1106 out.write(
buffer,
static_cast<std::streamsize
>(validSize));
1108 throw std::runtime_error(
"Failed while writing data to: " +
path);
1111 for (
int i = 0;
i < 3;
i++) {
1116 LOG(info) <<
"Dumped TPC clusterizer NN " <<
i <<
" to file " <<
path;
1120 if (needCalibUpdate) {
1121 LOG(info) <<
"Updating GPUReconstruction calibration objects";
1122 mGPUReco->UpdateCalibration(newCalibObjects, newCalibValues);
1131 char* o2jobid = getenv(
"O2JOBID");
1132 char* numaid = getenv(
"NUMAID");
1133 int32_t chanid = o2jobid ? atoi(o2jobid) : (numaid ? atoi(numaid) : 0);
1134 std::string chan = std::string(
"name=gpu-prepare-channel,type=") + (send ?
"push" :
"pull") +
",method=" + (send ?
"connect" :
"bind") +
",address=ipc://@gpu-prepare-channel-" +
std::to_string(chanid) +
"-{timeslice0},transport=shmem,rateLogging=0";
1154 inputs.emplace_back(
"stdDist",
"FLP",
"DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
1159 LOG(fatal) <<
"Double pipeline mode can only work with zsraw input";
1163 inputs.emplace_back(
"pipelineprepare",
gDataOriginGPU,
"PIPELINEPREPARE", 0, Lifetime::Timeframe);
1173 if (mapSources != 0) {
1194 mCalibObjects.mFastTransformHelper->requestCCDBInputs(
inputs, optsDummy, gloOpts);
1232 inputs.emplace_back(
"compClusters",
"ITS",
"COMPCLUSTERS", 0, Lifetime::Timeframe);
1233 inputs.emplace_back(
"patterns",
"ITS",
"PATTERNS", 0, Lifetime::Timeframe);
1234 inputs.emplace_back(
"ROframes",
"ITS",
"CLUSTERSROF", 0, Lifetime::Timeframe);
1236 inputs.emplace_back(
"phystrig",
"ITS",
"PHYSTRIG", 0, Lifetime::Timeframe);
1238 inputs.emplace_back(
"phystrig",
"TRD",
"TRKTRGRD", 0, Lifetime::Timeframe);
1241 if (mSpecConfig.
isITS3) {
1242 inputs.emplace_back(
"cldict",
"IT3",
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(
"IT3/Calib/ClusterDictionary"));
1243 inputs.emplace_back(
"alppar",
"ITS",
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Config/AlpideParam"));
1245 inputs.emplace_back(
"itscldict",
"ITS",
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Calib/ClusterDictionary"));
1246 inputs.emplace_back(
"itsalppar",
"ITS",
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Config/AlpideParam"));
1249 inputs.emplace_back(
"meanvtx",
"GLO",
"MEANVERTEX", 0, Lifetime::Condition,
ccdbParamSpec(
"GLO/Calib/MeanVertex", {}, 1));
1253 inputs.emplace_back(
"itsmclabels",
"ITS",
"CLUSTERSMCTR", 0, Lifetime::Timeframe);
1254 inputs.emplace_back(
"ITSMC2ROframes",
"ITS",
"CLUSTERSMC2ROF", 0, Lifetime::Timeframe);
1259 *mConfParam = mConfig->ReadConfigurableParam();
1260 if (mConfig->configProcessing.nn.nnLoadFromCCDB) {
1262 LOG(info) <<
"(NN CLUS) Enabling fetching of TPC NN clusterizer from CCDB";
1264 mSpecConfig.
nnDumpToFile = mConfig->configProcessing.nn.nnCCDBDumpToFile;
1265 GPUSettingsProcessingNNclusterizer& nnClusterizerSettings = mConfig->configProcessing.nn;
1267 std::map<std::string, std::string> metadata;
1268 metadata[
"inputDType"] = nnClusterizerSettings.nnInferenceInputDType;
1269 metadata[
"outputDType"] = nnClusterizerSettings.nnInferenceOutputDType;
1270 metadata[
"nnCCDBWithMomentum"] = nnClusterizerSettings.nnCCDBWithMomentum;
1271 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBClassificationLayerType;
1272 metadata[
"nnCCDBInteractionRate"] = nnClusterizerSettings.nnCCDBInteractionRate;
1273 metadata[
"nnCCDBBeamType"] = nnClusterizerSettings.nnCCDBBeamType;
1275 auto convert_map_to_metadata = [](
const std::map<std::string, std::string>& inputMap, std::vector<o2::framework::CCDBMetadata>& outputMetadata) {
1276 for (
const auto& [
key,
value] : inputMap) {
1278 outputMetadata.push_back({
key,
value});
1284 std::vector<o2::framework::CCDBMetadata> ccdb_metadata;
1286 if (mConfParam->printSettings) {
1287 auto printSettings = [](
const std::map<std::string, std::string>& settings) {
1288 LOG(info) <<
"(NN CLUS) NN Clusterizer CCDB settings:";
1289 for (
const auto& [
key,
value] : settings) {
1293 printSettings(metadata);
1297 metadata[
"nnCCDBEvalType"] =
"classification_c1";
1298 convert_map_to_metadata(metadata, ccdb_metadata);
1299 inputs.emplace_back(
"nn_classification_c1",
gDataOriginTPC,
"NNCLUSTERIZER_C1", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1300 }
else if (mSpecConfig.
nnEvalMode[0] ==
"c2") {
1301 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1302 metadata[
"nnCCDBEvalType"] =
"classification_c2";
1303 convert_map_to_metadata(metadata, ccdb_metadata);
1304 inputs.emplace_back(
"nn_classification_c2",
gDataOriginTPC,
"NNCLUSTERIZER_C2", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1307 metadata[
"nnCCDBEvalType"] =
"regression_c1";
1308 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1309 convert_map_to_metadata(metadata, ccdb_metadata);
1310 inputs.emplace_back(
"nn_regression_c1",
gDataOriginTPC,
"NNCLUSTERIZER_R1", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1313 metadata[
"nnCCDBEvalType"] =
"regression_c2";
1314 convert_map_to_metadata(metadata, ccdb_metadata);
1315 inputs.emplace_back(
"nn_regression_c2",
gDataOriginTPC,
"NNCLUSTERIZER_R2", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1325 std::vector<OutputSpec> outputSpecs;
1327 outputSpecs.emplace_back(
gDataOriginGPU,
"PIPELINEPREPARE", 0, Lifetime::Timeframe);
1338 outputSpecs.emplace_back(
gDataOriginTPC,
"COMPCLUSTERS", 0, Lifetime::Timeframe);
1341 outputSpecs.emplace_back(
gDataOriginTPC,
"COMPCLUSTERSFLAT", 0, Lifetime::Timeframe);
1344 for (
auto const& sector : mTPCSectors) {
1345 mClusterOutputIds.emplace_back(sector);
1348 outputSpecs.emplace_back(
gDataOriginTPC,
"CLUSTERNATIVETMP", NSectors, Lifetime::Timeframe);
1349 for (
const auto sector : mTPCSectors) {
1357 for (
const auto sector : mTPCSectors) {
1366 outputSpecs.emplace_back(
gDataOriginTPC,
"CLSHAREDMAP", 0, Lifetime::Timeframe);
1367 outputSpecs.emplace_back(
gDataOriginTPC,
"TPCOCCUPANCYMAP", 0, Lifetime::Timeframe);
1370 outputSpecs.emplace_back(
gDataOriginTPC,
"TRIGGERWORDS", 0, Lifetime::Timeframe);
1373 outputSpecs.emplace_back(
gDataOriginTPC,
"TRACKINGQA", 0, Lifetime::Timeframe);
1376 outputSpecs.emplace_back(
gDataOriginGPU,
"ERRORQA", 0, Lifetime::Timeframe);
1380 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKS", 0, Lifetime::Timeframe);
1381 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKCLSID", 0, Lifetime::Timeframe);
1382 outputSpecs.emplace_back(
gDataOriginITS,
"ITSTrackROF", 0, Lifetime::Timeframe);
1383 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICES", 0, Lifetime::Timeframe);
1384 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESROF", 0, Lifetime::Timeframe);
1385 outputSpecs.emplace_back(
gDataOriginITS,
"IRFRAMES", 0, Lifetime::Timeframe);
1388 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESMCTR", 0, Lifetime::Timeframe);
1389 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESMCPUR", 0, Lifetime::Timeframe);
1390 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKSMCTR", 0, Lifetime::Timeframe);
1391 outputSpecs.emplace_back(
gDataOriginITS,
"ITSTrackMC2ROF", 0, Lifetime::Timeframe);
1402 mDisplayFrontend.reset(
nullptr);
1403 mGPUReco.reset(
nullptr);
std::vector< std::string > labels
Simple interface to the CDB manager.
Definition of container class for dE/dx corrections.
void dumpToFile(std::string fileName, const CathodeSegmentation &seg, const std::vector< Point > &points)
Class of a TPC cluster in TPC-native coordinates (row, time)
Container to store compressed TPC cluster data.
A const (ready only) version of MCTruthContainer.
Helper class to access correction maps.
Helper class to access load maps from CCDB.
A parser and sequencer utility for raw pages within DPL input.
A raw page parser for DPL input.
Definition of class for writing debug informations.
Definition of the GeometryManager class.
Helper for geometry and GRP related CCDB requests.
Definition of the GeometryTGeo class.
Declarations for the wrapper for the set of cylindrical material layers.
Definition of the Names Generator class.
Class to serialize ONNX objects for ROOT snapshots of CCDB objects at runtime.
Utilities for parsing of data sequences.
Type wrappers for enfording a specific serialization method.
Wrapper class for TPC CA Tracker algorithm.
Configurable params for tracks ad hoc tuning.
Helper class to extract VDrift from different sources.
Helper class to obtain TPC clusters / digits / labels from DPL.
Definitions of TPC Zero Suppression Data Headers.
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static MatLayerCylSet * loadFromFile(const std::string &inpFName="matbud.root")
static std::string getConfigOutputFileName(const std::string &procName, const std::string &confName="", bool json=true)
GPUd() value_type estimateLTFast(o2 static GPUd() float estimateLTIncrement(const o2 PropagatorImpl * Instance(bool uninitialized=false)
static const HBFUtils & Instance()
static void write(std::string const &filename, std::string const &keyOnly="")
This utility handles transparently the DPL inputs and triggers a customizable action on sequences of ...
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static GPUDisplayFrontendInterface * getFrontend(const char *type)
static uint32_t getTpcMaxTimeBinFromNHbf(uint32_t nHbf)
static float getNominalGPUBz(T &src)
static void ApplySyncSettings(GPUSettingsProcessing &proc, GPUSettingsRec &rec, gpudatatypes::RecoStepField &steps, bool syncMode, int32_t dEdxMode=-2)
o2::framework::Outputs outputs()
std::vector< framework::InputSpec > CompletionPolicyData
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
o2::framework::Inputs inputs()
void run(o2::framework::ProcessingContext &pc) final
void stop() final
This is invoked on stop.
~GPURecoWorkflowSpec() override
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
GPURecoWorkflowSpec(CompletionPolicyData *policyData, Config const &specconfig, std::vector< int32_t > const &tpcsectors, uint64_t tpcSectorMask, std::shared_ptr< o2::base::GRPGeomRequest > &ggr, std::function< bool(o2::framework::DataProcessingHeader::StartTime)> **gPolicyOrder=nullptr)
o2::framework::Options options()
static void RunZSEncoderCreateMeta(const uint64_t *buffer, const uint32_t *sizes, void **ptrs, GPUTrackingInOutZS *out)
static GeometryTGeo * Instance()
void fillMatrixCache(int mask) override
ClusterNativeAccess::ConstMCLabelContainerViewWithBuffer ConstMCLabelContainerViewWithBuffer
static void addOptions(std::vector< o2::framework::ConfigParamSpec > &options)
static constexpr int MAXSECTOR
static precheckModifiedData runPrecheck(o2::gpu::GPUTrackingInOutPointers *ptrs, o2::gpu::GPUO2InterfaceConfiguration *config)
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
static Geometry * instance()
GLuint const GLchar * name
GLsizei const GLfloat * value
GLuint GLsizei const GLchar * label
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
GLsizei const GLchar *const * path
constexpr o2::header::DataOrigin gDataOriginTPC
constexpr o2::header::DataOrigin gDataOriginTRD
constexpr o2::header::DataOrigin gDataOriginITS
constexpr o2::header::DataOrigin gDataOriginGPU
Defining PrimaryVertex explicitly as messageable.
o2::header::DataDescription DataDescription
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
constexpr int MAXGLOBALPADROW
@ ZS
final Zero Suppression (can be ILBZS, DLBZS)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
@ FEEConfig
use fee config
@ IDCPadStatus
use idc pad status map
@ CalIDCPadStatusMapA
Status map of the pads (dead etc., obtained from CalIDC0)
@ CalPadGainFull
Full pad gain calibration.
@ CalPadGainResidual
Residual pad gain calibration (e.g. from tracks)
@ CalTimeGain
Gain variation over time.
@ CalTimeGainMC
Gain variation over time for MC.
@ AltroSyncSignal
timing of Altro chip sync. signal
auto getRecoInputContainer(o2::framework::ProcessingContext &pc, o2::gpu::GPUTrackingInOutPointers *ptrs, const o2::globaltracking::RecoContainer *inputTracks, bool mc=false)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
std::string name
The name of the associated DataProcessorSpec.
size_t inputTimesliceId
The time pipelining id of this particular device.
void requestTracks(o2::dataformats::GlobalTrackID::mask_t src, bool mc)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
S< o2::trd::GeometryFlat >::type * trdGeometry
S< GPUTRDRecoParam >::type * trdRecoParam
S< o2::base::PropagatorImpl< float > >::type * o2Propagator
S< o2::base::MatLayerCylSet >::type * matLUT
S< o2::tpc::ORTRootSerializer >::type * nnClusterizerNetworks[3]
const std::vector< TH1F > * hist1
const std::vector< TGraphAsymmErrors > * hist4
const std::vector< TH1D > * hist3
const std::vector< TH2F > * hist2
bool newContinuousMaxTimeBin
uint32_t continuousMaxTimeBin
GPUSettingsProcessing configProcessing
std::function< void *(size_t)> allocator
std::vector< std::string > nnEvalMode
int32_t tpcDeadMapSources
bool decompressTPCFromROOT
bool outputCompClustersRoot
bool sendClustersPerSector
int32_t enableDoublePipeline
bool outputSharedClusterMap
bool useFilteredOutputSpecs
bool outputCompClustersFlat
const o2::tpc::Digit * tpcDigits[NSECTORS]
size_t nTPCDigits[NSECTORS]
const GPUTPCDigitsMCInput * tpcDigitsMC
const o2::tpc::ClusterNativeAccess * clustersNative
const o2::tpc::CompressedClustersFlat * tpcCompressedClusters
const uint32_t * outputClusRefsTPCO2
const GPUSettingsTF * settingsTF
const GPUTrackingInOutZS * tpcZS
const o2::MCCompLabel * outputTracksTPCO2MC
uint32_t nOutputTracksTPCO2
const o2::tpc::ClusterNativeAccess * clustersNativeReduced
uint32_t nOutputClusRefsTPCO2
const o2::tpc::TrackTPC * outputTracksTPCO2
const GPUTrackingInOutDigits * tpcPackedDigits
const void *const * zsPtr[NENDPOINTS]
uint32_t count[NENDPOINTS]
const uint32_t * nZSPtr[NENDPOINTS]
GPUTrackingInOutZSSector sector[NSECTORS]
static constexpr uint32_t NSECTORS
static constexpr uint32_t NENDPOINTS
GPUOutputControl tpcTracksO2
GPUOutputControl clustersNative
GPUOutputControl tpcOccupancyMap
GPUOutputControl * asArray()
GPUOutputControl tpcTracksO2Labels
GPUOutputControl tpcTracksO2ClusRefs
size_t getIndex(const GPUOutputControl &v)
static constexpr size_t count()
GPUOutputControl sharedClusterMap
GPUOutputControl compressedClusters
GPUOutputControl clusterLabels
GPUOutputControl tpcTriggerWords
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClustersSector[constants::MAXSECTOR]
const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > * clustersMCTruth
const ClusterNative * clusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int clusterOffset[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
const ClusterNative * clustersLinear
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"