84#include <TStopwatch.h>
89#include <TGraphAsymmErrors.h>
101#include <unordered_set>
113GPURecoWorkflowSpec::GPURecoWorkflowSpec(
GPURecoWorkflowSpec::CompletionPolicyData* policyData,
Config const& specconfig, std::vector<int32_t>
const& tpcsectors, uint64_t tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<
bool(
o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) :
o2::
framework::
Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
116 throw std::runtime_error(
"inconsistent configuration: cluster output is only possible if CA clusterer is activated");
120 mConfParam.reset(
new GPUSettingsO2);
122 mTimer.reset(
new TStopwatch);
126 *gPolicyOrder = &mPolicyOrder;
136 GPUSettingsProcessingNNclusterizer& mNNClusterizerSettings = mConfig->configProcessing.nn;
138 if (mNNClusterizerSettings.nnLoadFromCCDB) {
139 LOG(info) <<
"Loading neural networks from CCDB";
141 nnClusterizerFetcher.
initCcdbApi(mNNClusterizerSettings.nnCCDBURL);
// String key/value settings assembled from the NN clusterizer configuration.
// Numeric fields are converted to text with std::to_string.
142 std::map<std::string, std::string> ccdbSettings = {
143 {
"nnCCDBURL", mNNClusterizerSettings.nnCCDBURL},
144 {
"nnCCDBPath", mNNClusterizerSettings.nnCCDBPath},
145 {
"inputDType", mNNClusterizerSettings.nnInferenceInputDType},
146 {
"outputDType", mNNClusterizerSettings.nnInferenceOutputDType},
147 {
"outputFolder", mNNClusterizerSettings.nnLocalFolder},
// NOTE(review): the nnCCDBPath key is already listed above with the same value,
// so this entry looks redundant - confirm and drop it.
148 {
"nnCCDBPath", mNNClusterizerSettings.nnCCDBPath},
149 {
"nnCCDBWithMomentum",
std::to_string(mNNClusterizerSettings.nnCCDBWithMomentum)},
150 {
"nnCCDBBeamType", mNNClusterizerSettings.nnCCDBBeamType},
151 {
"nnCCDBInteractionRate",
std::to_string(mNNClusterizerSettings.nnCCDBInteractionRate)}};
153 std::string nnFetchFolder = mNNClusterizerSettings.nnLocalFolder;
156 if (evalMode[0] ==
"c1") {
157 ccdbSettings[
"nnCCDBLayerType"] = mNNClusterizerSettings.nnCCDBClassificationLayerType;
158 ccdbSettings[
"nnCCDBEvalType"] =
"classification_c1";
159 ccdbSettings[
"outputFile"] =
"net_classification_c1.onnx";
161 }
else if (evalMode[0] ==
"c2") {
162 ccdbSettings[
"nnCCDBLayerType"] = mNNClusterizerSettings.nnCCDBClassificationLayerType;
163 ccdbSettings[
"nnCCDBEvalType"] =
"classification_c2";
164 ccdbSettings[
"outputFile"] =
"net_classification_c2.onnx";
168 ccdbSettings[
"nnCCDBLayerType"] = mNNClusterizerSettings.nnCCDBRegressionLayerType;
169 ccdbSettings[
"nnCCDBEvalType"] =
"regression_c1";
170 ccdbSettings[
"outputFile"] =
"net_regression_c1.onnx";
172 if (evalMode[1] ==
"r2") {
173 ccdbSettings[
"nnCCDBLayerType"] = mNNClusterizerSettings.nnCCDBRegressionLayerType;
174 ccdbSettings[
"nnCCDBEvalType"] =
"regression_c2";
175 ccdbSettings[
"outputFile"] =
"net_regression_c2.onnx";
178 LOG(info) <<
"Neural network loading done!";
182 mConfig->configGRP.solenoidBzNominalGPU = 0;
183 mTFSettings->hasSimStartOrbit = 1;
185 mTFSettings->simStartOrbit = hbfu.getFirstIRofTF(
o2::InteractionRecord(0, hbfu.orbitFirstSampled)).orbit;
187 *mConfParam = mConfig->ReadConfigurableParam();
188 if (mConfParam->display) {
190 mConfig->configProcessing.eventDisplay = mDisplayFrontend.get();
191 if (mConfig->configProcessing.eventDisplay !=
nullptr) {
192 LOG(info) <<
"Event display enabled";
194 throw std::runtime_error(
"GPU Event Display frontend could not be created!");
198 mConfig->configProcessing.doublePipeline = 1;
201 mAutoSolenoidBz = mConfParam->solenoidBzNominalGPU == -1e6f;
202 mAutoContinuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin < 0;
203 if (mAutoContinuousMaxTimeBin) {
206 if (mConfig->configProcessing.deviceNum == -2) {
209 mConfig->configProcessing.deviceNum = myId;
210 LOG(info) <<
"GPU device number selected from pipeline id: " << myId <<
" / " << idMax;
212 if (mConfig->configProcessing.debugLevel >= 3 && mVerbosity == 0) {
215 mConfig->configProcessing.runMC = mSpecConfig.
processMC;
217 if (!mSpecConfig.
processMC && !mConfig->configQA.clusterRejectionHistograms) {
218 throw std::runtime_error(
"Need MC information to create QA plots");
221 mConfig->configQA.noMC =
true;
223 mConfig->configQA.shipToQC =
true;
224 if (!mConfig->configProcessing.runQA) {
225 mConfig->configQA.enableLocalOutput =
false;
226 mQATaskMask = (mSpecConfig.
processMC ? 15 : 0) | (mConfig->configQA.clusterRejectionHistograms ? 32 : 0);
227 mConfig->configProcessing.runQA = -mQATaskMask;
230 mConfig->configReconstruction.tpc.nWaysOuter =
true;
231 mConfig->configInterface.outputToExternalBuffers =
true;
232 if (mConfParam->synchronousProcessing) {
233 mConfig->configReconstruction.useMatLUT =
false;
235 if (mConfig->configProcessing.rtc.optSpecialCode == -1) {
236 mConfig->configProcessing.rtc.optSpecialCode = mConfParam->synchronousProcessing;
245 mConfig->configWorkflow.steps.setBits(
GPUDataTypes::RecoStep::TPCdEdx, mConfParam->rundEdx == -1 ? !mConfParam->synchronousProcessing : mConfParam->rundEdx);
263 if (mTPCSectorMask != 0xFFFFFFFFF) {
264 throw std::invalid_argument(
"Cannot run TPC decompression with a sector mask");
277 mConfig->configProcessing.outputSharedClusterMap =
true;
280 mConfig->configProcessing.createO2Output = 0;
284 if (mConfParam->transformationFile.size() || mConfParam->transformationSCFile.size()) {
285 LOG(fatal) <<
"Deprecated configurable param options GPU_global.transformationFile or transformationSCFile used\n"
286 <<
"Instead, link the corresponding file as <somedir>/TPC/Calib/CorrectionMap/snapshot.root and use it via\n"
287 <<
"--condition-remap file://<somdir>=TPC/Calib/CorrectionMap option";
293 LOG(fatal) <<
"GPU two-threaded pipeline works only with TPC-only processing, and with ZS input";
297 mGPUReco = std::make_unique<GPUO2Interface>();
300 initFunctionTPCCalib(ic);
302 mConfig->configCalib.fastTransform = mCalibObjects.mFastTransformHelper->getCorrMap();
303 mConfig->configCalib.fastTransformRef = mCalibObjects.mFastTransformHelper->getCorrMapRef();
304 mConfig->configCalib.fastTransformMShape = mCalibObjects.mFastTransformHelper->getCorrMapMShape();
305 mConfig->configCalib.fastTransformHelper = mCalibObjects.mFastTransformHelper.get();
306 if (mConfig->configCalib.fastTransform ==
nullptr) {
307 throw std::invalid_argument(
"GPU workflow: initialization of the TPC transformation failed");
310 if (mConfParam->matLUTFile.size()) {
311 LOGP(info,
"Loading matlut file {}", mConfParam->matLUTFile.c_str());
313 if (mConfig->configCalib.matLUT ==
nullptr) {
314 LOGF(fatal,
"Error loading matlut file");
317 mConfig->configProcessing.lateO2MatLutProvisioningSize = 50 * 1024 * 1024;
321 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>();
322 mConfig->configCalib.trdGeometry = mTRDGeometry.get();
325 mConfig->configProcessing.willProvideO2PropagatorLate =
true;
326 mConfig->configProcessing.o2PropagatorUseGPUField =
true;
329 mConfig->configProcessing.printSettings =
true;
330 if (mConfParam->printSettings > 1) {
331 mConfig->PrintParam();
336 if (mGPUReco->Initialize(config) != 0) {
337 throw std::invalid_argument(
"GPU Reconstruction initialization failed");
340 mQA = std::make_unique<GPUO2InterfaceQA>(mConfig.get());
343 mGPUReco->setErrorCodeOutput(&mErrorQA);
354 if (mConfParam->dump >= 2) {
355 LOG(fatal) <<
"Cannot use dump-only mode with multi-threaded pipeline";
360 callbacks.
set<CallbackService::Id::RegionInfoCallback>([
this](fair::mq::RegionInfo
const& info) {
361 if (info.size == 0) {
365 mRegionInfos.emplace_back(info);
370 if (mConfParam->registerSelectedSegmentIds != -1 && info.managed && info.id != (uint32_t)mConfParam->registerSelectedSegmentIds) {
374 if (mConfParam->mutexMemReg) {
375 mode_t
mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
376 fd = open(
"/tmp/o2_gpu_memlock_mutex.lock", O_RDWR | O_CREAT | O_CLOEXEC,
mask);
378 throw std::runtime_error(
"Error opening memlock mutex lock file");
381 if (lockf(fd, F_LOCK, 0)) {
382 throw std::runtime_error(
"Error locking memlock mutex file");
385 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
386 if (mConfParam->benchmarkMemoryRegistration) {
387 start = std::chrono::high_resolution_clock::now();
389 if (mGPUReco->registerMemoryForGPU(info.ptr, info.size)) {
390 throw std::runtime_error(
"Error registering memory for GPU");
392 if (mConfParam->benchmarkMemoryRegistration) {
393 end = std::chrono::high_resolution_clock::now();
394 std::chrono::duration<double> elapsed_seconds =
end -
start;
395 LOG(info) <<
"Memory registration time (0x" << info.ptr <<
", " << info.size <<
" bytes): " << elapsed_seconds.count() <<
" s";
397 if (mConfParam->mutexMemReg) {
398 if (lockf(fd, F_ULOCK, 0)) {
399 throw std::runtime_error(
"Error unlocking memlock mutex file");
411 LOGF(info,
"GPU Reconstruction total timing: Cpu: %.3e Real: %.3e s in %d slots", mTimer->CpuTime(), mTimer->RealTime(), mTimer->Counter() - 1);
412 handlePipelineStop();
417 handlePipelineEndOfStream(ec);
423 finaliseCCDBTPC(matcher, obj);
425 finaliseCCDBITS(matcher, obj);
429 mGRPGeomUpdated =
true;
434template <
class D,
class E,
class F,
class G,
class H,
class I,
class J,
class K>
435void GPURecoWorkflowSpec::processInputs(
ProcessingContext& pc, D& tpcZSmeta, E& inputZS, F& tpcZS, G& tpcZSonTheFlySizes,
bool& debugTFDump, H& compClustersDummy, I& compClustersFlatDummy, J& pCompClustersFlat, K& tmpEmptyCompClusters)
446 tpcZSmeta.Pointers[
i][
j].clear();
447 tpcZSmeta.Sizes[
i][
j].clear();
452 tpcZSonTheFlySizes = {0};
455 bool recv =
false, recvsizes =
false;
458 throw std::runtime_error(
"Received multiple ZSSIZES data");
460 tpcZSonTheFlySizes = pc.
inputs().
get<std::array<uint32_t, NEndpoints * NSectors>>(
ref);
467 throw std::runtime_error(
"Received multiple TPCZS data");
469 inputZS = pc.
inputs().
get<gsl::span<o2::tpc::ZeroSuppressedContainer8kb>>(
ref);
472 if (!recv || !recvsizes) {
473 throw std::runtime_error(
"TPC ZS on the fly data not received");
478 uint32_t pageSector = 0;
479 for (uint32_t
j = 0;
j < NEndpoints;
j++) {
480 pageSector += tpcZSonTheFlySizes[
i * NEndpoints +
j];
481 offset += tpcZSonTheFlySizes[
i * NEndpoints +
j];
483 if (mVerbosity >= 1) {
484 LOG(info) <<
"GOT ZS on the fly pages FOR SECTOR " <<
i <<
" -> pages: " << pageSector;
// Predicate on two raw-page pointers: true when both report the same FEE ID
// and the same detector field through o2::raw::RDHUtils.
490 auto isSameRdh = [](
const char*
left,
const char*
right) ->
bool {
491 return o2::raw::RDHUtils::getFEEID(
left) == o2::raw::RDHUtils::getFEEID(
right) && o2::raw::RDHUtils::getDetectorField(
left) == o2::raw::RDHUtils::getDetectorField(
right);
// Predicate deciding whether the raw page at ptr is accepted as TPC ZS data.
// The detector field must equal o2::tpc::raw_data_types::ZS, and the link decoded
// from the FEE ID must be one of UserLogic / ILBZS / DLBZS with a matching RDH link ID.
// subSpec is not used in the visible expression.
493 auto checkForZSData = [](
const char*
ptr, uint32_t subSpec) ->
bool {
494 const auto rdhLink = o2::raw::RDHUtils::getLinkID(
ptr);
495 const auto detField = o2::raw::RDHUtils::getDetectorField(
ptr);
496 const auto feeID = o2::raw::RDHUtils::getFEEID(
ptr);
497 const auto feeLinkID = o2::tpc::rdh_utils::getLink(feeID);
// For every accepted FEE link type, an RDH link ID of UserLogicLinkID or 0 also passes;
// ILBZS and DLBZS additionally accept their own link ID.
499 return detField ==
o2::tpc::raw_data_types::ZS && ((feeLinkID == o2::tpc::rdh_utils::UserLogicLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == 0)) ||
500 (feeLinkID == o2::tpc::rdh_utils::ILBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::ILBZSLinkID || rdhLink == 0)) ||
501 (feeLinkID == o2::tpc::rdh_utils::DLBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::DLBZSLinkID || rdhLink == 0)));
// Records one run of raw pages in tpcZSmeta when checkForZSData accepts the page at ptr.
// ptr is appended to Pointers and count to Sizes, both indexed by
// [CRU / 10][(CRU % 10) * 2 + endpoint], with CRU and endpoint decoded from the page.
503 auto insertPages = [&tpcZSmeta, checkForZSData](
const char*
ptr,
size_t count, uint32_t subSpec) ->
void {
504 if (checkForZSData(
ptr, subSpec)) {
505 int32_t rawcru = o2::tpc::rdh_utils::getCRU(
ptr);
506 int32_t rawendpoint = o2::tpc::rdh_utils::getEndPoint(
ptr);
507 tpcZSmeta.Pointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(
ptr);
508 tpcZSmeta.Sizes[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(
count);
513 static uint32_t nErrors = 0;
515 if (nErrors == 1 || (nErrors < 100 && nErrors % 10 == 0) || nErrors % 1000 == 0 || mNTFs % 1000 == 0) {
516 LOG(error) <<
"DPLRawPageSequencer failed to process TPC raw data - data most likely not padded correctly - Using slow page scan instead (this alarm is downscaled from now on, so far " << nErrors <<
" of " << mNTFs <<
" TFs affected)";
520 int32_t totalCount = 0;
523 tpcZSmeta.Pointers2[
i][
j] = tpcZSmeta.Pointers[
i][
j].data();
524 tpcZSmeta.Sizes2[
i][
j] = tpcZSmeta.Sizes[
i][
j].data();
525 tpcZS.sector[
i].zsPtr[
j] = tpcZSmeta.Pointers2[
i][
j];
526 tpcZS.sector[
i].nZSPtr[
j] = tpcZSmeta.Sizes2[
i][
j];
527 tpcZS.sector[
i].count[
j] = tpcZSmeta.Pointers[
i][
j].size();
528 totalCount += tpcZSmeta.Pointers[
i][
j].size();
534 compClustersFlatDummy.setForward(&compClustersDummy);
535 pCompClustersFlat = &compClustersFlatDummy;
539 if (pCompClustersFlat ==
nullptr) {
546 LOGF(info,
"running tracking for sector(s) 0x%09x", mTPCSectorMask);
554 if (mConfParam->dump < 2) {
555 retVal = mGPUReco->RunTracking(ptrs, outputRegions, threadIndex, inputUpdateCallback);
558 retVal = runITSTracking(*pc);
563 mGPUReco->Clear(
false, threadIndex);
// Removes one previously stored entry from mOldCalibObjects if the container is not empty,
// then moves oldCalibObjects into it. The argument is moved-from afterwards.
568void GPURecoWorkflowSpec::cleanOldCalibsTPCPtrs(calibObjectStruct& oldCalibObjects)
570 if (mOldCalibObjects.size() > 0) {
571 mOldCalibObjects.pop();
573 mOldCalibObjects.emplace(std::move(oldCalibObjects));
581 auto cput = mTimer->CpuTime();
582 auto realt = mTimer->RealTime();
583 mTimer->Start(
false);
586 std::vector<gsl::span<const char>>
inputs;
594 std::array<uint32_t, NEndpoints * NSectors> tpcZSonTheFlySizes;
595 gsl::span<const o2::tpc::ZeroSuppressedContainer8kb> inputZS;
596 std::unique_ptr<char[]> tmpEmptyCompClusters;
598 bool getWorkflowTPCInput_clusters =
false, getWorkflowTPCInput_mc =
false, getWorkflowTPCInput_digits =
false;
599 bool debugTFDump =
false;
602 getWorkflowTPCInput_mc =
true;
605 getWorkflowTPCInput_clusters =
true;
608 getWorkflowTPCInput_digits =
true;
613 auto lockDecodeInput = std::make_unique<std::lock_guard<std::mutex>>(mPipeline->mutexDecodeInput);
621 LOG(fatal) <<
"configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
626 processInputs(pc, tpcZSmeta, inputZS, tpcZS, tpcZSonTheFlySizes, debugTFDump, compClustersDummy, compClustersFlatDummy, pCompClustersFlat, tmpEmptyCompClusters);
627 const auto& inputsClustersDigits = o2::tpc::getWorkflowTPCInput(pc, mVerbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, mTPCSectorMask, getWorkflowTPCInput_digits);
630 mTFSettings->tfStartOrbit = tinfo.firstTForbit;
631 mTFSettings->hasTfStartOrbit = 1;
632 mTFSettings->hasNHBFPerTF = 1;
634 mTFSettings->hasRunStartOrbit = 0;
639 LOG(info) <<
"TF firstTForbit " << mTFSettings->tfStartOrbit <<
" nHBF " << mTFSettings->nHBFPerTF <<
" runStartOrbit " << mTFSettings->runStartOrbit <<
" simStartOrbit " << mTFSettings->simStartOrbit;
// Optional consistency check, enabled by checkFirstTfOrbit.
// The orbit difference to the previously remembered time frame must equal the
// TF-counter difference multiplied by nHBFPerTF; a mismatch is logged as an error.
641 if (mConfParam->checkFirstTfOrbit) {
// -1 converts to the largest uint32_t value and acts as the "nothing stored yet" marker
// that the comparison below tests for.
642 static uint32_t lastFirstTFOrbit = -1;
643 static uint32_t lastTFCounter = -1;
644 if (lastFirstTFOrbit != -1 && lastTFCounter != -1) {
// The unsigned differences are stored in signed 32-bit variables.
645 int32_t diffOrbit = tinfo.firstTForbit - lastFirstTFOrbit;
646 int32_t diffCounter = tinfo.tfCounter - lastTFCounter;
647 if (diffOrbit != diffCounter * mTFSettings->nHBFPerTF) {
648 LOG(error) <<
"Time frame has mismatching firstTfOrbit - Last orbit/counter: " << lastFirstTFOrbit <<
" " << lastTFCounter <<
" - Current: " << tinfo.firstTForbit <<
" " << tinfo.tfCounter;
// Remember the current values for the next comparison.
651 lastFirstTFOrbit = tinfo.firstTForbit;
652 lastTFCounter = tinfo.tfCounter;
665 void* ptrEp[NSectors * NEndpoints] = {};
666 bool doInputDigits =
false, doInputDigitsMC =
false;
670 const uint64_t*
buffer =
reinterpret_cast<const uint64_t*
>(&inputZS[0]);
673 doInputDigits = doInputDigitsMC = mSpecConfig.
processMC;
677 throw std::runtime_error(
"Cannot process MC information, none available");
680 doInputDigits =
true;
686 if (mTPCSectorMask != 0xFFFFFFFFF) {
688 for (uint32_t
i = 0;
i < NSectors;
i++) {
689 if (!(mTPCSectorMask & (1ul <<
i))) {
705 if (doInputDigitsMC) {
708 for (uint32_t
i = 0;
i < NSectors;
i++) {
709 tpcDigitsMap.
tpcDigits[
i] = inputsClustersDigits->inputDigits[
i].data();
710 tpcDigitsMap.
nTPCDigits[
i] = inputsClustersDigits->inputDigits[
i].size();
711 if (doInputDigitsMC) {
712 tpcDigitsMapMC.
v[
i] = inputsClustersDigits->inputDigitsMCPtrs[
i];
718 if (mClusterOutputIds.size() > 0) {
719 clusterOutputSectorHeader.
sectorBits = mTPCSectorMask;
721 clusterOutputSectorHeader.activeSectors = mTPCSectorMask;
726 std::unique_ptr<GPURecoWorkflow_QueueObject> pipelineContext;
728 if (handlePipeline(pc, ptrs, tpcZSmeta, tpcZS, pipelineContext)) {
736 using outputDataType =
char;
738 using outputBufferType = std::pair<std::optional<std::reference_wrapper<outputBufferUninitializedVector>>, outputDataType*>;
740 std::unordered_set<std::string> outputsCreated;
742 auto setOutputAllocator = [
this, &outputBuffers, &outputRegions, &pc, &outputsCreated](
const char*
name,
bool condition,
GPUOutputControl& region,
auto&& outputSpec,
size_t offset = 0) {
745 if (mConfParam->allocateOutputOnTheFly) {
746 region.allocator = [
this,
name, &
buffer, &pc, outputSpec = std::move(outputSpec),
offset, &outputsCreated](
size_t size) ->
void* {
749 LOG(info) <<
"ALLOCATING " <<
size <<
" bytes for " <<
name <<
": " << std::get<DataOrigin>(outputSpec).template as<std::string>() <<
"/" << std::get<DataDescription>(outputSpec).template as<std::string>() <<
"/" << std::get<2>(outputSpec);
751 std::chrono::time_point<std::chrono::high_resolution_clock>
start,
end;
753 start = std::chrono::high_resolution_clock::now();
756 outputsCreated.insert(
name);
758 end = std::chrono::high_resolution_clock::now();
759 std::chrono::duration<double> elapsed_seconds =
end -
start;
760 LOG(info) <<
"Allocation time for " <<
name <<
" (" <<
size <<
" bytes)"
761 <<
": " << elapsed_seconds.count() <<
"s";
769 outputsCreated.insert(
name);
774 auto downSizeBuffer = [](outputBufferType&
buffer,
size_t size) {
779 throw std::runtime_error(
"Invalid buffer size requested");
783 throw std::runtime_error(
"Inconsistent buffer address after downsize");
792 auto downSizeBufferToSpan = [&outputBuffers, &outputRegions, &downSizeBuffer](
GPUOutputControl& region,
auto span) {
797 if (span.size() &&
buffer.second != (
char*)span.data()) {
798 throw std::runtime_error(
"Buffer does not match span");
800 downSizeBuffer(
buffer, span.size() *
sizeof(*span.data()));
819 throw std::runtime_error(
"Invalid input for gpu tracking");
824 calibObjectStruct oldCalibObjects;
825 doCalibUpdates(pc, oldCalibObjects);
827 lockDecodeInput.reset();
829 if (mConfParam->dump) {
831 mGPUReco->DumpSettings();
833 mGPUReco->DumpEvent(mNTFs - 1, &ptrs);
835 std::unique_ptr<GPUTrackingInOutPointers> ptrsDump;
836 if (mConfParam->dumpBadTFMode == 2) {
838 memcpy((
void*)ptrsDump.get(), (
const void*)&ptrs,
sizeof(ptrs));
843 if (!pipelineContext->jobSubmitted) {
844 enqueuePipelinedJob(&ptrs, &outputRegions, pipelineContext.get(),
true);
846 finalizeInputPipelinedJob(&ptrs, &outputRegions, pipelineContext.get());
848 std::unique_lock lk(pipelineContext->jobFinishedMutex);
849 pipelineContext->jobFinishedNotify.wait(lk, [context = pipelineContext.get()]() { return context->jobFinished; });
850 retVal = pipelineContext->jobReturnValue;
853 uint32_t threadIndex = mNextThreadIndex;
854 if (mConfig->configProcessing.doublePipeline) {
855 mNextThreadIndex = (mNextThreadIndex + 1) % 2;
858 retVal = runMain(&pc, &ptrs, &outputRegions, threadIndex);
863 cleanOldCalibsTPCPtrs(oldCalibObjects);
865 o2::utils::DebugStreamer::instance()->flush();
867 if (debugTFDump && mNDebugDumps < mConfParam->dumpBadTFs) {
869 if (mConfParam->dumpBadTFMode <= 1) {
871 FILE* fp = fopen(
filename.c_str(),
"w+b");
875 if (mConfParam->dumpBadTFMode == 1) {
879 fwrite(
data.data(), 1,
data.size(), fp);
882 }
else if (mConfParam->dumpBadTFMode == 2) {
883 mGPUReco->DumpEvent(mNDebugDumps - 1, ptrsDump.get());
887 if (mConfParam->dump == 2) {
893 bool createEmptyOutput =
false;
895 if (
retVal == 3 && mConfig->configProcessing.ignoreNonFatalGPUErrors) {
896 if (mConfig->configProcessing.throttleAlarms) {
897 LOG(warning) <<
"GPU Reconstruction aborted with non fatal error code, ignoring";
899 LOG(alarm) <<
"GPU Reconstruction aborted with non fatal error code, ignoring";
901 createEmptyOutput = !mConfParam->partialOutputForNonFatalErrors;
907 std::unique_ptr<o2::tpc::ClusterNativeAccess> tmpEmptyClNative;
908 if (createEmptyOutput) {
909 memset(&ptrs, 0,
sizeof(ptrs));
910 for (uint32_t
i = 0;
i < outputRegions.
count();
i++) {
911 if (outputBuffers[
i].
first) {
918 outputBuffers[
i].first->get().resize(toSize);
919 outputBuffers[
i].second = outputBuffers[
i].first->get().data();
921 memset(outputBuffers[
i].second, 0, toSize);
925 tmpEmptyClNative = std::make_unique<o2::tpc::ClusterNativeAccess>();
926 memset(tmpEmptyClNative.get(), 0,
sizeof(*tmpEmptyClNative));
931 clustersMCBuffer.second = clustersMCBuffer.first;
932 tmpEmptyClNative->clustersMCTruth = &clustersMCBuffer.second;
938 if (!mConfParam->allocateOutputOnTheFly) {
939 for (uint32_t
i = 0;
i < outputRegions.
count();
i++) {
942 throw std::runtime_error(
"Preallocated buffer size exceeded");
945 downSizeBuffer(outputBuffers[
i], (
char*)outputRegions.
asArray()[
i].
ptrCurrent - (
char*)outputBuffers[
i].second);
949 downSizeBufferToSpan(outputRegions.
tpcTracksO2, spanOutputTracks);
955 doTrackTuneTPC(ptrs, outputBuffers[outputRegions.
getIndex(outputRegions.
tpcTracksO2)].first->get().data());
959 throw std::runtime_error(
"cluster native output ptrs out of sync");
972 if (mClusterOutputIds.size() > 0) {
976 for (uint32_t
i = 0;
i < NSectors;
i++) {
977 if (mTPCSectorMask & (1ul <<
i)) {
979 clusterOutputSectorHeader.sectorBits = (1ul <<
i);
982 memset(outIndex, 0,
sizeof(*outIndex));
991 for (
const auto&
label : labels) {
1015 auto getoutput = [sendQAOutput](
auto ptr) {
return sendQAOutput &&
ptr ? *
ptr : std::decay_t<decltype(*ptr)>(); };
1016 std::vector<TH1F> copy1 = getoutput(outputRegions.
qa.
hist1);
1017 std::vector<TH2F> copy2 = getoutput(outputRegions.
qa.
hist2);
1018 std::vector<TH1D> copy3 = getoutput(outputRegions.
qa.
hist3);
1019 std::vector<TGraphAsymmErrors> copy4 = getoutput(outputRegions.
qa.
hist4);
1021 mQA->postprocessExternal(copy1, copy2, copy3, copy4, out, mQATaskMask ? mQATaskMask : -1);
1039 LOG(info) <<
"GPU Reconstruction time for this TF " << mTimer->CpuTime() - cput <<
" s (cpu), " << mTimer->RealTime() - realt <<
" s (wall)";
1047 bool needCalibUpdate =
false;
1048 if (mGRPGeomUpdated) {
1049 mGRPGeomUpdated =
false;
1050 needCalibUpdate =
true;
1055 mITSGeometryCreated =
true;
1058 if (mAutoSolenoidBz) {
1064 if (mAutoContinuousMaxTimeBin) {
1067 LOG(info) <<
"Updating max time bin " << newCalibValues.
continuousMaxTimeBin <<
" (" << mTFSettings->nHBFPerTF <<
" orbits)";
1070 if (!mPropagatorInstanceCreated) {
1072 if (mConfig->configProcessing.o2PropagatorUseGPUField) {
1075 mPropagatorInstanceCreated =
true;
1078 if (!mMatLUTCreated) {
1079 if (mConfParam->matLUTFile.size() == 0) {
1081 LOG(info) <<
"Loaded material budget lookup table";
1083 mMatLUTCreated =
true;
1087 gm->createPadPlaneArray();
1088 gm->createClusterMatrixArray();
1089 mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>(*gm);
1090 newCalibObjects.
trdGeometry = mConfig->configCalib.trdGeometry = mTRDGeometry.get();
1091 LOG(info) <<
"Loaded TRD geometry";
1092 mTRDGeometryCreated =
true;
1095 needCalibUpdate = fetchCalibsCCDBTPC(pc, newCalibObjects, oldCalibObjects) || needCalibUpdate;
1097 needCalibUpdate = fetchCalibsCCDBITS(pc) || needCalibUpdate;
1099 if (mTPCCutAtTimeBin != mConfig->configGRP.tpcCutTimeBin) {
1101 newCalibValues.
tpcTimeBinCut = mConfig->configGRP.tpcCutTimeBin = mTPCCutAtTimeBin;
1102 needCalibUpdate =
true;
1104 if (needCalibUpdate) {
1105 LOG(info) <<
"Updating GPUReconstruction calibration objects";
1106 mGPUReco->UpdateCalibration(newCalibObjects, newCalibValues);
1115 char* o2jobid = getenv(
"O2JOBID");
1116 char* numaid = getenv(
"NUMAID");
1117 int32_t chanid = o2jobid ? atoi(o2jobid) : (numaid ? atoi(numaid) : 0);
1118 std::string chan = std::string(
"name=gpu-prepare-channel,type=") + (send ?
"push" :
"pull") +
",method=" + (send ?
"connect" :
"bind") +
",address=ipc://@gpu-prepare-channel-" +
std::to_string(chanid) +
"-{timeslice0},transport=shmem,rateLogging=0";
1138 inputs.emplace_back(
"stdDist",
"FLP",
"DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
1143 LOG(fatal) <<
"Double pipeline mode can only work with zsraw input";
1147 inputs.emplace_back(
"pipelineprepare",
gDataOriginGPU,
"PIPELINEPREPARE", 0, Lifetime::Timeframe);
1157 if (mapSources != 0) {
1178 mCalibObjects.mFastTransformHelper->requestCCDBInputs(
inputs, optsDummy, gloOpts);
1216 inputs.emplace_back(
"compClusters",
"ITS",
"COMPCLUSTERS", 0, Lifetime::Timeframe);
1217 inputs.emplace_back(
"patterns",
"ITS",
"PATTERNS", 0, Lifetime::Timeframe);
1218 inputs.emplace_back(
"ROframes",
"ITS",
"CLUSTERSROF", 0, Lifetime::Timeframe);
1220 inputs.emplace_back(
"phystrig",
"ITS",
"PHYSTRIG", 0, Lifetime::Timeframe);
1222 inputs.emplace_back(
"phystrig",
"TRD",
"TRKTRGRD", 0, Lifetime::Timeframe);
1225 if (mSpecConfig.
isITS3) {
1226 inputs.emplace_back(
"cldict",
"IT3",
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(
"IT3/Calib/ClusterDictionary"));
1227 inputs.emplace_back(
"alppar",
"ITS",
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Config/AlpideParam"));
1229 inputs.emplace_back(
"itscldict",
"ITS",
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Calib/ClusterDictionary"));
1230 inputs.emplace_back(
"itsalppar",
"ITS",
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(
"ITS/Config/AlpideParam"));
1233 inputs.emplace_back(
"meanvtx",
"GLO",
"MEANVERTEX", 0, Lifetime::Condition,
ccdbParamSpec(
"GLO/Calib/MeanVertex", {}, 1));
1237 inputs.emplace_back(
"itsmclabels",
"ITS",
"CLUSTERSMCTR", 0, Lifetime::Timeframe);
1238 inputs.emplace_back(
"ITSMC2ROframes",
"ITS",
"CLUSTERSMC2ROF", 0, Lifetime::Timeframe);
1248 std::vector<OutputSpec> outputSpecs;
1250 outputSpecs.emplace_back(
gDataOriginGPU,
"PIPELINEPREPARE", 0, Lifetime::Timeframe);
1254 outputSpecs.emplace_back(
gDataOriginTPC,
"TRACKS", 0, Lifetime::Timeframe);
1255 outputSpecs.emplace_back(
gDataOriginTPC,
"CLUSREFS", 0, Lifetime::Timeframe);
1258 outputSpecs.emplace_back(
gDataOriginTPC,
"TRACKSMCLBL", 0, Lifetime::Timeframe);
1261 outputSpecs.emplace_back(
gDataOriginTPC,
"COMPCLUSTERS", 0, Lifetime::Timeframe);
1264 outputSpecs.emplace_back(
gDataOriginTPC,
"COMPCLUSTERSFLAT", 0, Lifetime::Timeframe);
1267 for (
auto const& sector : mTPCSectors) {
1268 mClusterOutputIds.emplace_back(sector);
1271 outputSpecs.emplace_back(
gDataOriginTPC,
"CLUSTERNATIVETMP", NSectors, Lifetime::Timeframe);
1272 for (
const auto sector : mTPCSectors) {
1273 outputSpecs.emplace_back(
gDataOriginTPC,
"CLUSTERNATIVE", sector, Lifetime::Timeframe);
1276 outputSpecs.emplace_back(
gDataOriginTPC,
"CLUSTERNATIVE", NSectors, Lifetime::Timeframe);
1280 for (
const auto sector : mTPCSectors) {
1281 outputSpecs.emplace_back(
gDataOriginTPC,
"CLNATIVEMCLBL", sector, Lifetime::Timeframe);
1284 outputSpecs.emplace_back(
gDataOriginTPC,
"CLNATIVEMCLBL", NSectors, Lifetime::Timeframe);
1289 outputSpecs.emplace_back(
gDataOriginTPC,
"CLSHAREDMAP", 0, Lifetime::Timeframe);
1290 outputSpecs.emplace_back(
gDataOriginTPC,
"TPCOCCUPANCYMAP", 0, Lifetime::Timeframe);
1293 outputSpecs.emplace_back(
gDataOriginTPC,
"TRIGGERWORDS", 0, Lifetime::Timeframe);
1296 outputSpecs.emplace_back(
gDataOriginTPC,
"TRACKINGQA", 0, Lifetime::Timeframe);
1299 outputSpecs.emplace_back(
gDataOriginGPU,
"ERRORQA", 0, Lifetime::Timeframe);
1303 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKS", 0, Lifetime::Timeframe);
1304 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKCLSID", 0, Lifetime::Timeframe);
1305 outputSpecs.emplace_back(
gDataOriginITS,
"ITSTrackROF", 0, Lifetime::Timeframe);
1306 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICES", 0, Lifetime::Timeframe);
1307 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESROF", 0, Lifetime::Timeframe);
1308 outputSpecs.emplace_back(
gDataOriginITS,
"IRFRAMES", 0, Lifetime::Timeframe);
1311 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESMCTR", 0, Lifetime::Timeframe);
1312 outputSpecs.emplace_back(
gDataOriginITS,
"VERTICESMCPUR", 0, Lifetime::Timeframe);
1313 outputSpecs.emplace_back(
gDataOriginITS,
"TRACKSMCTR", 0, Lifetime::Timeframe);
1314 outputSpecs.emplace_back(
gDataOriginITS,
"ITSTrackMC2ROF", 0, Lifetime::Timeframe);
1325 mDisplayFrontend.reset(
nullptr);
1326 mGPUReco.reset(
nullptr);
Simple interface to the CDB manager.
Definition of container class for dE/dx corrections.
Class of a TPC cluster in TPC-native coordinates (row, time)
Container to store compressed TPC cluster data.
A const (read only) version of MCTruthContainer.
Helper class to access correction maps.
Helper class to access load maps from CCDB.
A parser and sequencer utility for raw pages within DPL input.
A raw page parser for DPL input.
Definition of class for writing debug information.
Definition of the GeometryManager class.
Helper for geometry and GRP related CCDB requests.
Definition of the GeometryTGeo class.
Declarations for the wrapper for the set of cylindrical material layers.
Definition of the Names Generator class.
Fetching neural networks for clusterization from CCDB.
Utilities for parsing of data sequences.
Type wrappers for enforcing a specific serialization method.
Wrapper class for TPC CA Tracker algorithm.
Configurable params for tracks ad hoc tuning.
Helper class to extract VDrift from different sources.
Helper class to obtain TPC clusters / digits / labels from DPL.
Definitions of TPC Zero Suppression Data Headers.
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static MatLayerCylSet * loadFromFile(const std::string &inpFName="matbud.root")
GPUd() value_type estimateLTFast(o2 static GPUd() float estimateLTIncrement(const o2 PropagatorImpl * Instance(bool uninitialized=false)
static const HBFUtils & Instance()
This utility handles transparently the DPL inputs and triggers a customizable action on sequences of ...
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static GPUDisplayFrontendInterface * getFrontend(const char *type)
static uint32_t getTpcMaxTimeBinFromNHbf(uint32_t nHbf)
static float getNominalGPUBz(T &src)
o2::framework::Outputs outputs()
std::vector< framework::InputSpec > CompletionPolicyData
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
o2::framework::Inputs inputs()
void run(o2::framework::ProcessingContext &pc) final
void stop() final
This is invoked on stop.
~GPURecoWorkflowSpec() override
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
GPURecoWorkflowSpec(CompletionPolicyData *policyData, Config const &specconfig, std::vector< int32_t > const &tpcsectors, uint64_t tpcSectorMask, std::shared_ptr< o2::base::GRPGeomRequest > &ggr, std::function< bool(o2::framework::DataProcessingHeader::StartTime)> **gPolicyOrder=nullptr)
o2::framework::Options options()
static void RunZSEncoderCreateMeta(const uint64_t *buffer, const uint32_t *sizes, void **ptrs, GPUTrackingInOutZS *out)
static GeometryTGeo * Instance()
void fillMatrixCache(int mask) override
ClusterNativeAccess::ConstMCLabelContainerViewWithBuffer ConstMCLabelContainerViewWithBuffer
static void addOptions(std::vector< o2::framework::ConfigParamSpec > &options)
void initCcdbApi(std::string url)
void loadIndividualFromCCDB(std::map< std::string, std::string > settings)
static constexpr int MAXSECTOR
static precheckModifiedData runPrecheck(o2::gpu::GPUTrackingInOutPointers *ptrs, o2::gpu::GPUO2InterfaceConfiguration *config)
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
static Geometry * instance()
GLuint const GLchar * name
GLuint GLsizei const GLchar * label
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
constexpr o2::header::DataOrigin gDataOriginTPC
constexpr o2::header::DataOrigin gDataOriginTRD
constexpr o2::header::DataOrigin gDataOriginITS
constexpr o2::header::DataOrigin gDataOriginGPU
Defining PrimaryVertex explicitly as messageable.
o2::header::DataDescription DataDescription
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
constexpr int MAXGLOBALPADROW
@ ZS
final Zero Suppression (can be ILBZS, DLBZS)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
@ FEEConfig
use fee config
@ IDCPadStatus
use idc pad status map
@ CalIDCPadStatusMapA
Status map of the pads (dead etc., obtained from CalIDC0)
@ CalPadGainFull
Full pad gain calibration.
@ CalPadGainResidual
Residual pad gain calibration (e.g. from tracks)
@ CalTimeGain
Gain variation over time.
@ CalTimeGainMC
Gain variation over time for MC.
@ AltroSyncSignal
timing of Altro chip sync. signal
auto getRecoInputContainer(o2::framework::ProcessingContext &pc, o2::gpu::GPUTrackingInOutPointers *ptrs, const o2::globaltracking::RecoContainer *inputTracks, bool mc=false)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
size_t inputTimesliceId
The time pipelining id of this particular device.
void requestTracks(o2::dataformats::GlobalTrackID::mask_t src, bool mc)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
S< o2::trd::GeometryFlat >::type * trdGeometry
S< o2::base::PropagatorImpl< float > >::type * o2Propagator
S< o2::base::MatLayerCylSet >::type * matLUT
const std::vector< TH1F > * hist1
const std::vector< TGraphAsymmErrors > * hist4
const std::vector< TH1D > * hist3
const std::vector< TH2F > * hist2
bool newContinuousMaxTimeBin
uint32_t continuousMaxTimeBin
GPUSettingsProcessing configProcessing
std::function< void *(size_t)> allocator
int32_t tpcDeadMapSources
bool decompressTPCFromROOT
bool sendClustersPerSector
int32_t enableDoublePipeline
bool outputSharedClusterMap
bool outputCompClustersFlat
const o2::tpc::Digit * tpcDigits[NSECTORS]
size_t nTPCDigits[NSECTORS]
const GPUTPCDigitsMCInput * tpcDigitsMC
const o2::tpc::ClusterNativeAccess * clustersNative
const o2::tpc::CompressedClustersFlat * tpcCompressedClusters
const uint32_t * outputClusRefsTPCO2
const GPUSettingsTF * settingsTF
const GPUTrackingInOutZS * tpcZS
const o2::MCCompLabel * outputTracksTPCO2MC
uint32_t nOutputTracksTPCO2
uint32_t nOutputClusRefsTPCO2
const o2::tpc::TrackTPC * outputTracksTPCO2
const GPUTrackingInOutDigits * tpcPackedDigits
const void *const * zsPtr[NENDPOINTS]
uint32_t count[NENDPOINTS]
const uint32_t * nZSPtr[NENDPOINTS]
GPUTrackingInOutZSSector sector[NSECTORS]
static constexpr uint32_t NSECTORS
static constexpr uint32_t NENDPOINTS
GPUOutputControl tpcTracksO2
GPUOutputControl clustersNative
GPUOutputControl tpcOccupancyMap
GPUOutputControl * asArray()
GPUOutputControl tpcTracksO2Labels
GPUOutputControl tpcTracksO2ClusRefs
size_t getIndex(const GPUOutputControl &v)
static constexpr size_t count()
GPUOutputControl sharedClusterMap
GPUOutputControl compressedClusters
GPUOutputControl clusterLabels
GPUOutputControl tpcTriggerWords
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int nClustersSector[constants::MAXSECTOR]
const o2::dataformats::ConstMCTruthContainerView< o2::MCCompLabel > * clustersMCTruth
const ClusterNative * clusters[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
unsigned int clusterOffset[constants::MAXSECTOR][constants::MAXGLOBALPADROW]
const ClusterNative * clustersLinear
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"