#include <TStopwatch.h>
#include <TGraphAsymmErrors.h>
#include <unordered_set>
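// Constructor: stores the completion-policy hook, workflow configuration, TPC sector list/mask, and GRP geometry request.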
GPURecoWorkflowSpec::GPURecoWorkflowSpec(GPURecoWorkflowSpec::CompletionPolicyData* policyData, Config const& specconfig, std::vector<int32_t> const& tpcsectors, uint64_t tpcSectorMask, std::shared_ptr<o2::base::GRPGeomRequest>& ggr, std::function<bool(o2::framework::DataProcessingHeader::StartTime)>** gPolicyOrder) : o2::framework::Task(), mPolicyData(policyData), mTPCSectorMask(tpcSectorMask), mTPCSectors(tpcsectors), mSpecConfig(specconfig), mGGR(ggr)
  throw std::runtime_error("inconsistent configuration: cluster output is only possible if CA clusterer is activated");
  mConfParam.reset(new GPUSettingsO2);
  mTimer.reset(new TStopwatch);
  *gPolicyOrder = &mPolicyOrder;
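// init(): read the configurable parameters and set up the event display, double pipeline, and GPU device selection.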
  mConfig->configGRP.solenoidBzNominalGPU = 0;
  mTFSettings->hasSimStartOrbit = 1;
  mTFSettings->simStartOrbit = hbfu.getFirstIRofTF(o2::InteractionRecord(0, hbfu.orbitFirstSampled)).orbit;
  *mConfParam = mConfig->ReadConfigurableParam();
  if (mConfParam->display) {
  mConfig->configProcessing.eventDisplay = mDisplayFrontend.get();
  if (mConfig->configProcessing.eventDisplay != nullptr) {
  LOG(info) << "Event display enabled";
  throw std::runtime_error("GPU Event Display frontend could not be created!");
  mConfig->configProcessing.doublePipeline = 1;
  mAutoSolenoidBz = mConfParam->solenoidBzNominalGPU == -1e6f;
  mAutoContinuousMaxTimeBin = mConfig->configGRP.grpContinuousMaxTimeBin < 0;
  if (mAutoContinuousMaxTimeBin) {
  if (mConfig->configProcessing.deviceNum == -2) {
  mConfig->configProcessing.deviceNum = myId;
  LOG(info) << "GPU device number selected from pipeline id: " << myId << " / " << idMax;
  if (mConfig->configProcessing.debugLevel >= 3 && mVerbosity == 0) {
  mConfig->configProcessing.runMC = mSpecConfig.processMC;
  if (!mSpecConfig.processMC && !mConfig->configQA.clusterRejectionHistograms) {
  throw std::runtime_error("Need MC information to create QA plots");
  mConfig->configQA.noMC = true;
  mConfig->configQA.shipToQC = true;
  if (!mConfig->configProcessing.runQA) {
  mConfig->configQA.enableLocalOutput = false;
  mQATaskMask = (mSpecConfig.processMC ? 15 : 0) | (mConfig->configQA.clusterRejectionHistograms ? 32 : 0);
  mConfig->configProcessing.runQA = -mQATaskMask;
  mConfig->configReconstruction.tpc.nWaysOuter = true;
  mConfig->configInterface.outputToExternalBuffers = true;
  if (mConfParam->synchronousProcessing) {
  mConfig->configReconstruction.useMatLUT = false;
  if (mConfig->configProcessing.rtc.optSpecialCode == -1) {
  mConfig->configProcessing.rtc.optSpecialCode = mConfParam->synchronousProcessing;
  mConfig->configWorkflow.steps.setBits(GPUDataTypes::RecoStep::TPCdEdx, mConfParam->rundEdx == -1 ? !mConfParam->synchronousProcessing : mConfParam->rundEdx);
  if (mTPCSectorMask != 0xFFFFFFFFF) {
  throw std::invalid_argument("Cannot run TPC decompression with a sector mask");
  mConfig->configProcessing.outputSharedClusterMap = true;
  mConfig->configProcessing.createO2Output = 0;
  if (mConfParam->transformationFile.size() || mConfParam->transformationSCFile.size()) {
  LOG(fatal) << "Deprecated configurable param options GPU_global.transformationFile or transformationSCFile used\n"
             << "Instead, link the corresponding file as <somedir>/TPC/Calib/CorrectionMap/snapshot.root and use it via\n"
             << "--condition-remap file://<somdir>=TPC/Calib/CorrectionMap option";
  LOG(fatal) << "GPU two-threaded pipeline works only with TPC-only processing, and with ZS input";
  mGPUReco = std::make_unique<GPUO2Interface>();
  initFunctionTPCCalib(ic);
  mConfig->configCalib.fastTransform = mCalibObjects.mFastTransformHelper->getCorrMap();
  mConfig->configCalib.fastTransformRef = mCalibObjects.mFastTransformHelper->getCorrMapRef();
  mConfig->configCalib.fastTransformMShape = mCalibObjects.mFastTransformHelper->getCorrMapMShape();
  mConfig->configCalib.fastTransformHelper = mCalibObjects.mFastTransformHelper.get();
  if (mConfig->configCalib.fastTransform == nullptr) {
  throw std::invalid_argument("GPU workflow: initialization of the TPC transformation failed");
  if (mConfParam->matLUTFile.size()) {
  LOGP(info, "Loading matlut file {}", mConfParam->matLUTFile.c_str());
  if (mConfig->configCalib.matLUT == nullptr) {
  LOGF(fatal, "Error loading matlut file");
  mConfig->configProcessing.lateO2MatLutProvisioningSize = 50 * 1024 * 1024;
  mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>();
  mConfig->configCalib.trdGeometry = mTRDGeometry.get();
  mConfig->configProcessing.willProvideO2PropagatorLate = true;
  mConfig->configProcessing.o2PropagatorUseGPUField = true;
  mConfig->configProcessing.printSettings = true;
  if (mConfParam->printSettings > 1) {
  mConfig->PrintParam();
  if (mGPUReco->Initialize(config) != 0) {
  throw std::invalid_argument("GPU Reconstruction initialization failed");
  mQA = std::make_unique<GPUO2InterfaceQA>(mConfig.get());
  mGPUReco->setErrorCodeOutput(&mErrorQA);
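// Register a shared-memory region callback: every region reported by DPL is registered with the GPU for direct access,
// apparently serialized across processes via a file lock when mutexMemReg is set, and optionally timed for benchmarking.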
  if (mConfParam->dump >= 2) {
  LOG(fatal) << "Cannot use dump-only mode with multi-threaded pipeline";
  callbacks.set<CallbackService::Id::RegionInfoCallback>([this](fair::mq::RegionInfo const& info) {
  if (info.size == 0) {
  mRegionInfos.emplace_back(info);
  if (mConfParam->registerSelectedSegmentIds != -1 && info.managed && info.id != (uint32_t)mConfParam->registerSelectedSegmentIds) {
  if (mConfParam->mutexMemReg) {
  mode_t mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
  fd = open("/tmp/o2_gpu_memlock_mutex.lock", O_RDWR | O_CREAT | O_CLOEXEC, mask);
  throw std::runtime_error("Error opening memlock mutex lock file");
  if (lockf(fd, F_LOCK, 0)) {
  throw std::runtime_error("Error locking memlock mutex file");
  std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
  if (mConfParam->benchmarkMemoryRegistration) {
  start = std::chrono::high_resolution_clock::now();
  if (mGPUReco->registerMemoryForGPU(info.ptr, info.size)) {
  throw std::runtime_error("Error registering memory for GPU");
  if (mConfParam->benchmarkMemoryRegistration) {
  end = std::chrono::high_resolution_clock::now();
  std::chrono::duration<double> elapsed_seconds = end - start;
  LOG(info) << "Memory registration time (0x" << info.ptr << ", " << info.size << " bytes): " << elapsed_seconds.count() << " s";
  if (mConfParam->mutexMemReg) {
  if (lockf(fd, F_ULOCK, 0)) {
  throw std::runtime_error("Error unlocking memlock mutex file");
  LOGF(info, "GPU Reconstruction total timing: Cpu: %.3e Real: %.3e s in %d slots", mTimer->CpuTime(), mTimer->RealTime(), mTimer->Counter() - 1);
  handlePipelineStop();
  handlePipelineEndOfStream(ec);
  finaliseCCDBTPC(matcher, obj);
  finaliseCCDBITS(matcher, obj);
  mGRPGeomUpdated = true;
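// processInputs(): collect the TPC zero-suppressed raw pages (either sent on the fly or as raw DPL payloads) and fill
// the per-sector / per-endpoint pointer tables consumed by the GPU tracking.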
template <class D, class E, class F, class G, class H, class I, class J, class K>
void GPURecoWorkflowSpec::processInputs(ProcessingContext& pc, D& tpcZSmeta, E& inputZS, F& tpcZS, G& tpcZSonTheFlySizes, bool& debugTFDump, H& compClustersDummy, I& compClustersFlatDummy, J& pCompClustersFlat, K& tmpEmptyCompClusters)
  tpcZSmeta.Pointers[i][j].clear();
  tpcZSmeta.Sizes[i][j].clear();
  tpcZSonTheFlySizes = {0};
  bool recv = false, recvsizes = false;
  throw std::runtime_error("Received multiple ZSSIZES data");
  tpcZSonTheFlySizes = pc.inputs().get<std::array<uint32_t, NEndpoints * NSectors>>(ref);
  throw std::runtime_error("Received multiple TPCZS data");
  inputZS = pc.inputs().get<gsl::span<o2::tpc::ZeroSuppressedContainer8kb>>(ref);
  if (!recv || !recvsizes) {
  throw std::runtime_error("TPC ZS on the fly data not received");
  uint32_t pageSector = 0;
  for (uint32_t j = 0; j < NEndpoints; j++) {
  pageSector += tpcZSonTheFlySizes[i * NEndpoints + j];
  offset += tpcZSonTheFlySizes[i * NEndpoints + j];
  if (mVerbosity >= 1) {
  LOG(info) << "GOT ZS on the fly pages FOR SECTOR " << i << " -> pages: " << pageSector;
  auto isSameRdh = [](const char* left, const char* right) -> bool {
    return o2::raw::RDHUtils::getFEEID(left) == o2::raw::RDHUtils::getFEEID(right) && o2::raw::RDHUtils::getDetectorField(left) == o2::raw::RDHUtils::getDetectorField(right);
  auto checkForZSData = [](const char* ptr, uint32_t subSpec) -> bool {
    const auto rdhLink = o2::raw::RDHUtils::getLinkID(ptr);
    const auto detField = o2::raw::RDHUtils::getDetectorField(ptr);
    const auto feeID = o2::raw::RDHUtils::getFEEID(ptr);
    const auto feeLinkID = o2::tpc::rdh_utils::getLink(feeID);
    return detField == o2::tpc::raw_data_types::ZS && ((feeLinkID == o2::tpc::rdh_utils::UserLogicLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == 0)) ||
                                                       (feeLinkID == o2::tpc::rdh_utils::ILBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::ILBZSLinkID || rdhLink == 0)) ||
                                                       (feeLinkID == o2::tpc::rdh_utils::DLBZSLinkID && (rdhLink == o2::tpc::rdh_utils::UserLogicLinkID || rdhLink == o2::tpc::rdh_utils::DLBZSLinkID || rdhLink == 0)));
  auto insertPages = [&tpcZSmeta, checkForZSData](const char* ptr, size_t count, uint32_t subSpec) -> void {
    if (checkForZSData(ptr, subSpec)) {
      int32_t rawcru = o2::tpc::rdh_utils::getCRU(ptr);
      int32_t rawendpoint = o2::tpc::rdh_utils::getEndPoint(ptr);
      tpcZSmeta.Pointers[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(ptr);
      tpcZSmeta.Sizes[rawcru / 10][(rawcru % 10) * 2 + rawendpoint].emplace_back(count);
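// If the DPLRawPageSequencer cannot parse the raw pages, a rate-limited error is emitted and a slow page scan is used
// as fallback; afterwards the collected pointers and sizes are copied into the GPUTrackingInOutZS sector structures.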
  static uint32_t nErrors = 0;
  if (nErrors == 1 || (nErrors < 100 && nErrors % 10 == 0) || nErrors % 1000 == 0 || mNTFs % 1000 == 0) {
  LOG(error) << "DPLRawPageSequencer failed to process TPC raw data - data most likely not padded correctly - Using slow page scan instead (this alarm is downscaled from now on, so far " << nErrors << " of " << mNTFs << " TFs affected)";
  int32_t totalCount = 0;
  tpcZSmeta.Pointers2[i][j] = tpcZSmeta.Pointers[i][j].data();
  tpcZSmeta.Sizes2[i][j] = tpcZSmeta.Sizes[i][j].data();
  tpcZS.sector[i].zsPtr[j] = tpcZSmeta.Pointers2[i][j];
  tpcZS.sector[i].nZSPtr[j] = tpcZSmeta.Sizes2[i][j];
  tpcZS.sector[i].count[j] = tpcZSmeta.Pointers[i][j].size();
  totalCount += tpcZSmeta.Pointers[i][j].size();
  compClustersFlatDummy.setForward(&compClustersDummy);
  pCompClustersFlat = &compClustersFlatDummy;
  if (pCompClustersFlat == nullptr) {
  LOGF(info, "running tracking for sector(s) 0x%09x", mTPCSectorMask);
  if (mConfParam->dump < 2) {
  retVal = mGPUReco->RunTracking(ptrs, outputRegions, threadIndex, inputUpdateCallback);
  retVal = runITSTracking(*pc);
  mGPUReco->Clear(false, threadIndex);
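// cleanOldCalibsTPCPtrs(): queue the superseded calibration objects and drop the oldest entry, presumably so that
// objects still referenced by in-flight processing stay alive until the next update.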
void GPURecoWorkflowSpec::cleanOldCalibsTPCPtrs(calibObjectStruct& oldCalibObjects)
  if (mOldCalibObjects.size() > 0) {
  mOldCalibObjects.pop();
  mOldCalibObjects.emplace(std::move(oldCalibObjects));
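// run(): per-time-frame processing entry point: gather the inputs, fill the time-frame settings, run the GPU
// reconstruction, and ship the outputs.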
  auto cput = mTimer->CpuTime();
  auto realt = mTimer->RealTime();
  mTimer->Start(false);
  std::vector<gsl::span<const char>> inputs;
  std::array<uint32_t, NEndpoints * NSectors> tpcZSonTheFlySizes;
  gsl::span<const o2::tpc::ZeroSuppressedContainer8kb> inputZS;
  std::unique_ptr<char[]> tmpEmptyCompClusters;
  bool getWorkflowTPCInput_clusters = false, getWorkflowTPCInput_mc = false, getWorkflowTPCInput_digits = false;
  bool debugTFDump = false;
  getWorkflowTPCInput_mc = true;
  getWorkflowTPCInput_clusters = true;
  getWorkflowTPCInput_digits = true;
  auto lockDecodeInput = std::make_unique<std::lock_guard<std::mutex>>(mPipeline->mutexDecodeInput);
  LOG(fatal) << "configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
  processInputs(pc, tpcZSmeta, inputZS, tpcZS, tpcZSonTheFlySizes, debugTFDump, compClustersDummy, compClustersFlatDummy, pCompClustersFlat, tmpEmptyCompClusters);
  const auto& inputsClustersDigits = o2::tpc::getWorkflowTPCInput(pc, mVerbosity, getWorkflowTPCInput_mc, getWorkflowTPCInput_clusters, mTPCSectorMask, getWorkflowTPCInput_digits);
  mTFSettings->tfStartOrbit = tinfo.firstTForbit;
  mTFSettings->hasTfStartOrbit = 1;
  mTFSettings->hasNHBFPerTF = 1;
  mTFSettings->hasRunStartOrbit = 0;
  LOG(info) << "TF firstTForbit " << mTFSettings->tfStartOrbit << " nHBF " << mTFSettings->nHBFPerTF << " runStartOrbit " << mTFSettings->runStartOrbit << " simStartOrbit " << mTFSettings->simStartOrbit;
  if (mConfParam->checkFirstTfOrbit) {
  static uint32_t lastFirstTFOrbit = -1;
  static uint32_t lastTFCounter = -1;
  if (lastFirstTFOrbit != -1 && lastTFCounter != -1) {
  int32_t diffOrbit = tinfo.firstTForbit - lastFirstTFOrbit;
  int32_t diffCounter = tinfo.tfCounter - lastTFCounter;
  if (diffOrbit != diffCounter * mTFSettings->nHBFPerTF) {
  LOG(error) << "Time frame has mismatching firstTfOrbit - Last orbit/counter: " << lastFirstTFOrbit << " " << lastTFCounter << " - Current: " << tinfo.firstTForbit << " " << tinfo.tfCounter;
  lastFirstTFOrbit = tinfo.firstTForbit;
  lastTFCounter = tinfo.tfCounter;
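// Build the GPU input structures: zero-suppressed raw pages or TPC digits (plus MC label maps when MC processing is on),
// restricted to the sectors selected by mTPCSectorMask.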
  void* ptrEp[NSectors * NEndpoints] = {};
  bool doInputDigits = false, doInputDigitsMC = false;
  const uint64_t* buffer = reinterpret_cast<const uint64_t*>(&inputZS[0]);
  doInputDigits = doInputDigitsMC = mSpecConfig.processMC;
  throw std::runtime_error("Cannot process MC information, none available");
  doInputDigits = true;
  if (mTPCSectorMask != 0xFFFFFFFFF) {
  for (uint32_t i = 0; i < NSectors; i++) {
  if (!(mTPCSectorMask & (1ul << i))) {
  if (doInputDigitsMC) {
  for (uint32_t i = 0; i < NSectors; i++) {
  tpcDigitsMap.tpcDigits[i] = inputsClustersDigits->inputDigits[i].data();
  tpcDigitsMap.nTPCDigits[i] = inputsClustersDigits->inputDigits[i].size();
  if (doInputDigitsMC) {
  tpcDigitsMapMC.v[i] = inputsClustersDigits->inputDigitsMCPtrs[i];
  if (mClusterOutputIds.size() > 0) {
  clusterOutputSectorHeader.sectorBits = mTPCSectorMask;
  clusterOutputSectorHeader.activeSectors = mTPCSectorMask;
  std::unique_ptr<GPURecoWorkflow_QueueObject> pipelineContext;
  if (handlePipeline(pc, ptrs, tpcZSmeta, tpcZS, pipelineContext)) {
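// Output handling: with allocateOutputOnTheFly the GPU requests DPL output buffers through allocator callbacks
// (optionally timed); otherwise preallocated buffers are later shrunk to the size actually used.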
  using outputDataType = char;
  using outputBufferType = std::pair<std::optional<std::reference_wrapper<outputBufferUninitializedVector>>, outputDataType*>;
  std::unordered_set<std::string> outputsCreated;
  auto setOutputAllocator = [this, &outputBuffers, &outputRegions, &pc, &outputsCreated](const char* name, bool condition, GPUOutputControl& region, auto&& outputSpec, size_t offset = 0) {
  if (mConfParam->allocateOutputOnTheFly) {
  region.allocator = [this, name, &buffer, &pc, outputSpec = std::move(outputSpec), offset, &outputsCreated](size_t size) -> void* {
  LOG(info) << "ALLOCATING " << size << " bytes for " << name << ": " << std::get<DataOrigin>(outputSpec).template as<std::string>() << "/" << std::get<DataDescription>(outputSpec).template as<std::string>() << "/" << std::get<2>(outputSpec);
  std::chrono::time_point<std::chrono::high_resolution_clock> start, end;
  start = std::chrono::high_resolution_clock::now();
  outputsCreated.insert(name);
  end = std::chrono::high_resolution_clock::now();
  std::chrono::duration<double> elapsed_seconds = end - start;
  LOG(info) << "Allocation time for " << name << " (" << size << " bytes)" << ": " << elapsed_seconds.count() << "s";
  outputsCreated.insert(name);
  auto downSizeBuffer = [](outputBufferType& buffer, size_t size) {
  throw std::runtime_error("Invalid buffer size requested");
  throw std::runtime_error("Inconsistent buffer address after downsize");
  auto downSizeBufferToSpan = [&outputBuffers, &outputRegions, &downSizeBuffer](GPUOutputControl& region, auto span) {
  if (span.size() && buffer.second != (char*)span.data()) {
  throw std::runtime_error("Buffer does not match span");
  downSizeBuffer(buffer, span.size() * sizeof(*span.data()));
  throw std::runtime_error("Invalid input for gpu tracking");
  calibObjectStruct oldCalibObjects;
  doCalibUpdates(pc, oldCalibObjects);
  lockDecodeInput.reset();
  if (mConfParam->dump) {
  mGPUReco->DumpSettings();
  mGPUReco->DumpEvent(mNTFs - 1, &ptrs);
  std::unique_ptr<GPUTrackingInOutPointers> ptrsDump;
  if (mConfParam->dumpBadTFMode == 2) {
  memcpy((void*)ptrsDump.get(), (const void*)&ptrs, sizeof(ptrs));
  if (!pipelineContext->jobSubmitted) {
  enqueuePipelinedJob(&ptrs, &outputRegions, pipelineContext.get(), true);
  finalizeInputPipelinedJob(&ptrs, &outputRegions, pipelineContext.get());
  std::unique_lock lk(pipelineContext->jobFinishedMutex);
  pipelineContext->jobFinishedNotify.wait(lk, [context = pipelineContext.get()]() { return context->jobFinished; });
  retVal = pipelineContext->jobReturnValue;
  uint32_t threadIndex = mNextThreadIndex;
  if (mConfig->configProcessing.doublePipeline) {
  mNextThreadIndex = (mNextThreadIndex + 1) % 2;
  retVal = runMain(&pc, &ptrs, &outputRegions, threadIndex);
  cleanOldCalibsTPCPtrs(oldCalibObjects);
  o2::utils::DebugStreamer::instance()->flush();
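// Debug dumps of problematic time frames: dumpBadTFMode <= 1 writes the raw input data to a file, mode 2 dumps the
// event via the GPU interface.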
  if (debugTFDump && mNDebugDumps < mConfParam->dumpBadTFs) {
  if (mConfParam->dumpBadTFMode <= 1) {
  FILE* fp = fopen(filename.c_str(), "w+b");
  if (mConfParam->dumpBadTFMode == 1) {
  fwrite(data.data(), 1, data.size(), fp);
  } else if (mConfParam->dumpBadTFMode == 2) {
  mGPUReco->DumpEvent(mNDebugDumps - 1, ptrsDump.get());
  if (mConfParam->dump == 2) {
  bool createEmptyOutput = false;
  if (retVal == 3 && mConfig->configProcessing.ignoreNonFatalGPUErrors) {
  if (mConfig->configProcessing.throttleAlarms) {
  LOG(warning) << "GPU Reconstruction aborted with non fatal error code, ignoring";
  LOG(alarm) << "GPU Reconstruction aborted with non fatal error code, ignoring";
  createEmptyOutput = !mConfParam->partialOutputForNonFatalErrors;
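// If a non-fatal GPU error occurred and partial output is not allowed, all outputs are replaced by zeroed buffers and
// an empty ClusterNativeAccess structure, presumably so that downstream consumers still receive consistent messages.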
  std::unique_ptr<o2::tpc::ClusterNativeAccess> tmpEmptyClNative;
  if (createEmptyOutput) {
  memset(&ptrs, 0, sizeof(ptrs));
  for (uint32_t i = 0; i < outputRegions.count(); i++) {
  if (outputBuffers[i].first) {
  outputBuffers[i].first->get().resize(toSize);
  outputBuffers[i].second = outputBuffers[i].first->get().data();
  memset(outputBuffers[i].second, 0, toSize);
  tmpEmptyClNative = std::make_unique<o2::tpc::ClusterNativeAccess>();
  memset(tmpEmptyClNative.get(), 0, sizeof(*tmpEmptyClNative));
  clustersMCBuffer.second = clustersMCBuffer.first;
  tmpEmptyClNative->clustersMCTruth = &clustersMCBuffer.second;
  if (!mConfParam->allocateOutputOnTheFly) {
  for (uint32_t i = 0; i < outputRegions.count(); i++) {
  throw std::runtime_error("Preallocated buffer size exceeded");
  downSizeBuffer(outputBuffers[i], (char*)outputRegions.asArray()[i].ptrCurrent - (char*)outputBuffers[i].second);
  downSizeBufferToSpan(outputRegions.tpcTracksO2, spanOutputTracks);
  doTrackTuneTPC(ptrs, outputBuffers[outputRegions.getIndex(outputRegions.tpcTracksO2)].first->get().data());
  throw std::runtime_error("cluster native output ptrs out of sync");
  if (mClusterOutputIds.size() > 0) {
  for (uint32_t i = 0; i < NSectors; i++) {
  if (mTPCSectorMask & (1ul << i)) {
  clusterOutputSectorHeader.sectorBits = (1ul << i);
  memset(outIndex, 0, sizeof(*outIndex));
  for (const auto& label : labels) {
  auto getoutput = [sendQAOutput](auto ptr) { return sendQAOutput && ptr ? *ptr : std::decay_t<decltype(*ptr)>(); };
  std::vector<TH1F> copy1 = getoutput(outputRegions.qa.hist1);
  std::vector<TH2F> copy2 = getoutput(outputRegions.qa.hist2);
  std::vector<TH1D> copy3 = getoutput(outputRegions.qa.hist3);
  std::vector<TGraphAsymmErrors> copy4 = getoutput(outputRegions.qa.hist4);
  mQA->postprocessExternal(copy1, copy2, copy3, copy4, out, mQATaskMask ? mQATaskMask : -1);
  LOG(info) << "GPU Reconstruction time for this TF " << mTimer->CpuTime() - cput << " s (cpu), " << mTimer->RealTime() - realt << " s (wall)";
  bool needCalibUpdate = false;
  if (mGRPGeomUpdated) {
  mGRPGeomUpdated = false;
  needCalibUpdate = true;
  if (!mITSGeometryCreated) {
  mITSGeometryCreated = true;
  if (mAutoSolenoidBz) {
  if (mAutoContinuousMaxTimeBin) {
  LOG(info) << "Updating max time bin " << newCalibValues.continuousMaxTimeBin << " (" << mTFSettings->nHBFPerTF << " orbits)";
  if (!mPropagatorInstanceCreated) {
  if (mConfig->configProcessing.o2PropagatorUseGPUField) {
  mPropagatorInstanceCreated = true;
  if (!mMatLUTCreated) {
  if (mConfParam->matLUTFile.size() == 0) {
  LOG(info) << "Loaded material budget lookup table";
  mMatLUTCreated = true;
  if (!mTRDGeometryCreated) {
  gm->createPadPlaneArray();
  gm->createClusterMatrixArray();
  mTRDGeometry = std::make_unique<o2::trd::GeometryFlat>(*gm);
  newCalibObjects.trdGeometry = mConfig->configCalib.trdGeometry = mTRDGeometry.get();
  LOG(info) << "Loaded TRD geometry";
  mTRDGeometryCreated = true;
  needCalibUpdate = fetchCalibsCCDBTPC(pc, newCalibObjects, oldCalibObjects) || needCalibUpdate;
  needCalibUpdate = fetchCalibsCCDBITS(pc) || needCalibUpdate;
  if (mTPCCutAtTimeBin != mConfig->configGRP.tpcCutTimeBin) {
  newCalibValues.tpcTimeBinCut = mConfig->configGRP.tpcCutTimeBin = mTPCCutAtTimeBin;
  needCalibUpdate = true;
  if (needCalibUpdate) {
  LOG(info) << "Updating GPUReconstruction calibration objects";
  mGPUReco->UpdateCalibration(newCalibObjects, newCalibValues);
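// Build the FairMQ channel configuration for the gpu-prepare-channel, presumably used by the double-pipeline mode:
// the channel id is derived from the O2JOBID (or NUMAID) environment variable, and the sender connects while the
// receiver binds.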
  char* o2jobid = getenv("O2JOBID");
  char* numaid = getenv("NUMAID");
  int32_t chanid = o2jobid ? atoi(o2jobid) : (numaid ? atoi(numaid) : 0);
  std::string chan = std::string("name=gpu-prepare-channel,type=") + (send ? "push" : "pull") + ",method=" + (send ? "connect" : "bind") + ",address=ipc://@gpu-prepare-channel-" + std::to_string(chanid) + "-{timeslice0},transport=shmem,rateLogging=0";
  inputs.emplace_back("stdDist", "FLP", "DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
  LOG(fatal) << "Double pipeline mode can only work with zsraw input";
  inputs.emplace_back("pipelineprepare", gDataOriginGPU, "PIPELINEPREPARE", 0, Lifetime::Timeframe);
  if (mapSources != 0) {
  mCalibObjects.mFastTransformHelper->requestCCDBInputs(inputs, optsDummy, gloOpts);
  inputs.emplace_back("compClusters", "ITS", "COMPCLUSTERS", 0, Lifetime::Timeframe);
  inputs.emplace_back("patterns", "ITS", "PATTERNS", 0, Lifetime::Timeframe);
  inputs.emplace_back("ROframes", "ITS", "CLUSTERSROF", 0, Lifetime::Timeframe);
  inputs.emplace_back("phystrig", "ITS", "PHYSTRIG", 0, Lifetime::Timeframe);
  inputs.emplace_back("phystrig", "TRD", "TRKTRGRD", 0, Lifetime::Timeframe);
  inputs.emplace_back("itscldict", "ITS", "CLUSDICT", 0, Lifetime::Condition, ccdbParamSpec("ITS/Calib/ClusterDictionary"));
  inputs.emplace_back("itsalppar", "ITS", "ALPIDEPARAM", 0, Lifetime::Condition, ccdbParamSpec("ITS/Config/AlpideParam"));
  inputs.emplace_back("meanvtx", "GLO", "MEANVERTEX", 0, Lifetime::Condition, ccdbParamSpec("GLO/Calib/MeanVertex", {}, 1));
  inputs.emplace_back("itsmclabels", "ITS", "CLUSTERSMCTR", 0, Lifetime::Timeframe);
  inputs.emplace_back("ITSMC2ROframes", "ITS", "CLUSTERSMC2ROF", 0, Lifetime::Timeframe);
  std::vector<OutputSpec> outputSpecs;
  outputSpecs.emplace_back(gDataOriginGPU, "PIPELINEPREPARE", 0, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginTPC, "TRACKS", 0, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginTPC, "CLUSREFS", 0, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginTPC, "TRACKSMCLBL", 0, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginTPC, "COMPCLUSTERS", 0, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginTPC, "COMPCLUSTERSFLAT", 0, Lifetime::Timeframe);
  for (auto const& sector : mTPCSectors) {
  mClusterOutputIds.emplace_back(sector);
  outputSpecs.emplace_back(gDataOriginTPC, "CLUSTERNATIVETMP", NSectors, Lifetime::Timeframe);
  for (const auto sector : mTPCSectors) {
  outputSpecs.emplace_back(gDataOriginTPC, "CLUSTERNATIVE", sector, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginTPC, "CLUSTERNATIVE", NSectors, Lifetime::Timeframe);
  for (const auto sector : mTPCSectors) {
  outputSpecs.emplace_back(gDataOriginTPC, "CLNATIVEMCLBL", sector, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginTPC, "CLNATIVEMCLBL", NSectors, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginTPC, "CLSHAREDMAP", 0, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginTPC, "TPCOCCUPANCYMAP", 0, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginTPC, "TRIGGERWORDS", 0, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginTPC, "TRACKINGQA", 0, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginGPU, "ERRORQA", 0, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginITS, "TRACKS", 0, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginITS, "TRACKCLSID", 0, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginITS, "ITSTrackROF", 0, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginITS, "VERTICES", 0, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginITS, "VERTICESROF", 0, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginITS, "IRFRAMES", 0, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginITS, "VERTICESMCTR", 0, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginITS, "TRACKSMCTR", 0, Lifetime::Timeframe);
  outputSpecs.emplace_back(gDataOriginITS, "ITSTrackMC2ROF", 0, Lifetime::Timeframe);
  mDisplayFrontend.reset(nullptr);
  mGPUReco.reset(nullptr);