83#include <TStopwatch.h>
88#include <TGraphAsymmErrors.h>
100#include <unordered_set>
112GPURecoWorkflowSpec::GPURecoWorkflowSpec(
GPURecoWorkflowSpec::CompletionPolicyData* policyData,
Config const& specconfig, std::vector<int32_t>
const& tpcsectors, uint64_t tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<
bool(
o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) :
o2::
framework::
Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
115 throw std::runtime_error(
"inconsistent configuration: cluster output is only possible if CA clusterer or CompCluster decompression is activated");
119 mConfParam.reset(
new GPUSettingsO2);
121 mTimer.reset(
new TStopwatch);
125 *gPolicyOrder = &mPolicyOrder;
137 mConfig->configGRP.solenoidBzNominalGPU = 0;
138 mTFSettings->hasSimStartOrbit = 1;
140 mTFSettings->simStartOrbit = hbfu.getFirstIRofTF(
o2::InteractionRecord(0, hbfu.orbitFirstSampled)).orbit;
142 *mConfParam = mConfig->ReadConfigurableParam();
144 if (mConfParam->display) {
146 mConfig->configProcessing.eventDisplay = mDisplayFrontend.get();
147 if (mConfig->configProcessing.eventDisplay !=
nullptr) {
148 LOG(info) <<
"Event display enabled";
150 throw std::runtime_error(
"GPU Event Display frontend could not be created!");
154 mConfig->configProcessing.doublePipeline = 1;
157 mAutoSolenoidBz = mConfParam->solenoidBzNominalGPU == -1e6f;
158 mAutoContinuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin < 0;
159 if (mAutoContinuousMaxTimeBin) {
162 if (mConfig->configProcessing.deviceNum == -2) {
165 mConfig->configProcessing.deviceNum = myId;
166 LOG(info) <<
"GPU device number selected from pipeline id: " << myId <<
" / " << idMax;
168 if (mConfig->configProcessing.debugLevel >= 3 && mVerbosity == 0) {
171 mConfig->configProcessing.runMC = mSpecConfig.
processMC;
173 if (!mSpecConfig.
processMC && !mConfig->configQA.clusterRejectionHistograms) {
174 throw std::runtime_error(
"Need MC information to create QA plots");
177 mConfig->configQA.noMC =
true;
179 mConfig->configQA.shipToQC =
true;
180 if (!mConfig->configProcessing.runQA) {
181 mConfig->configQA.enableLocalOutput =
false;
183 mConfig->configProcessing.runQA = -mQATaskMask;
186 mConfig->configInterface.outputToExternalBuffers =
true;
196 GPUO2Interface::ApplySyncSettings(mConfig->configProcessing, mConfig->configReconstruction, mConfig->configWorkflow.steps, mConfParam->synchronousProcessing, runTracking ? mConfParam->rundEdx : -2);
214 if (mTPCSectorMask != 0xFFFFFFFFF) {
215 throw std::invalid_argument(
"Cannot run TPC decompression with a sector mask");
228 mConfig->configProcessing.outputSharedClusterMap =
true;
231 mConfig->configProcessing.createO2Output = 0;
235 if (mConfParam->transformationFile.size() || mConfParam->transformationSCFile.size()) {
236 LOG(fatal) <<
"Deprecated configurable param options GPU_global.transformationFile or transformationSCFile used\n"
237 <<
"Instead, link the corresponding file as <somedir>/TPC/Calib/CorrectionMap/snapshot.root and use it via\n"
238 <<
"--condition-remap file://<somdir>=TPC/Calib/CorrectionMap option";
244 LOG(fatal) <<
"GPU two-threaded pipeline works only with TPC-only processing, and with ZS input";
248 mGPUReco = std::make_unique<GPUO2Interface>();
251 initFunctionTPCCalib(ic);
254 if (mConfig->configCalib.fastTransform ==
nullptr) {
255 throw std::invalid_argument(
"GPU workflow: initialization of the TPC transformation failed");
258 if (mConfParam->matLUTFile.size()) {
259 LOGP(info,
"Loading matlut file {}", mConfParam->matLUTFile.c_str());
261 if (mConfig->configCalib.matLUT ==
nullptr) {
262 LOGF(fatal,
"Error loading matlut file");
265 mConfig->configProcessing.lateO2MatLutProvisioningSize = 50 * 1024 * 1024;
269 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>();
270 mConfig->configCalib.trdGeometry = mTRDGeometry.get();
272 mTRDRecoParam = std::make_unique<GPUTRDRecoParam>();
273 mConfig->configCalib.trdRecoParam = mTRDRecoParam.get();
276 mConfig->configProcessing.willProvideO2PropagatorLate =
true;
277 mConfig->configProcessing.o2PropagatorUseGPUField =
true;
278 if (mConfig->configReconstruction.tpc.trackReferenceX == 1000.f) {
279 mConfig->configReconstruction.tpc.trackReferenceX = 83.f;
283 mConfig->configProcessing.printSettings =
true;
284 if (mConfParam->printSettings > 1) {
285 mConfig->PrintParam();
290 if (mGPUReco->Initialize(config) != 0) {
291 throw std::invalid_argument(
"GPU Reconstruction initialization failed");
294 mQA = std::make_unique<GPUO2InterfaceQA>(mConfig.get());
297 mGPUReco->setErrorCodeOutput(&mErrorQA);
308 if (mConfParam->dump >= 2) {
309 LOG(fatal) <<
"Cannot use dump-only mode with multi-threaded pipeline";
314 callbacks.
set<CallbackService::Id::RegionInfoCallback>([
this](fair::mq::RegionInfo
const& info) {
315 if (info.size == 0) {
319 mRegionInfos.emplace_back(info);
324 if (mConfParam->registerSelectedSegmentIds != -1 && info.managed && info.id != (uint32_t)mConfParam->registerSelectedSegmentIds) {
328 if (mConfParam->mutexMemReg) {
329 mode_t
mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
330 fd = open(
"/tmp/o2_gpu_memlock_mutex.lock", O_RDWR | O_CREAT | O_CLOEXEC,
mask);
332 throw std::runtime_error(
"Error opening memlock mutex lock file");
335 if (lockf(fd, F_LOCK, 0)) {
336 throw std::runtime_error(
"Error locking memlock mutex file");
339 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
340 if (mConfParam->benchmarkMemoryRegistration) {
341 start = std::chrono::high_resolution_clock::now();
343 if (mGPUReco->registerMemoryForGPU(info.ptr, info.size)) {
344 throw std::runtime_error(
"Error registering memory for GPU");
346 if (mConfParam->benchmarkMemoryRegistration) {
347 end = std::chrono::high_resolution_clock::now();
348 std::chrono::duration<double> elapsed_seconds =
end -
start;
349 LOG(info) <<
"Memory registration time (0x" << info.ptr <<
", " << info.size <<
" bytes): " << elapsed_seconds.count() <<
" s";
351 if (mConfParam->mutexMemReg) {
352 if (lockf(fd, F_ULOCK, 0)) {
353 throw std::runtime_error(
"Error unlocking memlock mutex file");
365 LOGF(info,
"GPU Reconstruction total timing: Cpu: %.3e Real: %.3e s in %d slots", mTimer->CpuTime(), mTimer->RealTime(), mTimer->Counter() - 1);
366 handlePipelineStop();
371 handlePipelineEndOfStream(ec);
377 finaliseCCDBTPC(matcher, obj);
379 finaliseCCDBITS(matcher, obj);
383 mGRPGeomUpdated =
true;
388template <
class D,
class E,
class F,
class G,
class H,
class I,
class J,
class K>
389void GPURecoWorkflowSpec::processInputs(
ProcessingContext& pc, D& tpcZSmeta, E& inputZS, F& tpcZS, G& tpcZSonTheFlySizes,
bool& debugTFDump, H& compClustersDummy, I& compClustersFlatDummy, J& pCompClustersFlat, K& tmpEmptyCompClusters)
400 tpcZSmeta.Pointers[
i][
j].clear();
401 tpcZSmeta.Sizes[
i][
j].clear();
406 tpcZSonTheFlySizes = {0};
409 bool recv =
false, recvsizes =
false;
412 throw std::runtime_error(
"Received multiple ZSSIZES data");
414 tpcZSonTheFlySizes = pc.
inputs().
get<std::array<uint32_t, NEndpoints * NSectors>>(
ref);
421 throw std::runtime_error(
"Received multiple TPCZS data");
423 inputZS = pc.
inputs().
get<gsl::span<o2::tpc::ZeroSuppressedContainer8kb>>(
ref);
426 if (!recv || !recvsizes) {
427 throw std::runtime_error(
"TPC ZS on the fly data not received");
432 uint32_t pageSector = 0;
433 for (uint32_t
j = 0;
j < NEndpoints;
j++) {
434 pageSector += tpcZSonTheFlySizes[
i * NEndpoints +
j];
435 offset += tpcZSonTheFlySizes[
i * NEndpoints +
j];
437 if (mVerbosity >= 1) {
438 LOG(info) <<
"GOT ZS on the fly pages FOR SECTOR " <<
i <<
" -> pages: " << pageSector;
444 auto isSameRdh = [](
const char*
left,
const char*
right) ->
bool {
445 return o2::raw::RDHUtils::getFEEID(
left) == o2::raw::RDHUtils::getFEEID(
right) && o2::raw::RDHUtils::getDetectorField(
left) == o2::raw::RDHUtils::getDetectorField(
right);
447 auto checkForZSData = [](
const char*
ptr, uint32_t subSpec) ->
bool {
448 const auto rdhLink = o2::raw::RDHUtils::getLinkID(
ptr);
449 const auto detField = o2::raw::RDHUtils::getDetectorField(
ptr);
450 const auto feeID = o2::raw::RDHUtils::getFEEID(
ptr);
451 const auto feeLinkID = o2::tpc::rdh_utils::getLink(feeID);
453 return detField ==
o2::tpc::raw_data_types::ZS && ((feeLinkID == o2::tpc::rdh_utils::UserLogicLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == 0)) ||
454 (feeLinkID == o2::tpc::rdh_utils::ILBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::ILBZSLinkID || rdhLink == 0)) ||
455 (feeLinkID == o2::tpc::rdh_utils::DLBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::DLBZSLinkID || rdhLink == 0)));
457 auto insertPages = [&tpcZSmeta, checkForZSData](
const char*
ptr,
size_t count, uint32_t subSpec) ->
void {
458 if (checkForZSData(
ptr, subSpec)) {
459 int32_t rawcru = o2::tpc::rdh_utils::getCRU(
ptr);
460 int32_t rawendpoint = o2::tpc::rdh_utils::getEndPoint(
ptr);
461 tpcZSmeta.Pointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(
ptr);
462 tpcZSmeta.Sizes[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(
count);
467 static uint32_t nErrors = 0;
469 if (nErrors == 1 || (nErrors < 100 && nErrors % 10 == 0) || nErrors % 1000 == 0 || mNTFs % 1000 == 0) {
470 LOG(error) <<
"DPLRawPageSequencer failed to process TPC raw data - data most likely not padded correctly - Using slow page scan instead (this alarm is downscaled from now on, so far " << nErrors <<
" of " << mNTFs <<
" TFs affected)";
474 int32_t totalCount = 0;
477 tpcZSmeta.Pointers2[
i][
j] = tpcZSmeta.Pointers[
i][
j].data();
478 tpcZSmeta.Sizes2[
i][
j] = tpcZSmeta.Sizes[
i][
j].data();
479 tpcZS.sector[
i].zsPtr[
j] = tpcZSmeta.Pointers2[
i][
j];
480 tpcZS.sector[
i].nZSPtr[
j] = tpcZSmeta.Sizes2[
i][
j];
481 tpcZS.sector[
i].count[
j] = tpcZSmeta.Pointers[
i][
j].size();
482 totalCount += tpcZSmeta.Pointers[
i][
j].size();
488 compClustersFlatDummy.setForward(&compClustersDummy);
489 pCompClustersFlat = &compClustersFlatDummy;
493 if (pCompClustersFlat ==
nullptr) {
500 LOGF(info,
"running tracking for sector(s) 0x%09x", mTPCSectorMask);
508 if (mConfParam->dump < 2) {
509 retVal = mGPUReco->RunTracking(ptrs, outputRegions, threadIndex, inputUpdateCallback);
512 retVal = runITSTracking(*pc);
517 mGPUReco->Clear(
false, threadIndex);
522void GPURecoWorkflowSpec::cleanOldCalibsTPCPtrs(calibObjectStruct& oldCalibObjects)
524 if (mOldCalibObjects.size() > 0) {
525 mOldCalibObjects.pop();
527 mOldCalibObjects.emplace(std::move(oldCalibObjects));
535 auto cput = mTimer->CpuTime();
536 auto realt = mTimer->RealTime();
537 mTimer->Start(
false);
540 std::vector<gsl::span<const char>>
inputs;
548 std::array<uint32_t, NEndpoints * NSectors> tpcZSonTheFlySizes;
549 gsl::span<const o2::tpc::ZeroSuppressedContainer8kb> inputZS;
550 std::unique_ptr<char[]> tmpEmptyCompClusters;
552 bool getWorkflowTPCInput_clusters =
false, getWorkflowTPCInput_mc =
false, getWorkflowTPCInput_digits =
false;
553 bool debugTFDump =
false;
556 getWorkflowTPCInput_mc =
true;
559 getWorkflowTPCInput_clusters =
true;
562 getWorkflowTPCInput_digits =
true;
567 auto lockDecodeInput = std::make_unique<std::lock_guard<std::mutex>>(mPipeline->mutexDecodeInput);
575 LOG(fatal) <<
"configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
580 processInputs(pc, tpcZSmeta, inputZS, tpcZS, tpcZSonTheFlySizes, debugTFDump, compClustersDummy, compClustersFlatDummy, pCompClustersFlat, tmpEmptyCompClusters);
581 const auto& inputsClustersDigits = o2::tpc::getWorkflowTPCInput(pc, mVerbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, mTPCSectorMask, getWorkflowTPCInput_digits);
584 mTFSettings->tfStartOrbit = tinfo.firstTForbit;
585 mTFSettings->hasTfStartOrbit = 1;
586 mTFSettings->hasNHBFPerTF = 1;
588 mTFSettings->hasRunStartOrbit = 0;
593 LOG(info) <<
"TF firstTForbit " << mTFSettings->tfStartOrbit <<
" nHBF " << mTFSettings->nHBFPerTF <<
" runStartOrbit " << mTFSettings->runStartOrbit <<
" simStartOrbit " << mTFSettings->simStartOrbit;
595 if (mConfParam->checkFirstTfOrbit) {
596 static uint32_t lastFirstTFOrbit = -1;
597 static uint32_t lastTFCounter = -1;
598 if (lastFirstTFOrbit != -1 && lastTFCounter != -1) {
599 int32_t diffOrbit = tinfo.firstTForbit - lastFirstTFOrbit;
600 int32_t diffCounter = tinfo.tfCounter - lastTFCounter;
601 if (diffOrbit != diffCounter * mTFSettings->nHBFPerTF) {
602 LOG(error) <<
"Time frame has mismatching firstTfOrbit - Last orbit/counter: " << lastFirstTFOrbit <<
" " << lastTFCounter <<
" - Current: " << tinfo.firstTForbit <<
" " << tinfo.tfCounter;
605 lastFirstTFOrbit = tinfo.firstTForbit;
606 lastTFCounter = tinfo.tfCounter;
619 void* ptrEp[NSectors * NEndpoints] = {};
620 bool doInputDigits =
false, doInputDigitsMC =
false;
624 const uint64_t*
buffer =
reinterpret_cast<const uint64_t*
>(&inputZS[0]);
627 doInputDigits = doInputDigitsMC = mSpecConfig.
processMC;
631 throw std::runtime_error(
"Cannot process MC information, none available");
634 doInputDigits =
true;
640 if (mTPCSectorMask != 0xFFFFFFFFF) {
642 for (uint32_t
i = 0;
i < NSectors;
i++) {
643 if (!(mTPCSectorMask & (1ul <<
i))) {
659 if (doInputDigitsMC) {
662 for (uint32_t
i = 0;
i < NSectors;
i++) {
663 tpcDigitsMap.
tpcDigits[
i] = inputsClustersDigits->inputDigits[
i].data();
664 tpcDigitsMap.
nTPCDigits[
i] = inputsClustersDigits->inputDigits[
i].size();
665 if (doInputDigitsMC) {
666 tpcDigitsMapMC.
v[
i] = inputsClustersDigits->inputDigitsMCPtrs[
i];
672 if (mClusterOutputIds.size() > 0) {
673 clusterOutputSectorHeader.
sectorBits = mTPCSectorMask;
675 clusterOutputSectorHeader.activeSectors = mTPCSectorMask;
680 std::unique_ptr<GPURecoWorkflow_QueueObject> pipelineContext;
682 if (handlePipeline(pc, ptrs, tpcZSmeta, tpcZS, pipelineContext)) {
690 using outputDataType =
char;
692 using outputBufferType = std::pair<std::optional<std::reference_wrapper<outputBufferUninitializedVector>>, outputDataType*>;
694 std::unordered_set<std::string> outputsCreated;
696 auto setOutputAllocator = [
this, &outputBuffers, &outputRegions, &pc, &outputsCreated](
const char*
name,
bool condition,
GPUOutputControl& region,
auto&& outputSpec,
size_t offset = 0) {
699 if (mConfParam->allocateOutputOnTheFly) {
700 region.allocator = [
this,
name, &
buffer, &pc, outputSpec = std::move(outputSpec),
offset, &outputsCreated](
size_t size) ->
void* {
703 LOG(info) <<
"ALLOCATING " <<
size <<
" bytes for " <<
name <<
": " << std::get<DataOrigin>(outputSpec).template as<std::string>() <<
"/" << std::get<DataDescription>(outputSpec).template as<std::string>() <<
"/" << std::get<2>(outputSpec);
705 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
707 start = std::chrono::high_resolution_clock::now();
710 outputsCreated.insert(
name);
712 end = std::chrono::high_resolution_clock::now();
713 std::chrono::duration<double> elapsed_seconds =
end -
start;
714 LOG(info) <<
"Allocation time for " <<
name <<
" (" <<
size <<
" bytes)"
715 <<
": " << elapsed_seconds.count() <<
"s";
723 outputsCreated.insert(
name);
728 auto downSizeBuffer = [](outputBufferType&
buffer,
size_t size) {
733 throw std::runtime_error(
"Invalid buffer size requested");
737 throw std::runtime_error(
"Inconsistent buffer address after downsize");
746 auto downSizeBufferToSpan = [&outputBuffers, &outputRegions, &downSizeBuffer](
GPUOutputControl& region,
auto span) {
751 if (span.size() &&
buffer.second != (
char*)span.data()) {
752 throw std::runtime_error(
"Buffer does not match span");
754 downSizeBuffer(
buffer, span.size() *
sizeof(*span.data()));
773 throw std::runtime_error(
"Invalid input for gpu tracking");
778 calibObjectStruct oldCalibObjects;
779 doCalibUpdates(pc, oldCalibObjects);
781 lockDecodeInput.reset();
783 uint32_t threadIndex;
784 if (mConfParam->dump) {
786 while (pipelineContext->jobThreadIndex == -1) {
788 threadIndex = pipelineContext->jobThreadIndex;
793 std::string dir =
"";
794 if (mConfParam->dumpFolder !=
"") {
795 dir = std::regex_replace(mConfParam->dumpFolder, std::regex(
"\\[P\\]"),
std::to_string(getpid()));
797 mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
802 mGPUReco->DumpSettings(threadIndex, dir.c_str());
804 if (tinfo.tfCounter >= mConfParam->dumpFirst && (mConfParam->dumpLast == -1 || tinfo.tfCounter <= mConfParam->dumpLast)) {
805 mGPUReco->DumpEvent(mNTFDumps, &ptrs, threadIndex, dir.c_str());
813 std::unique_ptr<GPUTrackingInOutPointers> ptrsDump;
814 if (mConfParam->dumpBadTFMode == 2) {
816 memcpy((
void*)ptrsDump.get(), (
const void*)&ptrs,
sizeof(ptrs));
821 if (!pipelineContext->jobSubmitted) {
822 enqueuePipelinedJob(&ptrs, &outputRegions, pipelineContext.get(),
true);
824 finalizeInputPipelinedJob(&ptrs, &outputRegions, pipelineContext.get());
826 std::unique_lock lk(pipelineContext->jobFinishedMutex);
827 pipelineContext->jobFinishedNotify.wait(lk, [context = pipelineContext.get()]() { return context->jobFinished; });
828 retVal = pipelineContext->jobReturnValue;
829 threadIndex = pipelineContext->jobThreadIndex;
832 threadIndex = mNextThreadIndex;
833 if (mConfig->configProcessing.doublePipeline) {
834 mNextThreadIndex = (mNextThreadIndex + 1) % 2;
837 retVal = runMain(&pc, &ptrs, &outputRegions, threadIndex);
842 cleanOldCalibsTPCPtrs(oldCalibObjects);
844 o2::utils::DebugStreamer::instance()->flush();
846 if (debugTFDump && mNDebugDumps < mConfParam->dumpBadTFs) {
848 if (mConfParam->dumpBadTFMode <= 1) {
850 FILE* fp = fopen(
filename.c_str(),
"w+b");
854 if (mConfParam->dumpBadTFMode == 1) {
858 fwrite(
data.data(), 1,
data.size(), fp);
861 }
else if (mConfParam->dumpBadTFMode == 2) {
862 mGPUReco->DumpEvent(mNDebugDumps - 1, ptrsDump.get(), threadIndex);
866 if (mConfParam->dump == 2) {
872 if (mConfig->configProcessing.tpcWriteClustersAfterRejection) {
875 bool createEmptyOutput =
false;
877 if (
retVal == 3 && mConfig->configProcessing.ignoreNonFatalGPUErrors) {
878 if (mConfig->configProcessing.throttleAlarms) {
879 LOG(warning) <<
"GPU Reconstruction aborted with non fatal error code, ignoring";
881 LOG(alarm) <<
"GPU Reconstruction aborted with non fatal error code, ignoring";
883 createEmptyOutput = !mConfParam->partialOutputForNonFatalErrors;
885 LOG(fatal) <<
"GPU Reconstruction aborted with error code " <<
retVal <<
" - errors are not ignored - terminating";
889 std::unique_ptr<o2::tpc::ClusterNativeAccess> tmpEmptyClNative;
890 if (createEmptyOutput) {
891 memset(&ptrs, 0,
sizeof(ptrs));
892 for (uint32_t
i = 0;
i < outputRegions.
count();
i++) {
893 if (outputBuffers[
i].
first) {
900 outputBuffers[
i].first->get().resize(toSize);
901 outputBuffers[
i].second = outputBuffers[
i].first->get().data();
903 memset(outputBuffers[
i].second, 0, toSize);
907 tmpEmptyClNative = std::make_unique<o2::tpc::ClusterNativeAccess>();
908 memset(tmpEmptyClNative.get(), 0,
sizeof(*tmpEmptyClNative));
913 clustersMCBuffer.second = clustersMCBuffer.first;
914 tmpEmptyClNative->clustersMCTruth = &clustersMCBuffer.second;
920 if (!mConfParam->allocateOutputOnTheFly) {
921 for (uint32_t
i = 0;
i < outputRegions.
count();
i++) {
924 throw std::runtime_error(
"Preallocated buffer size exceeded");
927 downSizeBuffer(outputBuffers[
i], (
char*)outputRegions.
asArray()[
i].
ptrCurrent - (
char*)outputBuffers[
i].second);
931 downSizeBufferToSpan(outputRegions.
tpcTracksO2, spanOutputTracks);
937 doTrackTuneTPC(ptrs, outputBuffers[outputRegions.
getIndex(outputRegions.
tpcTracksO2)].first->get().data());
941 throw std::runtime_error(
"cluster native output ptrs out of sync");
954 if (mClusterOutputIds.size() > 0) {
958 for (uint32_t
i = 0;
i < NSectors;
i++) {
959 if (mTPCSectorMask & (1ul <<
i)) {
961 clusterOutputSectorHeader.sectorBits = (1ul <<
i);
964 memset(outIndex, 0,
sizeof(*outIndex));
997 auto getoutput = [sendQAOutput](
auto ptr) {
return sendQAOutput &&
ptr ? *
ptr : std::decay_t<decltype(*ptr)>(); };
998 std::vector<TH1F> copy1 = getoutput(outputRegions.
qa.
hist1);
999 std::vector<TH2F> copy2 = getoutput(outputRegions.
qa.
hist2);
1000 std::vector<TH1D> copy3 = getoutput(outputRegions.
qa.
hist3);
1001 std::vector<TGraphAsymmErrors> copy4 = getoutput(outputRegions.
qa.
hist4);
1003 mQA->postprocessExternal(copy1, copy2, copy3, copy4, out, mQATaskMask ? mQATaskMask : -1);
1021 LOG(info) <<
"GPU Reconstruction time for this TF " << mTimer->CpuTime() - cput <<
" s (cpu), " << mTimer->RealTime() - realt <<
" s (wall)";
1029 bool needCalibUpdate =
false;
1030 if (mGRPGeomUpdated) {
1031 mGRPGeomUpdated =
false;
1032 needCalibUpdate =
true;
1037 mITSGeometryCreated =
true;
1040 if (mAutoSolenoidBz) {
1046 if (mAutoContinuousMaxTimeBin) {
1049 LOG(info) <<
"Updating max time bin " << newCalibValues.
continuousMaxTimeBin <<
" (" << mTFSettings->nHBFPerTF <<
" orbits)";
1052 if (!mPropagatorInstanceCreated) {
1054 if (mConfig->configProcessing.o2PropagatorUseGPUField) {
1057 mPropagatorInstanceCreated =
true;
1060 if (!mMatLUTCreated) {
1061 if (mConfParam->matLUTFile.size() == 0) {
1063 LOG(info) <<
"Loaded material budget lookup table";
1065 mMatLUTCreated =
true;
1068 if (!mTRDGeometryCreated) {
1070 gm->createPadPlaneArray();
1071 gm->createClusterMatrixArray();
1072 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>(*gm);
1073 newCalibObjects.
trdGeometry = mConfig->configCalib.trdGeometry = mTRDGeometry.get();
1074 LOG(info) <<
"Loaded TRD geometry";
1075 mTRDGeometryCreated =
true;
1077 if (!mTRDRecoParamCreated) {
1078 mTRDRecoParam = std::make_unique<GPUTRDRecoParam>();
1079 newCalibObjects.
trdRecoParam = mConfig->configCalib.trdRecoParam = mTRDRecoParam.get();
1080 mTRDRecoParamCreated =
true;
1084 needCalibUpdate = fetchCalibsCCDBTPC(pc, newCalibObjects, oldCalibObjects) || needCalibUpdate;
1086 needCalibUpdate = fetchCalibsCCDBITS(pc) || needCalibUpdate;
1088 if (mTPCCutAtTimeBin != mConfig->configGRP.tpcCutTimeBin) {
1090 newCalibValues.
tpcTimeBinCut = mConfig->configGRP.tpcCutTimeBin = mTPCCutAtTimeBin;
1091 needCalibUpdate =
true;
1095 std::ofstream out(
path, std::ios::binary | std::ios::trunc);
1096 if (!out.is_open()) {
1097 throw std::runtime_error(
"Failed to open output file: " +
path);
1100 out.write(
buffer,
static_cast<std::streamsize
>(validSize));
1102 throw std::runtime_error(
"Failed while writing data to: " +
path);
1105 for (
int i = 0;
i < 3;
i++) {
1110 LOG(info) <<
"Dumped TPC clusterizer NN " <<
i <<
" to file " <<
path;
1114 if (needCalibUpdate) {
1115 LOG(info) <<
"Updating GPUReconstruction calibration objects";
1116 mGPUReco->UpdateCalibration(newCalibObjects, newCalibValues);
1125 char* o2jobid = getenv(
"O2JOBID");
1126 char* numaid = getenv(
"NUMAID");
1127 int32_t chanid = o2jobid ? atoi(o2jobid) : (numaid ? atoi(numaid) : 0);
1128 std::string chan = std::string(
"name=gpu-prepare-channel,type=") + (send ?
"push" :
"pull") +
",method=" + (send ?
"connect" :
"bind") +
",address=ipc://@gpu-prepare-channel-" +
std::to_string(chanid) +
"-{timeslice0},transport=shmem,rateLogging=0";
1145 inputs.emplace_back(
"stdDist",
"FLP",
"DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
1150 LOG(fatal) <<
"Double pipeline mode can only work with zsraw input";
1154 inputs.emplace_back(
"pipelineprepare",
gDataOriginGPU,
"PIPELINEPREPARE", 0, Lifetime::Timeframe);
1164 if (mapSources != 0) {
1224 for (
unsigned int iLay{0}; iLay < (mSpecConfig.
itsStaggered ? 7 : 1); ++iLay) {
1225 inputs.emplace_back(
"compClusters",
"ITS",
"COMPCLUSTERS", iLay, Lifetime::Timeframe);
1226 inputs.emplace_back(
"patterns",
"ITS",
"PATTERNS", iLay, Lifetime::Timeframe);
1227 inputs.emplace_back(
"ROframes",
"ITS",
"CLUSTERSROF", iLay, Lifetime::Timeframe);
1229 inputs.emplace_back(
"itsmclabels",
"ITS",
"CLUSTERSMCTR", iLay, Lifetime::Timeframe);
1233 inputs.emplace_back(
"phystrig",
"ITS",
"PHYSTRIG", 0, Lifetime::Timeframe);
1235 inputs.emplace_back(
"phystrig",
"TRD",
"TRKTRGRD", 0, Lifetime::Timeframe);
1238 if (mSpecConfig.
isITS3) {
1239 inputs.emplace_back(
"cldict",
"IT3",
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(
"IT3/Calib/ClusterDictionary"));
1240 inputs.emplace_back(
"alppar",
"ITS",
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Config/AlpideParam"));
1242 inputs.emplace_back(
"itscldict",
"ITS",
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Calib/ClusterDictionary"));
1243 inputs.emplace_back(
"itsalppar",
"ITS",
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Config/AlpideParam"));
1246 inputs.emplace_back(
"meanvtx",
"GLO",
"MEANVERTEX", 0, Lifetime::Condition,
ccdbParamSpec(
"GLO/Calib/MeanVertex", {}, 1));
1252 *mConfParam = mConfig->ReadConfigurableParam();
1253 if (mConfig->configProcessing.nn.nnLoadFromCCDB) {
1255 LOG(info) <<
"(NN CLUS) Enabling fetching of TPC NN clusterizer from CCDB";
1257 mSpecConfig.
nnDumpToFile = mConfig->configProcessing.nn.nnCCDBDumpToFile;
1258 GPUSettingsProcessingNNclusterizer& nnClusterizerSettings = mConfig->configProcessing.nn;
1260 std::map<std::string, std::string> metadata;
1261 metadata[
"inputDType"] = nnClusterizerSettings.nnInferenceInputDType;
1262 metadata[
"outputDType"] = nnClusterizerSettings.nnInferenceOutputDType;
1263 metadata[
"nnCCDBWithMomentum"] = nnClusterizerSettings.nnCCDBWithMomentum;
1264 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBClassificationLayerType;
1265 metadata[
"nnCCDBInteractionRate"] = nnClusterizerSettings.nnCCDBInteractionRate;
1266 metadata[
"nnCCDBBeamType"] = nnClusterizerSettings.nnCCDBBeamType;
1268 auto convert_map_to_metadata = [](
const std::map<std::string, std::string>& inputMap, std::vector<o2::framework::CCDBMetadata>& outputMetadata) {
1269 for (
const auto& [
key,
value] : inputMap) {
1271 outputMetadata.push_back({
key,
value});
1277 std::vector<o2::framework::CCDBMetadata> ccdb_metadata;
1279 if (mConfParam->printSettings) {
1280 auto printSettings = [](
const std::map<std::string, std::string>& settings) {
1281 LOG(info) <<
"(NN CLUS) NN Clusterizer CCDB settings:";
1282 for (
const auto& [
key,
value] : settings) {
1286 printSettings(metadata);
1290 metadata[
"nnCCDBEvalType"] =
"classification_c1";
1291 convert_map_to_metadata(metadata, ccdb_metadata);
1292 inputs.emplace_back(
"nn_classification_c1",
gDataOriginTPC,
"NNCLUSTERIZER_C1", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1293 }
else if (mSpecConfig.
nnEvalMode[0] ==
"c2") {
1294 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1295 metadata[
"nnCCDBEvalType"] =
"classification_c2";
1296 convert_map_to_metadata(metadata, ccdb_metadata);
1297 inputs.emplace_back(
"nn_classification_c2",
gDataOriginTPC,
"NNCLUSTERIZER_C2", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1300 metadata[
"nnCCDBEvalType"] =
"regression_c1";
1301 metadata[
"nnCCDBLayerType"] = nnClusterizerSettings.nnCCDBRegressionLayerType;
1302 convert_map_to_metadata(metadata, ccdb_metadata);
1303 inputs.emplace_back(
"nn_regression_c1",
gDataOriginTPC,
"NNCLUSTERIZER_R1", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1306 metadata[
"nnCCDBEvalType"] =
"regression_c2";
1307 convert_map_to_metadata(metadata, ccdb_metadata);
1308 inputs.emplace_back(
"nn_regression_c2",
gDataOriginTPC,
"NNCLUSTERIZER_R2", 0, Lifetime::Condition,
ccdbParamSpec(nnClusterizerSettings.nnCCDBPath +
"/" + metadata[
"nnCCDBEvalType"], ccdb_metadata, 0));
1318 std::vector<OutputSpec> outputSpecs;
1320 outputSpecs.emplace_back(
gDataOriginGPU,
"PIPELINEPREPARE", 0, Lifetime::Timeframe);
1331 outputSpecs.emplace_back(
gDataOriginTPC,
"COMPCLUSTERS", 0, Lifetime::Timeframe);
1334 outputSpecs.emplace_back(
gDataOriginTPC,
"COMPCLUSTERSFLAT", 0, Lifetime::Timeframe);
1337 for (
auto const& sector : mTPCSectors) {
1338 mClusterOutputIds.emplace_back(sector);
1341 outputSpecs.emplace_back(
gDataOriginTPC,
"CLUSTERNATIVETMP", NSectors, Lifetime::Timeframe);
1342 for (
const auto sector : mTPCSectors) {
1350 for (
const auto sector : mTPCSectors) {
1359 outputSpecs.emplace_back(
gDataOriginTPC,
"CLSHAREDMAP", 0, Lifetime::Timeframe);
1360 outputSpecs.emplace_back(
gDataOriginTPC,
"TPCOCCUPANCYMAP", 0, Lifetime::Timeframe);
1363 outputSpecs.emplace_back(
gDataOriginTPC,
"TRIGGERWORDS", 0, Lifetime::Timeframe);
1366 outputSpecs.emplace_back(
gDataOriginTPC,
"TRACKINGQA", 0, Lifetime::Timeframe);
1369 outputSpecs.emplace_back(
gDataOriginGPU,
"ERRORQA", 0, Lifetime::Timeframe);
1373 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKS", 0, Lifetime::Timeframe);
1374 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKCLSID", 0, Lifetime::Timeframe);
1375 outputSpecs.emplace_back(
gDataOriginITS,
"ITSTrackROF", 0, Lifetime::Timeframe);
1376 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICES", 0, Lifetime::Timeframe);
1377 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESROF", 0, Lifetime::Timeframe);
1378 outputSpecs.emplace_back(
gDataOriginITS,
"IRFRAMES", 0, Lifetime::Timeframe);
1381 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESMCTR", 0, Lifetime::Timeframe);
1382 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESMCPUR", 0, Lifetime::Timeframe);
1383 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKSMCTR", 0, Lifetime::Timeframe);
1394 mDisplayFrontend.reset(
nullptr);
1395 mGPUReco.reset(
nullptr);
std::vector< std::string > labels
Simple interface to the CDB manager.
Definition of container class for dE/dx corrections.
void dumpToFile(std::string fileName, const CathodeSegmentation &seg, const std::vector< Point > &points)
Class of a TPC cluster in TPC-native coordinates (row, time)
Container to store compressed TPC cluster data.
A const (ready only) version of MCTruthContainer.
A parser and sequencer utility for raw pages within DPL input.
A raw page parser for DPL input.
Definition of class for writing debug informations.
Definition of the GeometryManager class.
Helper for geometry and GRP related CCDB requests.
Definition of the GeometryTGeo class.
Declarations for the wrapper for the set of cylindrical material layers.
Definition of the Names Generator class.
Class to serialize ONNX objects for ROOT snapshots of CCDB objects at runtime.
Utilities for parsing of data sequences.
Type wrappers for enfording a specific serialization method.
Wrapper class for TPC CA Tracker algorithm.
Configurable params for tracks ad hoc tuning.
Helper class to extract VDrift from different sources.
Helper class to obtain TPC clusters / digits / labels from DPL.
Definitions of TPC Zero Suppression Data Headers.
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static MatLayerCylSet * loadFromFile(const std::string &inpFName="matbud.root")
static std::string getConfigOutputFileName(const std::string &procName, const std::string &confName="", bool json=true)
GPUd() value_type estimateLTFast(o2 static GPUd() float estimateLTIncrement(const o2 PropagatorImpl * Instance(bool uninitialized=false)
static const HBFUtils & Instance()
static void write(std::string const &filename, std::string const &keyOnly="")
This utility handles transparently the DPL inputs and triggers a customizable action on sequences of ...
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static GPUDisplayFrontendInterface * getFrontend(const char *type)
static uint32_t getTpcMaxTimeBinFromNHbf(uint32_t nHbf)
static float getNominalGPUBz(T &src)
static void ApplySyncSettings(GPUSettingsProcessing &proc, GPUSettingsRec &rec, gpudatatypes::RecoStepField &steps, bool syncMode, int32_t dEdxMode=-2)
o2::framework::Outputs outputs()
std::vector< framework::InputSpec > CompletionPolicyData
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
o2::framework::Inputs inputs()
void run(o2::framework::ProcessingContext &pc) final
void stop() final
This is invoked on stop.
~GPURecoWorkflowSpec() override
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
GPURecoWorkflowSpec(CompletionPolicyData *policyData, Config const &specconfig, std::vector< int32_t > const &tpcsectors, uint64_t tpcSectorMask, std::shared_ptr< o2::base::GRPGeomRequest > &ggr, std::function< bool(o2::framework::DataProcessingHeader::StartTime)> **gPolicyOrder=nullptr)
o2::framework::Options options()
static void RunZSEncoderCreateMeta(const uint64_t *buffer, const uint32_t *sizes, void **ptrs, GPUTrackingInOutZS *out)
static GeometryTGeo * Instance()
void fillMatrixCache(int mask) override
ClusterNativeAccess::ConstMCLabelContainerViewWithBuffer ConstMCLabelContainerViewWithBuffer
static constexpr int MAXSECTOR
static precheckModifiedData runPrecheck(o2::gpu::GPUTrackingInOutPointers *ptrs, o2::gpu::GPUO2InterfaceConfiguration *config)
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
static Geometry * instance()
GLuint const GLchar * name
GLsizei const GLfloat * value
GLuint GLsizei const GLchar * label
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
GLsizei const GLchar *const * path
constexpr o2::header::DataOrigin gDataOriginCTP
constexpr o2::header::DataOrigin gDataOriginTPC
constexpr o2::header::DataOrigin gDataOriginTRD
constexpr o2::header::DataOrigin gDataOriginITS
constexpr o2::header::DataOrigin gDataOriginGPU
Defining ITS Vertex explicitly as messageable.
o2::header::DataDescription DataDescription
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
constexpr int MAXGLOBALPADROW
@ ZS
final Zero Suppression (can be ILBZS, DLBZS)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
@ FEEConfig
use fee config
@ IDCPadStatus
use idc pad status map
@ CalIDCPadStatusMapA
Status map of the pads (dead etc.) obtained from CalIDC0.
@ CalPadGainFull
Full pad gain calibration.
@ CalPadGainResidual
Residual pad gain calibration (e.g. from tracks)
@ CalTimeGain
Gain variation over time.
@ CalTimeGainMC
Gain variation over time for MC.
@ AltroSyncSignal
timing of Altro chip sync. signal
auto getRecoInputContainer(o2::framework::ProcessingContext &pc, o2::gpu::GPUTrackingInOutPointers *ptrs, const o2::globaltracking::RecoContainer *inputTracks, bool mc=false)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
std::string name
The name of the associated DataProcessorSpec.
size_t inputTimesliceId
The time pipelining id of this particular device.
void requestTracks(o2::dataformats::GlobalTrackID::mask_t src, bool mc)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
S< o2::trd::GeometryFlat >::type * trdGeometry
S< GPUTRDRecoParam >::type * trdRecoParam
S< o2::base::PropagatorImpl< float > >::type * o2Propagator
S< o2::base::MatLayerCylSet >::type * matLUT
S< o2::tpc::ORTRootSerializer >::type * nnClusterizerNetworks[3]
const std::vector< TH1F > * hist1
const std::vector< TGraphAsymmErrors > * hist4
const std::vector< TH1D > * hist3
const std::vector< TH2F > * hist2
bool newContinuousMaxTimeBin
uint32_t continuousMaxTimeBin
GPUSettingsProcessing configProcessing
std::function< void *(size_t)> allocator
std::vector< std::string > nnEvalMode
int32_t tpcDeadMapSources
bool decompressTPCFromROOT
bool outputCompClustersRoot
bool sendClustersPerSector
int32_t enableDoublePipeline
bool outputSharedClusterMap
bool useFilteredOutputSpecs
bool outputCompClustersFlat
const o2::tpc::Digit * tpcDigits[NSECTORS]
size_t nTPCDigits[NSECTORS]
const GPUTPCDigitsMCInput * tpcDigitsMC
const o2::tpc::ClusterNativeAccess * clustersNative
const o2::tpc::CompressedClustersFlat * tpcCompressedClusters
const uint32_t * outputClusRefsTPCO2
const GPUSettingsTF * settingsTF
const GPUTrackingInOutZS * tpcZS
const o2::MCCompLabel * outputTracksTPCO2MC
uint32_t nOutputTracksTPCO2
const o2::tpc::ClusterNativeAccess * clustersNativeReduced
uint32_t nOutputClusRefsTPCO2
const o2::tpc::TrackTPC * outputTracksTPCO2
const GPUTrackingInOutDigits * tpcPackedDigits
const void *const * zsPtr[NENDPOINTS]
uint32_t count[NENDPOINTS]
const uint32_t * nZSPtr[NENDPOINTS]
GPUTrackingInOutZSSector sector[NSECTORS]
static constexpr uint32_t NSECTORS
static constexpr uint32_t NENDPOINTS
GPUOutputControl tpcTracksO2
GPUOutputControl clustersNative
GPUOutputControl tpcOccupancyMap
GPUOutputControl * asArray()
GPUOutputControl tpcTracksO2Labels
GPUOutputControl tpcTracksO2ClusRefs
size_t getIndex(const GPUOutputControl &v)
static constexpr size_t count()
GPUOutputControl sharedClusterMap
GPUOutputControl compressedClusters
GPUOutputControl clusterLabels
GPUOutputControl tpcTriggerWords
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClustersSector[constants::MAXSECTOR]
const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > * clustersMCTruth
const ClusterNative * clusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int clusterOffset[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
const ClusterNative * clustersLinear
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"