84#include <TStopwatch.h>
89#include <TGraphAsymmErrors.h>
101#include <unordered_set>
113GPURecoWorkflowSpec::GPURecoWorkflowSpec(
GPURecoWorkflowSpec::CompletionPolicyData* policyData,
Config const& specconfig, std::vector<int32_t>
const& tpcsectors, uint64_t tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<
bool(
o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) :
o2::
framework::
Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
116 throw std::runtime_error(
"inconsistent configuration: cluster output is only possible if CA clusterer or CompCluster decompression is activated");
120 mConfParam.reset(
new GPUSettingsO2);
122 mTimer.reset(
new TStopwatch);
126 *gPolicyOrder = &mPolicyOrder;
136 GPUSettingsProcessingNNclusterizer& mNNClusterizerSettings = mConfig->configProcessing.nn;
138 if (mNNClusterizerSettings.nnLoadFromCCDB) {
139 LOG(info) <<
"Loading neural networks from CCDB";
141 nnClusterizerFetcher.
initCcdbApi(mNNClusterizerSettings.nnCCDBURL);
142 std::map<std::string, std::string> ccdbSettings = {
143 {
"nnCCDBURL", mNNClusterizerSettings.nnCCDBURL},
144 {
"nnCCDBPath", mNNClusterizerSettings.nnCCDBPath},
145 {
"inputDType", mNNClusterizerSettings.nnInferenceInputDType},
146 {
"outputDType", mNNClusterizerSettings.nnInferenceOutputDType},
147 {
"outputFolder", mNNClusterizerSettings.nnLocalFolder},
148 {
"nnCCDBPath", mNNClusterizerSettings.nnCCDBPath},
149 {
"nnCCDBWithMomentum",
std::to_string(mNNClusterizerSettings.nnCCDBWithMomentum)},
150 {
"nnCCDBBeamType", mNNClusterizerSettings.nnCCDBBeamType},
151 {
"nnCCDBInteractionRate",
std::to_string(mNNClusterizerSettings.nnCCDBInteractionRate)}};
153 std::string nnFetchFolder = mNNClusterizerSettings.nnLocalFolder;
156 if (evalMode[0] ==
"c1") {
157 ccdbSettings[
"nnCCDBLayerType"] = mNNClusterizerSettings.nnCCDBClassificationLayerType;
158 ccdbSettings[
"nnCCDBEvalType"] =
"classification_c1";
159 ccdbSettings[
"outputFile"] =
"net_classification_c1.onnx";
161 }
else if (evalMode[0] ==
"c2") {
162 ccdbSettings[
"nnCCDBLayerType"] = mNNClusterizerSettings.nnCCDBClassificationLayerType;
163 ccdbSettings[
"nnCCDBEvalType"] =
"classification_c2";
164 ccdbSettings[
"outputFile"] =
"net_classification_c2.onnx";
168 ccdbSettings[
"nnCCDBLayerType"] = mNNClusterizerSettings.nnCCDBRegressionLayerType;
169 ccdbSettings[
"nnCCDBEvalType"] =
"regression_c1";
170 ccdbSettings[
"outputFile"] =
"net_regression_c1.onnx";
172 if (evalMode[1] ==
"r2") {
173 ccdbSettings[
"nnCCDBLayerType"] = mNNClusterizerSettings.nnCCDBRegressionLayerType;
174 ccdbSettings[
"nnCCDBEvalType"] =
"regression_c2";
175 ccdbSettings[
"outputFile"] =
"net_regression_c2.onnx";
178 LOG(info) <<
"Neural network loading done!";
182 mConfig->configGRP.solenoidBzNominalGPU = 0;
183 mTFSettings->hasSimStartOrbit = 1;
185 mTFSettings->simStartOrbit = hbfu.getFirstIRofTF(
o2::InteractionRecord(0, hbfu.orbitFirstSampled)).orbit;
187 *mConfParam = mConfig->ReadConfigurableParam();
188 if (mConfParam->display) {
190 mConfig->configProcessing.eventDisplay = mDisplayFrontend.get();
191 if (mConfig->configProcessing.eventDisplay !=
nullptr) {
192 LOG(info) <<
"Event display enabled";
194 throw std::runtime_error(
"GPU Event Display frontend could not be created!");
198 mConfig->configProcessing.doublePipeline = 1;
201 mAutoSolenoidBz = mConfParam->solenoidBzNominalGPU == -1e6f;
202 mAutoContinuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin < 0;
203 if (mAutoContinuousMaxTimeBin) {
206 if (mConfig->configProcessing.deviceNum == -2) {
209 mConfig->configProcessing.deviceNum = myId;
210 LOG(info) <<
"GPU device number selected from pipeline id: " << myId <<
" / " << idMax;
212 if (mConfig->configProcessing.debugLevel >= 3 && mVerbosity == 0) {
215 mConfig->configProcessing.runMC = mSpecConfig.
processMC;
217 if (!mSpecConfig.
processMC && !mConfig->configQA.clusterRejectionHistograms) {
218 throw std::runtime_error(
"Need MC information to create QA plots");
221 mConfig->configQA.noMC =
true;
223 mConfig->configQA.shipToQC =
true;
224 if (!mConfig->configProcessing.runQA) {
225 mConfig->configQA.enableLocalOutput =
false;
226 mQATaskMask = (mSpecConfig.
processMC ? 15 : 0) | (mConfig->configQA.clusterRejectionHistograms ? 32 : 0);
227 mConfig->configProcessing.runQA = -mQATaskMask;
230 mConfig->configInterface.outputToExternalBuffers =
true;
231 if (mConfParam->synchronousProcessing) {
232 mConfig->configReconstruction.useMatLUT =
false;
234 if (mConfig->configProcessing.rtc.optSpecialCode == -1) {
235 mConfig->configProcessing.rtc.optSpecialCode = mConfParam->synchronousProcessing;
244 mConfig->configWorkflow.steps.setBits(
GPUDataTypes::RecoStep::TPCdEdx, mConfParam->rundEdx == -1 ? !mConfParam->synchronousProcessing : mConfParam->rundEdx);
262 if (mTPCSectorMask != 0xFFFFFFFFF) {
263 throw std::invalid_argument(
"Cannot run TPC decompression with a sector mask");
276 mConfig->configProcessing.outputSharedClusterMap =
true;
279 mConfig->configProcessing.createO2Output = 0;
283 if (mConfParam->transformationFile.size() || mConfParam->transformationSCFile.size()) {
284 LOG(fatal) <<
"Deprecated configurable param options GPU_global.transformationFile or transformationSCFile used\n"
285 <<
"Instead, link the corresponding file as <somedir>/TPC/Calib/CorrectionMap/snapshot.root and use it via\n"
286 <<
"--condition-remap file://<somdir>=TPC/Calib/CorrectionMap option";
292 LOG(fatal) <<
"GPU two-threaded pipeline works only with TPC-only processing, and with ZS input";
296 mGPUReco = std::make_unique<GPUO2Interface>();
299 initFunctionTPCCalib(ic);
301 mConfig->configCalib.fastTransform = mCalibObjects.mFastTransformHelper->getCorrMap();
302 mConfig->configCalib.fastTransformRef = mCalibObjects.mFastTransformHelper->getCorrMapRef();
303 mConfig->configCalib.fastTransformMShape = mCalibObjects.mFastTransformHelper->getCorrMapMShape();
304 mConfig->configCalib.fastTransformHelper = mCalibObjects.mFastTransformHelper.get();
305 if (mConfig->configCalib.fastTransform ==
nullptr) {
306 throw std::invalid_argument(
"GPU workflow: initialization of the TPC transformation failed");
309 if (mConfParam->matLUTFile.size()) {
310 LOGP(info,
"Loading matlut file {}", mConfParam->matLUTFile.c_str());
312 if (mConfig->configCalib.matLUT ==
nullptr) {
313 LOGF(fatal,
"Error loading matlut file");
316 mConfig->configProcessing.lateO2MatLutProvisioningSize = 50 * 1024 * 1024;
320 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>();
321 mConfig->configCalib.trdGeometry = mTRDGeometry.get();
324 mConfig->configProcessing.willProvideO2PropagatorLate =
true;
325 mConfig->configProcessing.o2PropagatorUseGPUField =
true;
328 mConfig->configProcessing.printSettings =
true;
329 if (mConfParam->printSettings > 1) {
330 mConfig->PrintParam();
335 if (mGPUReco->Initialize(config) != 0) {
336 throw std::invalid_argument(
"GPU Reconstruction initialization failed");
339 mQA = std::make_unique<GPUO2InterfaceQA>(mConfig.get());
342 mGPUReco->setErrorCodeOutput(&mErrorQA);
353 if (mConfParam->dump >= 2) {
354 LOG(fatal) <<
"Cannot use dump-only mode with multi-threaded pipeline";
359 callbacks.
set<CallbackService::Id::RegionInfoCallback>([
this](fair::mq::RegionInfo
const& info) {
360 if (info.size == 0) {
364 mRegionInfos.emplace_back(info);
369 if (mConfParam->registerSelectedSegmentIds != -1 && info.managed && info.id != (uint32_t)mConfParam->registerSelectedSegmentIds) {
373 if (mConfParam->mutexMemReg) {
374 mode_t
mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
375 fd = open(
"/tmp/o2_gpu_memlock_mutex.lock", O_RDWR | O_CREAT | O_CLOEXEC,
mask);
377 throw std::runtime_error(
"Error opening memlock mutex lock file");
380 if (lockf(fd, F_LOCK, 0)) {
381 throw std::runtime_error(
"Error locking memlock mutex file");
384 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
385 if (mConfParam->benchmarkMemoryRegistration) {
386 start = std::chrono::high_resolution_clock::now();
388 if (mGPUReco->registerMemoryForGPU(info.ptr, info.size)) {
389 throw std::runtime_error(
"Error registering memory for GPU");
391 if (mConfParam->benchmarkMemoryRegistration) {
392 end = std::chrono::high_resolution_clock::now();
393 std::chrono::duration<double> elapsed_seconds =
end -
start;
394 LOG(info) <<
"Memory registration time (0x" << info.ptr <<
", " << info.size <<
" bytes): " << elapsed_seconds.count() <<
" s";
396 if (mConfParam->mutexMemReg) {
397 if (lockf(fd, F_ULOCK, 0)) {
398 throw std::runtime_error(
"Error unlocking memlock mutex file");
410 LOGF(info,
"GPU Reconstruction total timing: Cpu: %.3e Real: %.3e s in %d slots", mTimer->CpuTime(), mTimer->RealTime(), mTimer->Counter() - 1);
411 handlePipelineStop();
416 handlePipelineEndOfStream(ec);
422 finaliseCCDBTPC(matcher, obj);
424 finaliseCCDBITS(matcher, obj);
428 mGRPGeomUpdated =
true;
433template <
class D,
class E,
class F,
class G,
class H,
class I,
class J,
class K>
434void GPURecoWorkflowSpec::processInputs(
ProcessingContext& pc, D& tpcZSmeta, E& inputZS, F& tpcZS, G& tpcZSonTheFlySizes,
bool& debugTFDump, H& compClustersDummy, I& compClustersFlatDummy, J& pCompClustersFlat, K& tmpEmptyCompClusters)
445 tpcZSmeta.Pointers[
i][
j].clear();
446 tpcZSmeta.Sizes[
i][
j].clear();
451 tpcZSonTheFlySizes = {0};
454 bool recv =
false, recvsizes =
false;
457 throw std::runtime_error(
"Received multiple ZSSIZES data");
459 tpcZSonTheFlySizes = pc.
inputs().
get<std::array<uint32_t, NEndpoints * NSectors>>(
ref);
466 throw std::runtime_error(
"Received multiple TPCZS data");
468 inputZS = pc.
inputs().
get<gsl::span<o2::tpc::ZeroSuppressedContainer8kb>>(
ref);
471 if (!recv || !recvsizes) {
472 throw std::runtime_error(
"TPC ZS on the fly data not received");
477 uint32_t pageSector = 0;
478 for (uint32_t
j = 0;
j < NEndpoints;
j++) {
479 pageSector += tpcZSonTheFlySizes[
i * NEndpoints +
j];
480 offset += tpcZSonTheFlySizes[
i * NEndpoints +
j];
482 if (mVerbosity >= 1) {
483 LOG(info) <<
"GOT ZS on the fly pages FOR SECTOR " <<
i <<
" -> pages: " << pageSector;
489 auto isSameRdh = [](
const char*
left,
const char*
right) ->
bool {
490 return o2::raw::RDHUtils::getFEEID(
left) == o2::raw::RDHUtils::getFEEID(
right) && o2::raw::RDHUtils::getDetectorField(
left) == o2::raw::RDHUtils::getDetectorField(
right);
492 auto checkForZSData = [](
const char*
ptr, uint32_t subSpec) ->
bool {
493 const auto rdhLink = o2::raw::RDHUtils::getLinkID(
ptr);
494 const auto detField = o2::raw::RDHUtils::getDetectorField(
ptr);
495 const auto feeID = o2::raw::RDHUtils::getFEEID(
ptr);
496 const auto feeLinkID = o2::tpc::rdh_utils::getLink(feeID);
498 return detField ==
o2::tpc::raw_data_types::ZS && ((feeLinkID == o2::tpc::rdh_utils::UserLogicLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == 0)) ||
499 (feeLinkID == o2::tpc::rdh_utils::ILBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::ILBZSLinkID || rdhLink == 0)) ||
500 (feeLinkID == o2::tpc::rdh_utils::DLBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::DLBZSLinkID || rdhLink == 0)));
502 auto insertPages = [&tpcZSmeta, checkForZSData](
const char*
ptr,
size_t count, uint32_t subSpec) ->
void {
503 if (checkForZSData(
ptr, subSpec)) {
504 int32_t rawcru = o2::tpc::rdh_utils::getCRU(
ptr);
505 int32_t rawendpoint = o2::tpc::rdh_utils::getEndPoint(
ptr);
506 tpcZSmeta.Pointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(
ptr);
507 tpcZSmeta.Sizes[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(
count);
512 static uint32_t nErrors = 0;
514 if (nErrors == 1 || (nErrors < 100 && nErrors % 10 == 0) || nErrors % 1000 == 0 || mNTFs % 1000 == 0) {
515 LOG(error) <<
"DPLRawPageSequencer failed to process TPC raw data - data most likely not padded correctly - Using slow page scan instead (this alarm is downscaled from now on, so far " << nErrors <<
" of " << mNTFs <<
" TFs affected)";
519 int32_t totalCount = 0;
522 tpcZSmeta.Pointers2[
i][
j] = tpcZSmeta.Pointers[
i][
j].data();
523 tpcZSmeta.Sizes2[
i][
j] = tpcZSmeta.Sizes[
i][
j].data();
524 tpcZS.sector[
i].zsPtr[
j] = tpcZSmeta.Pointers2[
i][
j];
525 tpcZS.sector[
i].nZSPtr[
j] = tpcZSmeta.Sizes2[
i][
j];
526 tpcZS.sector[
i].count[
j] = tpcZSmeta.Pointers[
i][
j].size();
527 totalCount += tpcZSmeta.Pointers[
i][
j].size();
533 compClustersFlatDummy.setForward(&compClustersDummy);
534 pCompClustersFlat = &compClustersFlatDummy;
538 if (pCompClustersFlat ==
nullptr) {
545 LOGF(info,
"running tracking for sector(s) 0x%09x", mTPCSectorMask);
553 if (mConfParam->dump < 2) {
554 retVal = mGPUReco->RunTracking(ptrs, outputRegions, threadIndex, inputUpdateCallback);
557 retVal = runITSTracking(*pc);
562 mGPUReco->Clear(
false, threadIndex);
567void GPURecoWorkflowSpec::cleanOldCalibsTPCPtrs(calibObjectStruct& oldCalibObjects)
569 if (mOldCalibObjects.size() > 0) {
570 mOldCalibObjects.pop();
572 mOldCalibObjects.emplace(std::move(oldCalibObjects));
580 auto cput = mTimer->CpuTime();
581 auto realt = mTimer->RealTime();
582 mTimer->Start(
false);
585 std::vector<gsl::span<const char>>
inputs;
593 std::array<uint32_t, NEndpoints * NSectors> tpcZSonTheFlySizes;
594 gsl::span<const o2::tpc::ZeroSuppressedContainer8kb> inputZS;
595 std::unique_ptr<char[]> tmpEmptyCompClusters;
597 bool getWorkflowTPCInput_clusters =
false, getWorkflowTPCInput_mc =
false, getWorkflowTPCInput_digits =
false;
598 bool debugTFDump =
false;
601 getWorkflowTPCInput_mc =
true;
604 getWorkflowTPCInput_clusters =
true;
607 getWorkflowTPCInput_digits =
true;
612 auto lockDecodeInput = std::make_unique<std::lock_guard<std::mutex>>(mPipeline->mutexDecodeInput);
620 LOG(fatal) <<
"configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
625 processInputs(pc, tpcZSmeta, inputZS, tpcZS, tpcZSonTheFlySizes, debugTFDump, compClustersDummy, compClustersFlatDummy, pCompClustersFlat, tmpEmptyCompClusters);
626 const auto& inputsClustersDigits = o2::tpc::getWorkflowTPCInput(pc, mVerbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, mTPCSectorMask, getWorkflowTPCInput_digits);
629 mTFSettings->tfStartOrbit = tinfo.firstTForbit;
630 mTFSettings->hasTfStartOrbit = 1;
631 mTFSettings->hasNHBFPerTF = 1;
633 mTFSettings->hasRunStartOrbit = 0;
638 LOG(info) <<
"TF firstTForbit " << mTFSettings->tfStartOrbit <<
" nHBF " << mTFSettings->nHBFPerTF <<
" runStartOrbit " << mTFSettings->runStartOrbit <<
" simStartOrbit " << mTFSettings->simStartOrbit;
640 if (mConfParam->checkFirstTfOrbit) {
641 static uint32_t lastFirstTFOrbit = -1;
642 static uint32_t lastTFCounter = -1;
643 if (lastFirstTFOrbit != -1 && lastTFCounter != -1) {
644 int32_t diffOrbit = tinfo.firstTForbit - lastFirstTFOrbit;
645 int32_t diffCounter = tinfo.tfCounter - lastTFCounter;
646 if (diffOrbit != diffCounter * mTFSettings->nHBFPerTF) {
647 LOG(error) <<
"Time frame has mismatching firstTfOrbit - Last orbit/counter: " << lastFirstTFOrbit <<
" " << lastTFCounter <<
" - Current: " << tinfo.firstTForbit <<
" " << tinfo.tfCounter;
650 lastFirstTFOrbit = tinfo.firstTForbit;
651 lastTFCounter = tinfo.tfCounter;
664 void* ptrEp[NSectors * NEndpoints] = {};
665 bool doInputDigits =
false, doInputDigitsMC =
false;
669 const uint64_t*
buffer =
reinterpret_cast<const uint64_t*
>(&inputZS[0]);
672 doInputDigits = doInputDigitsMC = mSpecConfig.
processMC;
676 throw std::runtime_error(
"Cannot process MC information, none available");
679 doInputDigits =
true;
685 if (mTPCSectorMask != 0xFFFFFFFFF) {
687 for (uint32_t
i = 0;
i < NSectors;
i++) {
688 if (!(mTPCSectorMask & (1ul <<
i))) {
704 if (doInputDigitsMC) {
707 for (uint32_t
i = 0;
i < NSectors;
i++) {
708 tpcDigitsMap.
tpcDigits[
i] = inputsClustersDigits->inputDigits[
i].data();
709 tpcDigitsMap.
nTPCDigits[
i] = inputsClustersDigits->inputDigits[
i].size();
710 if (doInputDigitsMC) {
711 tpcDigitsMapMC.
v[
i] = inputsClustersDigits->inputDigitsMCPtrs[
i];
717 if (mClusterOutputIds.size() > 0) {
718 clusterOutputSectorHeader.
sectorBits = mTPCSectorMask;
720 clusterOutputSectorHeader.activeSectors = mTPCSectorMask;
725 std::unique_ptr<GPURecoWorkflow_QueueObject> pipelineContext;
727 if (handlePipeline(pc, ptrs, tpcZSmeta, tpcZS, pipelineContext)) {
735 using outputDataType =
char;
737 using outputBufferType = std::pair<std::optional<std::reference_wrapper<outputBufferUninitializedVector>>, outputDataType*>;
739 std::unordered_set<std::string> outputsCreated;
741 auto setOutputAllocator = [
this, &outputBuffers, &outputRegions, &pc, &outputsCreated](
const char*
name,
bool condition,
GPUOutputControl& region,
auto&& outputSpec,
size_t offset = 0) {
744 if (mConfParam->allocateOutputOnTheFly) {
745 region.allocator = [
this,
name, &
buffer, &pc, outputSpec = std::move(outputSpec),
offset, &outputsCreated](
size_t size) ->
void* {
748 LOG(info) <<
"ALLOCATING " <<
size <<
" bytes for " <<
name <<
": " << std::get<DataOrigin>(outputSpec).template as<std::string>() <<
"/" << std::get<DataDescription>(outputSpec).template as<std::string>() <<
"/" << std::get<2>(outputSpec);
750 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
752 start = std::chrono::high_resolution_clock::now();
755 outputsCreated.insert(
name);
757 end = std::chrono::high_resolution_clock::now();
758 std::chrono::duration<double> elapsed_seconds =
end -
start;
759 LOG(info) <<
"Allocation time for " <<
name <<
" (" <<
size <<
" bytes)"
760 <<
": " << elapsed_seconds.count() <<
"s";
768 outputsCreated.insert(
name);
773 auto downSizeBuffer = [](outputBufferType&
buffer,
size_t size) {
778 throw std::runtime_error(
"Invalid buffer size requested");
782 throw std::runtime_error(
"Inconsistent buffer address after downsize");
791 auto downSizeBufferToSpan = [&outputBuffers, &outputRegions, &downSizeBuffer](
GPUOutputControl& region,
auto span) {
796 if (span.size() &&
buffer.second != (
char*)span.data()) {
797 throw std::runtime_error(
"Buffer does not match span");
799 downSizeBuffer(
buffer, span.size() *
sizeof(*span.data()));
818 throw std::runtime_error(
"Invalid input for gpu tracking");
823 calibObjectStruct oldCalibObjects;
824 doCalibUpdates(pc, oldCalibObjects);
826 lockDecodeInput.reset();
828 uint32_t threadIndex;
829 if (mConfParam->dump) {
831 while (pipelineContext->jobThreadIndex == -1) {
833 threadIndex = pipelineContext->jobThreadIndex;
838 std::string dir =
"";
839 if (mConfParam->dumpFolder !=
"") {
840 dir = std::regex_replace(mConfParam->dumpFolder, std::regex(
"\\[P\\]"),
std::to_string(getpid()));
842 mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
847 mGPUReco->DumpSettings(threadIndex, dir.c_str());
849 if (tinfo.tfCounter >= mConfParam->dumpFirst && (mConfParam->dumpLast == -1 || tinfo.tfCounter <= mConfParam->dumpLast)) {
850 mGPUReco->DumpEvent(mNTFDumps, &ptrs, threadIndex, dir.c_str());
854 std::unique_ptr<GPUTrackingInOutPointers> ptrsDump;
855 if (mConfParam->dumpBadTFMode == 2) {
857 memcpy((
void*)ptrsDump.get(), (
const void*)&ptrs,
sizeof(ptrs));
862 if (!pipelineContext->jobSubmitted) {
863 enqueuePipelinedJob(&ptrs, &outputRegions, pipelineContext.get(),
true);
865 finalizeInputPipelinedJob(&ptrs, &outputRegions, pipelineContext.get());
867 std::unique_lock lk(pipelineContext->jobFinishedMutex);
868 pipelineContext->jobFinishedNotify.wait(lk, [context = pipelineContext.get()]() { return context->jobFinished; });
869 retVal = pipelineContext->jobReturnValue;
870 threadIndex = pipelineContext->jobThreadIndex;
873 threadIndex = mNextThreadIndex;
874 if (mConfig->configProcessing.doublePipeline) {
875 mNextThreadIndex = (mNextThreadIndex + 1) % 2;
878 retVal = runMain(&pc, &ptrs, &outputRegions, threadIndex);
883 cleanOldCalibsTPCPtrs(oldCalibObjects);
885 o2::utils::DebugStreamer::instance()->flush();
887 if (debugTFDump && mNDebugDumps < mConfParam->dumpBadTFs) {
889 if (mConfParam->dumpBadTFMode <= 1) {
891 FILE* fp = fopen(
filename.c_str(),
"w+b");
895 if (mConfParam->dumpBadTFMode == 1) {
899 fwrite(
data.data(), 1,
data.size(), fp);
902 }
else if (mConfParam->dumpBadTFMode == 2) {
903 mGPUReco->DumpEvent(mNDebugDumps - 1, ptrsDump.get(), threadIndex);
907 if (mConfParam->dump == 2) {
913 if (mConfig->configProcessing.tpcWriteClustersAfterRejection) {
916 bool createEmptyOutput =
false;
918 if (
retVal == 3 && mConfig->configProcessing.ignoreNonFatalGPUErrors) {
919 if (mConfig->configProcessing.throttleAlarms) {
920 LOG(warning) <<
"GPU Reconstruction aborted with non fatal error code, ignoring";
922 LOG(alarm) <<
"GPU Reconstruction aborted with non fatal error code, ignoring";
924 createEmptyOutput = !mConfParam->partialOutputForNonFatalErrors;
926 LOG(fatal) <<
"GPU Reconstruction aborted with error code " <<
retVal <<
" - errors are not ignored - terminating";
930 std::unique_ptr<o2::tpc::ClusterNativeAccess> tmpEmptyClNative;
931 if (createEmptyOutput) {
932 memset(&ptrs, 0,
sizeof(ptrs));
933 for (uint32_t
i = 0;
i < outputRegions.
count();
i++) {
934 if (outputBuffers[
i].
first) {
941 outputBuffers[
i].first->get().resize(toSize);
942 outputBuffers[
i].second = outputBuffers[
i].first->get().data();
944 memset(outputBuffers[
i].second, 0, toSize);
948 tmpEmptyClNative = std::make_unique<o2::tpc::ClusterNativeAccess>();
949 memset(tmpEmptyClNative.get(), 0,
sizeof(*tmpEmptyClNative));
954 clustersMCBuffer.second = clustersMCBuffer.first;
955 tmpEmptyClNative->clustersMCTruth = &clustersMCBuffer.second;
961 if (!mConfParam->allocateOutputOnTheFly) {
962 for (uint32_t
i = 0;
i < outputRegions.
count();
i++) {
965 throw std::runtime_error(
"Preallocated buffer size exceeded");
968 downSizeBuffer(outputBuffers[
i], (
char*)outputRegions.
asArray()[
i].
ptrCurrent - (
char*)outputBuffers[
i].second);
972 downSizeBufferToSpan(outputRegions.
tpcTracksO2, spanOutputTracks);
978 doTrackTuneTPC(ptrs, outputBuffers[outputRegions.
getIndex(outputRegions.
tpcTracksO2)].first->get().data());
982 throw std::runtime_error(
"cluster native output ptrs out of sync");
995 if (mClusterOutputIds.size() > 0) {
999 for (uint32_t
i = 0;
i < NSectors;
i++) {
1000 if (mTPCSectorMask & (1ul <<
i)) {
1002 clusterOutputSectorHeader.sectorBits = (1ul <<
i);
1005 memset(outIndex, 0,
sizeof(*outIndex));
1014 for (
const auto&
label : labels) {
1038 auto getoutput = [sendQAOutput](
auto ptr) {
return sendQAOutput &&
ptr ? *
ptr : std::decay_t<decltype(*ptr)>(); };
1039 std::vector<TH1F> copy1 = getoutput(outputRegions.
qa.
hist1);
1040 std::vector<TH2F> copy2 = getoutput(outputRegions.
qa.
hist2);
1041 std::vector<TH1D> copy3 = getoutput(outputRegions.
qa.
hist3);
1042 std::vector<TGraphAsymmErrors> copy4 = getoutput(outputRegions.
qa.
hist4);
1044 mQA->postprocessExternal(copy1, copy2, copy3, copy4, out, mQATaskMask ? mQATaskMask : -1);
1062 LOG(info) <<
"GPU Reconstruction time for this TF " << mTimer->CpuTime() - cput <<
" s (cpu), " << mTimer->RealTime() - realt <<
" s (wall)";
1070 bool needCalibUpdate =
false;
1071 if (mGRPGeomUpdated) {
1072 mGRPGeomUpdated =
false;
1073 needCalibUpdate =
true;
1078 mITSGeometryCreated =
true;
1081 if (mAutoSolenoidBz) {
1087 if (mAutoContinuousMaxTimeBin) {
1090 LOG(info) <<
"Updating max time bin " << newCalibValues.
continuousMaxTimeBin <<
" (" << mTFSettings->nHBFPerTF <<
" orbits)";
1093 if (!mPropagatorInstanceCreated) {
1095 if (mConfig->configProcessing.o2PropagatorUseGPUField) {
1098 mPropagatorInstanceCreated =
true;
1101 if (!mMatLUTCreated) {
1102 if (mConfParam->matLUTFile.size() == 0) {
1104 LOG(info) <<
"Loaded material budget lookup table";
1106 mMatLUTCreated =
true;
1110 gm->createPadPlaneArray();
1111 gm->createClusterMatrixArray();
1112 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>(*gm);
1113 newCalibObjects.
trdGeometry = mConfig->configCalib.trdGeometry = mTRDGeometry.get();
1114 LOG(info) <<
"Loaded TRD geometry";
1115 mTRDGeometryCreated =
true;
1118 needCalibUpdate = fetchCalibsCCDBTPC(pc, newCalibObjects, oldCalibObjects) || needCalibUpdate;
1120 needCalibUpdate = fetchCalibsCCDBITS(pc) || needCalibUpdate;
1122 if (mTPCCutAtTimeBin != mConfig->configGRP.tpcCutTimeBin) {
1124 newCalibValues.
tpcTimeBinCut = mConfig->configGRP.tpcCutTimeBin = mTPCCutAtTimeBin;
1125 needCalibUpdate =
true;
1127 if (needCalibUpdate) {
1128 LOG(info) <<
"Updating GPUReconstruction calibration objects";
1129 mGPUReco->UpdateCalibration(newCalibObjects, newCalibValues);
1138 char* o2jobid = getenv(
"O2JOBID");
1139 char* numaid = getenv(
"NUMAID");
1140 int32_t chanid = o2jobid ? atoi(o2jobid) : (numaid ? atoi(numaid) : 0);
1141 std::string chan = std::string(
"name=gpu-prepare-channel,type=") + (send ?
"push" :
"pull") +
",method=" + (send ?
"connect" :
"bind") +
",address=ipc://@gpu-prepare-channel-" +
std::to_string(chanid) +
"-{timeslice0},transport=shmem,rateLogging=0";
1161 inputs.emplace_back(
"stdDist",
"FLP",
"DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
1166 LOG(fatal) <<
"Double pipeline mode can only work with zsraw input";
1170 inputs.emplace_back(
"pipelineprepare",
gDataOriginGPU,
"PIPELINEPREPARE", 0, Lifetime::Timeframe);
1180 if (mapSources != 0) {
1201 mCalibObjects.mFastTransformHelper->requestCCDBInputs(
inputs, optsDummy, gloOpts);
1239 inputs.emplace_back(
"compClusters",
"ITS",
"COMPCLUSTERS", 0, Lifetime::Timeframe);
1240 inputs.emplace_back(
"patterns",
"ITS",
"PATTERNS", 0, Lifetime::Timeframe);
1241 inputs.emplace_back(
"ROframes",
"ITS",
"CLUSTERSROF", 0, Lifetime::Timeframe);
1243 inputs.emplace_back(
"phystrig",
"ITS",
"PHYSTRIG", 0, Lifetime::Timeframe);
1245 inputs.emplace_back(
"phystrig",
"TRD",
"TRKTRGRD", 0, Lifetime::Timeframe);
1248 if (mSpecConfig.
isITS3) {
1249 inputs.emplace_back(
"cldict",
"IT3",
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(
"IT3/Calib/ClusterDictionary"));
1250 inputs.emplace_back(
"alppar",
"ITS",
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Config/AlpideParam"));
1252 inputs.emplace_back(
"itscldict",
"ITS",
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Calib/ClusterDictionary"));
1253 inputs.emplace_back(
"itsalppar",
"ITS",
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Config/AlpideParam"));
1256 inputs.emplace_back(
"meanvtx",
"GLO",
"MEANVERTEX", 0, Lifetime::Condition,
ccdbParamSpec(
"GLO/Calib/MeanVertex", {}, 1));
1260 inputs.emplace_back(
"itsmclabels",
"ITS",
"CLUSTERSMCTR", 0, Lifetime::Timeframe);
1261 inputs.emplace_back(
"ITSMC2ROframes",
"ITS",
"CLUSTERSMC2ROF", 0, Lifetime::Timeframe);
1271 std::vector<OutputSpec> outputSpecs;
1273 outputSpecs.emplace_back(
gDataOriginGPU,
"PIPELINEPREPARE", 0, Lifetime::Timeframe);
1284 outputSpecs.emplace_back(
gDataOriginTPC,
"COMPCLUSTERS", 0, Lifetime::Timeframe);
1287 outputSpecs.emplace_back(
gDataOriginTPC,
"COMPCLUSTERSFLAT", 0, Lifetime::Timeframe);
1290 for (
auto const& sector : mTPCSectors) {
1291 mClusterOutputIds.emplace_back(sector);
1294 outputSpecs.emplace_back(
gDataOriginTPC,
"CLUSTERNATIVETMP", NSectors, Lifetime::Timeframe);
1295 for (
const auto sector : mTPCSectors) {
1303 for (
const auto sector : mTPCSectors) {
1312 outputSpecs.emplace_back(
gDataOriginTPC,
"CLSHAREDMAP", 0, Lifetime::Timeframe);
1313 outputSpecs.emplace_back(
gDataOriginTPC,
"TPCOCCUPANCYMAP", 0, Lifetime::Timeframe);
1316 outputSpecs.emplace_back(
gDataOriginTPC,
"TRIGGERWORDS", 0, Lifetime::Timeframe);
1319 outputSpecs.emplace_back(
gDataOriginTPC,
"TRACKINGQA", 0, Lifetime::Timeframe);
1322 outputSpecs.emplace_back(
gDataOriginGPU,
"ERRORQA", 0, Lifetime::Timeframe);
1326 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKS", 0, Lifetime::Timeframe);
1327 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKCLSID", 0, Lifetime::Timeframe);
1328 outputSpecs.emplace_back(
gDataOriginITS,
"ITSTrackROF", 0, Lifetime::Timeframe);
1329 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICES", 0, Lifetime::Timeframe);
1330 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESROF", 0, Lifetime::Timeframe);
1331 outputSpecs.emplace_back(
gDataOriginITS,
"IRFRAMES", 0, Lifetime::Timeframe);
1334 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESMCTR", 0, Lifetime::Timeframe);
1335 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESMCPUR", 0, Lifetime::Timeframe);
1336 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKSMCTR", 0, Lifetime::Timeframe);
1337 outputSpecs.emplace_back(
gDataOriginITS,
"ITSTrackMC2ROF", 0, Lifetime::Timeframe);
1348 mDisplayFrontend.reset(
nullptr);
1349 mGPUReco.reset(
nullptr);
Simple interface to the CDB manager.
Definition of container class for dE/dx corrections.
Class of a TPC cluster in TPC-native coordinates (row, time)
Container to store compressed TPC cluster data.
A const (read-only) version of MCTruthContainer.
Helper class to access correction maps.
Helper class to access load maps from CCDB.
A parser and sequencer utility for raw pages within DPL input.
A raw page parser for DPL input.
Definition of class for writing debug information.
Definition of the GeometryManager class.
Helper for geometry and GRP related CCDB requests.
Definition of the GeometryTGeo class.
Declarations for the wrapper for the set of cylindrical material layers.
Definition of the Names Generator class.
Fetching neural networks for clusterization from CCDB.
Utilities for parsing of data sequences.
Type wrappers for enforcing a specific serialization method.
Wrapper class for TPC CA Tracker algorithm.
Configurable params for ad hoc track tuning.
Helper class to extract VDrift from different sources.
Helper class to obtain TPC clusters / digits / labels from DPL.
Definitions of TPC Zero Suppression Data Headers.
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static MatLayerCylSet * loadFromFile(const std::string &inpFName="matbud.root")
static PropagatorImpl * Instance(bool uninitialized=false)
static const HBFUtils & Instance()
This utility handles transparently the DPL inputs and triggers a customizable action on sequences of ...
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static GPUDisplayFrontendInterface * getFrontend(const char *type)
static uint32_t getTpcMaxTimeBinFromNHbf(uint32_t nHbf)
static float getNominalGPUBz(T &src)
o2::framework::Outputs outputs()
std::vector< framework::InputSpec > CompletionPolicyData
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
o2::framework::Inputs inputs()
void run(o2::framework::ProcessingContext &pc) final
void stop() final
This is invoked on stop.
~GPURecoWorkflowSpec() override
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
GPURecoWorkflowSpec(CompletionPolicyData *policyData, Config const &specconfig, std::vector< int32_t > const &tpcsectors, uint64_t tpcSectorMask, std::shared_ptr< o2::base::GRPGeomRequest > &ggr, std::function< bool(o2::framework::DataProcessingHeader::StartTime)> **gPolicyOrder=nullptr)
o2::framework::Options options()
static void RunZSEncoderCreateMeta(const uint64_t *buffer, const uint32_t *sizes, void **ptrs, GPUTrackingInOutZS *out)
static GeometryTGeo * Instance()
void fillMatrixCache(int mask) override
ClusterNativeAccess::ConstMCLabelContainerViewWithBuffer ConstMCLabelContainerViewWithBuffer
static void addOptions(std::vector< o2::framework::ConfigParamSpec > &options)
void initCcdbApi(std::string url)
void loadIndividualFromCCDB(std::map< std::string, std::string > settings)
static constexpr int MAXSECTOR
static precheckModifiedData runPrecheck(o2::gpu::GPUTrackingInOutPointers *ptrs, o2::gpu::GPUO2InterfaceConfiguration *config)
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
static Geometry * instance()
GLuint const GLchar * name
GLuint GLsizei const GLchar * label
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
constexpr o2::header::DataOrigin gDataOriginTPC
constexpr o2::header::DataOrigin gDataOriginTRD
constexpr o2::header::DataOrigin gDataOriginITS
constexpr o2::header::DataOrigin gDataOriginGPU
Defining PrimaryVertex explicitly as messageable.
o2::header::DataDescription DataDescription
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
constexpr int MAXGLOBALPADROW
@ ZS
final Zero Suppression (can be ILBZS, DLBZS)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
@ FEEConfig
use fee config
@ IDCPadStatus
use idc pad status map
@ CalIDCPadStatusMapA
Status map of the pads (dead etc., obtained from CalIDC0)
@ CalPadGainFull
Full pad gain calibration.
@ CalPadGainResidual
Residual pad gain calibration (e.g. from tracks)
@ CalTimeGain
Gain variation over time.
@ CalTimeGainMC
Gain variation over time for MC.
@ AltroSyncSignal
timing of Altro chip sync. signal
auto getRecoInputContainer(o2::framework::ProcessingContext &pc, o2::gpu::GPUTrackingInOutPointers *ptrs, const o2::globaltracking::RecoContainer *inputTracks, bool mc=false)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
size_t inputTimesliceId
The time pipelining id of this particular device.
void requestTracks(o2::dataformats::GlobalTrackID::mask_t src, bool mc)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
S< o2::trd::GeometryFlat >::type * trdGeometry
S< o2::base::PropagatorImpl< float > >::type * o2Propagator
S< o2::base::MatLayerCylSet >::type * matLUT
const std::vector< TH1F > * hist1
const std::vector< TGraphAsymmErrors > * hist4
const std::vector< TH1D > * hist3
const std::vector< TH2F > * hist2
bool newContinuousMaxTimeBin
uint32_t continuousMaxTimeBin
GPUSettingsProcessing configProcessing
std::function< void *(size_t)> allocator
int32_t tpcDeadMapSources
bool decompressTPCFromROOT
bool outputCompClustersRoot
bool sendClustersPerSector
int32_t enableDoublePipeline
bool outputSharedClusterMap
bool useFilteredOutputSpecs
bool outputCompClustersFlat
const o2::tpc::Digit * tpcDigits[NSECTORS]
size_t nTPCDigits[NSECTORS]
const GPUTPCDigitsMCInput * tpcDigitsMC
const o2::tpc::ClusterNativeAccess * clustersNative
const o2::tpc::CompressedClustersFlat * tpcCompressedClusters
const uint32_t * outputClusRefsTPCO2
const GPUSettingsTF * settingsTF
const GPUTrackingInOutZS * tpcZS
const o2::MCCompLabel * outputTracksTPCO2MC
uint32_t nOutputTracksTPCO2
const o2::tpc::ClusterNativeAccess * clustersNativeReduced
uint32_t nOutputClusRefsTPCO2
const o2::tpc::TrackTPC * outputTracksTPCO2
const GPUTrackingInOutDigits * tpcPackedDigits
const void *const * zsPtr[NENDPOINTS]
uint32_t count[NENDPOINTS]
const uint32_t * nZSPtr[NENDPOINTS]
GPUTrackingInOutZSSector sector[NSECTORS]
static constexpr uint32_t NSECTORS
static constexpr uint32_t NENDPOINTS
GPUOutputControl tpcTracksO2
GPUOutputControl clustersNative
GPUOutputControl tpcOccupancyMap
GPUOutputControl * asArray()
GPUOutputControl tpcTracksO2Labels
GPUOutputControl tpcTracksO2ClusRefs
size_t getIndex(const GPUOutputControl &v)
static constexpr size_t count()
GPUOutputControl sharedClusterMap
GPUOutputControl compressedClusters
GPUOutputControl clusterLabels
GPUOutputControl tpcTriggerWords
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClustersSector[constants::MAXSECTOR]
const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > * clustersMCTruth
const ClusterNative * clusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int clusterOffset[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
const ClusterNative * clustersLinear
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"