Project
Loading...
Searching...
No Matches
EntropyEncoderSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
22#include "Headers/DataHeader.h"
25#include "GPUO2InterfaceUtils.h"
26#include "GPUParam.h"
28#include "TPCClusterDecompressionCore.inc"
29#include "GPUTPCCompressionKernels.inc"
32
33using namespace o2::framework;
34using namespace o2::header;
35using namespace o2::base;
36
37namespace o2
38{
39namespace tpc
40{
42
// Constructor: creates mCTFCoder in Encoder mode with the given dictionary option and records the
// fromFile / selIR flags in mFromFile / mSelIR.
//  fromFile   - stored in mFromFile
//  selIR      - stored in mSelIR; when true the GRP/geometry request is kept in mGRPRequest
//  pgg        - shared GRP/geometry request, only stored when selIR is set
//  ctfdictOpt - forwarded unchanged to the CTF coder constructor
// NOTE(review): original line 47 is absent from this listing (numbering jumps 46 -> 48), so any
// further statement inside the `if (mSelIR)` branch is not visible here.
43EntropyEncoderSpec::EntropyEncoderSpec(bool fromFile, bool selIR, std::shared_ptr<o2::base::GRPGeomRequest> pgg, const std::string& ctfdictOpt) : mCTFCoder(o2::ctf::CTFCoderBase::OpType::Encoder, ctfdictOpt), mFromFile(fromFile), mSelIR(selIR)
44{
45 if (mSelIR) {
46 mGRPRequest = pgg;
48 }
 // Leave the timer stopped and reset right after construction.
49 mTimer.Stop();
50 mTimer.Reset();
51}
52
// Body of the CCDB-object callback. Its signature (original line 53) is not part of this listing;
// presumably EntropyEncoderSpec::finaliseCCDB(matcher, obj) - confirm against the class header.
// The object is offered to each consumer in turn and the function returns as soon as one of them
// reports that it handled it.
54{
 // The CTF coder is always given the first chance.
55 if (mCTFCoder.finaliseCCDB<CTF>(matcher, obj)) {
56 return;
57 }
 // The VDrift helper is only consulted with mSelIR set; init() only allocates it in that case.
58 if (mSelIR && mTPCVDriftHelper->accountCCDBInputs(matcher, obj)) {
59 return;
60 }
 // GRP/geometry objects are likewise only relevant when IR-frame selection is enabled.
61 if (mSelIR && GRPGeomHelper::instance().finaliseCCDB(matcher, obj)) {
62 return;
63 }
64}
65
// Body of the task initialisation. Its signature (original line 66) is not part of this listing;
// presumably EntropyEncoderSpec::init(InitContext& ic) - confirm against the class header.
// Sets up the CTF coder, the fast transform, the GPU parameter set and the IR-frame selection
// settings read from the workflow options.
67{
68 mCTFCoder.init<CTF>(ic);
 // The option is a "disable" switch, hence the negation.
69 mCTFCoder.setCombineColumns(!ic.options().get<bool>("no-ctf-columns-combining"));
70
 // Fast transform created with argument 0; run() later updates its calibration when the VDrift changes.
71 mFastTransform = std::move(TPCFastTransformHelperO2::instance()->create(0));
72
 // Called with 0 for both the solenoid Bz and nHbfPerTf arguments; it also receives the addresses of
 // mConfig, mConfParam and mAutoContinuousMaxTimeBin. run() overwrites the GRP-dependent settings
 // when mSelIR is set.
73 mParam = GPUO2InterfaceUtils::getFullParam(0.f, 0, &mConfig, &mConfParam, &mAutoContinuousMaxTimeBin);
74
 // The VDrift helper only exists when IR-frame selection is requested.
75 if (mSelIR) {
76 mTPCVDriftHelper.reset(new VDriftHelper);
77 }
78
79 mNThreads = ic.options().get<unsigned int>("nThreads-tpc-encoder");
80 mMaxZ = ic.options().get<float>("irframe-clusters-maxz");
81 mMaxEta = ic.options().get<float>("irframe-clusters-maxeta");
82
 // 2*atan(exp(-eta)) is the polar angle for pseudorapidity eta, so this is 1/tan(theta) at mMaxEta.
 // run() multiplies it by the cluster radius to obtain a z limit.
83 mEtaFactor = 1.f / (tanf(2 * atanf(expf(-mMaxEta))));
84}
85
// Body of the per-timeframe processing. Its signature (original line 86) is not part of this listing;
// presumably EntropyEncoderSpec::run(ProcessingContext& pc) - confirm against the class header.
// Steps: (1) with mSelIR, refresh GRP-dependent parameters and the VDrift calibration, (2) read the
// compressed clusters and trigger words, (3) with mSelIR, flag tracks and unattached clusters that
// fall outside the selected IR frames and build a filtered copy, (4) convert the trigger info to
// differential form, (5) entropy-encode into the CTFDATA output and publish the size report.
// NOTE(review): original lines 89, 106, 208 and 225 are absent from this listing; the declarations
// of `clusters`, `offsets` and `preCl` used below are presumably on those lines.
87{
88 if (mSelIR) {
 // `^` binds tighter than `&&`: fatal if the TPC is read out and the configured triggered mode
 // differs from "not continuous readout" as reported by the GRP.
90 if (GRPGeomHelper::instance().getGRPECS()->isDetReadOut(o2::detectors::DetID::TPC) && mConfParam->tpcTriggeredMode ^ !GRPGeomHelper::instance().getGRPECS()->isDetContinuousReadOut(o2::detectors::DetID::TPC)) {
91 LOG(fatal) << "configKeyValue tpcTriggeredMode does not match GRP isDetContinuousReadOut(TPC) setting";
92 }
93
 // Refresh the max time bin and nominal Bz from the GRP objects and push them into mParam.
94 mConfig->configGRP.grpContinuousMaxTimeBin = GPUO2InterfaceUtils::getTpcMaxTimeBinFromNHbf(GRPGeomHelper::instance().getGRPECS()->getNHBFPerTF());
95 mConfig->configGRP.solenoidBzNominalGPU = GPUO2InterfaceUtils::getNominalGPUBz(*GRPGeomHelper::instance().getGRPMagField());
96 mParam->UpdateSettings(&mConfig->configGRP);
97
 // Re-calibrate the fast transform only when the VDrift helper reports an update.
98 mTPCVDriftHelper->extractCCDBInputs(pc);
99 if (mTPCVDriftHelper->isUpdated()) {
100 TPCFastTransformHelperO2::instance()->updateCalibration(*mFastTransform, 0, mTPCVDriftHelper->getVDriftObject().corrFact, mTPCVDriftHelper->getVDriftObject().refVDrift, mTPCVDriftHelper->getVDriftObject().getTimeOffset());
101 }
102 }
103
104 mCTFCoder.updateTimeDependentParams(pc, true);
105
 // Function-local static: the same TriggerInfo object is reused across calls and cleared below.
107 static o2::tpc::detail::TriggerInfo trigComp;
108
 // The input is read either as CompressedClustersROOT or as CompressedClustersFlat depending on
 // mFromFile; a null input is logged and the timeframe is skipped without producing output.
109 if (mFromFile) {
110 auto tmp = pc.inputs().get<CompressedClustersROOT*>("input");
111 if (tmp == nullptr) {
112 LOG(error) << "invalid input";
113 return;
114 }
115 clusters = *tmp;
116 } else {
117 auto tmp = pc.inputs().get<CompressedClustersFlat*>("input");
118 if (tmp == nullptr) {
119 LOG(error) << "invalid input";
120 return;
121 }
122 clusters = *tmp;
123 }
124 auto triggers = pc.inputs().get<gsl::span<o2::tpc::TriggerInfoDLBZS>>("trigger");
 // Remember the CPU time accumulated so far so the final log line can report only this call.
125 auto cput = mTimer.CpuTime();
 // presumably Start(false) resumes the stopwatch without resetting it - confirm against the timer API
126 mTimer.Start(false);
127 auto& buffer = pc.outputs().make<std::vector<o2::ctf::BufferType>>(Output{"TPC", "CTFDATA", 0});
128 std::vector<bool> rejectHits, rejectTracks, rejectTrackHits, rejectTrackHitsReduced;
 // NOTE(review): timeDiffU/padDiffU are assigned raw data() pointers further down, so this copy is
 // presumably shallow; if so, the nSliceRowClusters decrement in the cluster checker below also
 // modifies the data seen through `clusters` - confirm.
129 CompressedClusters clustersFiltered = clusters;
 // One (timeDiff, padDiff) output buffer pair per thread, at least one.
130 std::vector<std::pair<std::vector<unsigned int>, std::vector<unsigned short>>> tmpBuffer(std::max<int>(mNThreads, 1));
131
132 // prepare trigger info
133 trigComp.clear();
 // Collect the valid entries of every trigger word, stopping at the first invalid entry of each word.
 // deltaOrbit/deltaBC hold absolute values at this point; they are made differential further down.
134 for (const auto& trig : triggers) {
135 for (int it = 0; it < o2::tpc::TriggerWordDLBZS::MaxTriggerEntries; it++) {
136 if (trig.triggerWord.isValid(it)) {
137 trigComp.deltaOrbit.push_back(trig.orbit);
138 trigComp.deltaBC.push_back(trig.triggerWord.getTriggerBC(it));
139 trigComp.triggerType.push_back(trig.triggerWord.getTriggerType(it));
140 } else {
141 break;
142 }
143 }
144 }
145
146 if (mSelIR) {
 // Consistency checks, applied only when there are tracks; a stored value of -1e6 skips the check.
147 if (clusters.nTracks && clusters.solenoidBz != -1e6f && clusters.solenoidBz != mParam->bzkG) {
148 throw std::runtime_error("Configured solenoid Bz does not match value used for track model encoding");
149 }
150 if (clusters.nTracks && clusters.maxTimeBin != -1e6 && clusters.maxTimeBin != mParam->continuousMaxTimeBin) {
151 throw std::runtime_error("Configured max time bin does not match value used for track model encoding");
152 }
153 mCTFCoder.setSelectedIRFrames(pc.inputs().get<gsl::span<o2::dataformats::IRFrame>>("selIRFrames"));
 // One rejection flag per unattached cluster, track, attached cluster and reduced attached cluster.
154 rejectHits.resize(clusters.nUnattachedClusters);
155 rejectTracks.resize(clusters.nTracks);
156 rejectTrackHits.resize(clusters.nAttachedClusters);
157 rejectTrackHitsReduced.resize(clusters.nAttachedClustersReduced);
158
159 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
160 const auto firstIR = o2::InteractionRecord(0, tinfo.firstTForbit);
 // Larger of the max drift times queried for sector 0 and sector GPUCA_NSECTORS / 2.
161 const float totalT = std::max(mFastTransform->getMaxDriftTime(0), mFastTransform->getMaxDriftTime(GPUCA_NSECTORS / 2));
162
163 unsigned int offset = 0, lasti = 0;
164 const unsigned int maxTime = (mParam->continuousMaxTimeBin + 1) * o2::tpc::ClusterNative::scaleTimePacked - 1;
 // Track loop. offset/lasti are firstprivate: every thread keeps its own running cluster offset and
 // rebuilds it from nTrackClusters whenever it is handed a track index it has not reached yet.
 // NOTE(review): rejectTrackHits / rejectTrackHitsReduced / rejectTracks are std::vector<bool> and
 // are written below outside the spin lock; concurrent writes to neighbouring elements of a
 // vector<bool> may touch the same storage word - confirm this is safe with mNThreads > 1.
165#ifdef WITH_OPENMP
166#pragma omp parallel for firstprivate(offset, lasti) num_threads(mNThreads)
167#endif
168 for (unsigned int i = 0; i < clusters.nTracks; i++) {
 // The callback records the smallest and largest packed time among the clusters it is shown.
169 unsigned int tMinP = maxTime, tMaxP = 0;
170 auto checker = [&tMinP, &tMaxP](const o2::tpc::ClusterNative& cl, unsigned int offset) {
171 if (cl.getTimePacked() > tMaxP) {
172 tMaxP = cl.getTimePacked();
173 }
174 if (cl.getTimePacked() < tMinP) {
175 tMinP = cl.getTimePacked();
176 }
177 };
178 if (i < lasti) {
179 offset = lasti = 0; // dynamic OMP scheduling, need to reinitialize offset
180 }
 // Advance offset to the first cluster of track i.
181 while (lasti < i) {
182 offset += clusters.nTrackClusters[lasti++];
183 }
184 lasti++;
 // presumably decompressTrack advances `offset` past this track's clusters; the index ranges
 // below (offset - nTrackClusters[i] .. offset) rely on that - confirm.
185 o2::gpu::TPCClusterDecompressionCore::decompressTrack(clusters, *mParam, maxTime, i, offset, checker);
186 const float tMin = o2::tpc::ClusterNative::unpackTime(tMinP), tMax = o2::tpc::ClusterNative::unpackTime(tMaxP);
 // Query the selector with a one-BC frame at the track's earliest cluster time; the backward margin
 // is the part of totalT not covered by the track's own time span (plus one BC), or 0.
187 const auto chkVal = firstIR + (tMin * constants::LHCBCPERTIMEBIN);
188 const auto chkExt = totalT > tMax - tMin ? ((totalT - (tMax - tMin)) * constants::LHCBCPERTIMEBIN + 1) : 0;
189 const bool reject = mCTFCoder.getIRFramesSelector().check(o2::dataformats::IRFrame(chkVal, chkVal + 1), chkExt, 0) < 0;
190 if (reject) {
191 for (unsigned int k = offset - clusters.nTrackClusters[i]; k < offset; k++) {
192 rejectTrackHits[k] = true;
193 }
 // The reduced array has one entry fewer per track, hence the extra "- i" shift and the range
 // of nTrackClusters[i] - 1 elements.
194 for (unsigned int k = offset - clusters.nTrackClusters[i] - i; k < offset - i - 1; k++) {
195 rejectTrackHitsReduced[k] = true;
196 }
197 rejectTracks[i] = true;
 // Spin lock guarding the shared counters of clustersFiltered.
198 static std::atomic_flag lock = ATOMIC_FLAG_INIT;
199 while (lock.test_and_set(std::memory_order_acquire)) {
200 }
201 clustersFiltered.nTracks--;
202 clustersFiltered.nAttachedClusters -= clusters.nTrackClusters[i];
203 clustersFiltered.nAttachedClustersReduced -= clusters.nTrackClusters[i] - 1;
204 lock.clear(std::memory_order_release);
205 }
206 }
 // Build the start offset of every (sector, row) within the unattached-cluster arrays.
207 offset = 0;
209 for (unsigned int i = 0; i < GPUCA_NSECTORS; i++) {
210 for (unsigned int j = 0; j < GPUCA_ROW_COUNT; j++) {
211 if (i * GPUCA_ROW_COUNT + j >= clusters.nSliceRows) {
212 break;
213 }
214 offsets[i][j] = offset;
 // The ternary condition is the same one that triggers the break above, so it is always false here.
215 offset += (i * GPUCA_ROW_COUNT + j >= clusters.nSliceRows) ? 0 : clusters.nSliceRowClusters[i * GPUCA_ROW_COUNT + j];
216 }
217 }
218
 // Unattached-cluster loop over all (sector, row) pairs; each thread appends to its own tmpBuffer
 // entry and the buffers are concatenated in thread order afterwards.
 // NOTE(review): the static chunk size is derived from GPUCA_NSECTORS while the loop runs over
 // clusters.nSliceRows iterations; with mNThreads > 1 this looks like several round-robin chunks
 // per thread rather than the single chunk per thread the trailing comment asks for, which would
 // break the ordering of the concatenated buffers - confirm. mNThreads == 0 would also divide by zero.
 // NOTE(review): rejectHits is a std::vector<bool> written from several threads without the lock.
219#ifdef WITH_OPENMP
220#pragma omp parallel for num_threads(mNThreads) schedule(static, (GPUCA_NSECTORS + mNThreads - 1) / mNThreads) // Static round-robin scheduling with one chunk per thread to ensure correct order of the final vector
221#endif
222 for (unsigned int ii = 0; ii < clusters.nSliceRows; ii++) {
 // Split the flat index into sector i and row j.
223 unsigned int i = ii / GPUCA_ROW_COUNT;
224 unsigned int j = ii % GPUCA_ROW_COUNT;
226#ifdef WITH_OPENMP
227 int myThread = omp_get_thread_num();
228#else
229 int myThread = 0;
230#endif
 // count is the number of clusters kept so far in this row.
231 unsigned int count = 0;
232 const float x = GPUTPCGeometry::Row2X(j);
233 auto checker = [i, j, firstIR, totalT, x, this, &preCl, &count, &outBuffer = tmpBuffer[myThread], &rejectHits, &clustersFiltered](const o2::tpc::ClusterNative& cl, unsigned int k) {
 // z limit for this cluster: radius times the eta factor plus the configured max-z margin.
234 const float y = GPUTPCGeometry::LinearPad2Y(i, j, cl.getPad());
235 const float r = sqrtf(x * x + y * y);
236 const float maxz = r * mEtaFactor + mMaxZ;
 // Convert the z limit into a time difference and from there into a BC shift and backward margin.
237 const unsigned int deltaBC = std::max<float>(0.f, totalT - mFastTransform->convDeltaZtoDeltaTimeInTimeFrameAbs(maxz)) * constants::LHCBCPERTIMEBIN;
238 const auto chkVal = firstIR + (cl.getTime() * constants::LHCBCPERTIMEBIN) - deltaBC;
239 const auto chkExt = totalT * constants::LHCBCPERTIMEBIN - deltaBC;
240 const bool reject = mCTFCoder.getIRFramesSelector().check(o2::dataformats::IRFrame(chkVal, chkVal + 1), chkExt, 0) < 0;
241 if (reject) {
242 rejectHits[k] = true;
 // The per-row counter is decremented without the lock; each row index is handled by one iteration.
243 clustersFiltered.nSliceRowClusters[i * GPUCA_ROW_COUNT + j]--;
244 static std::atomic_flag lock = ATOMIC_FLAG_INIT;
245 while (lock.test_and_set(std::memory_order_acquire)) {
246 }
247 clustersFiltered.nUnattachedClusters--;
248 lock.clear(std::memory_order_release);
249 } else {
 // Kept cluster: append a slot and re-encode it; the first kept cluster of a row gets no
 // predecessor (nullptr), later ones are encoded with the previously kept cluster.
250 outBuffer.first.emplace_back(0);
251 outBuffer.second.emplace_back(0);
252 GPUTPCCompression_EncodeUnattached(clustersFiltered.nComppressionModes, cl, outBuffer.first.back(), outBuffer.second.back(), count++ ? &preCl : nullptr);
253 preCl = cl;
254 }
255 };
 // `end` is taken from `clusters` before decompressHits runs the checker over this row.
256 unsigned int end = offsets[i][j] + clusters.nSliceRowClusters[i * GPUCA_ROW_COUNT + j];
257 o2::gpu::TPCClusterDecompressionCore::decompressHits(clusters, offsets[i][j], end, checker);
258 }
 // Concatenate the per-thread buffers into tmpBuffer[0], in thread order, and point the filtered
 // cluster structure at the merged arrays (tmpBuffer must therefore outlive the encode() call below).
259 tmpBuffer[0].first.reserve(clustersFiltered.nUnattachedClusters);
260 tmpBuffer[0].second.reserve(clustersFiltered.nUnattachedClusters);
261 for (int i = 1; i < mNThreads; i++) {
262 tmpBuffer[0].first.insert(tmpBuffer[0].first.end(), tmpBuffer[i].first.begin(), tmpBuffer[i].first.end());
263 tmpBuffer[i].first.clear();
264 tmpBuffer[0].second.insert(tmpBuffer[0].second.end(), tmpBuffer[i].second.begin(), tmpBuffer[i].second.end());
265 tmpBuffer[i].second.clear();
266 }
267 clustersFiltered.timeDiffU = tmpBuffer[0].first.data();
268 clustersFiltered.padDiffU = tmpBuffer[0].second.data();
269 }
270
271 // transform trigger info to differential form
272 uint32_t prevOrbit = -1;
273 uint16_t prevBC = -1;
274 if (trigComp.triggerType.size()) {
 // The first entry keeps its BC, its orbit goes to firstOrbit and its deltaOrbit becomes 0.
275 prevOrbit = trigComp.firstOrbit = trigComp.deltaOrbit[0];
276 prevBC = trigComp.deltaBC[0];
277 trigComp.deltaOrbit[0] = 0;
278 for (size_t it = 1; it < trigComp.triggerType.size(); it++) {
279 if (trigComp.deltaOrbit[it] == prevOrbit) {
 // Same orbit as the previous entry: store the BC difference and a zero orbit delta.
280 auto bc = trigComp.deltaBC[it];
281 trigComp.deltaBC[it] -= prevBC;
282 prevBC = bc;
283 trigComp.deltaOrbit[it] = 0;
284 } else {
 // New orbit: store the orbit difference, the BC stays absolute.
 // NOTE(review): prevBC is not refreshed here, so the next same-orbit entry is differenced
 // against the last BC seen in an earlier orbit - confirm this matches the decoder.
285 auto orb = trigComp.deltaOrbit[it];
286 trigComp.deltaOrbit[it] -= prevOrbit;
287 prevOrbit = orb;
288 }
289 }
290 }
291
 // The rejection vectors are only passed when IR-frame selection is active.
292 auto iosize = mCTFCoder.encode(buffer, clusters, clustersFiltered, trigComp, mSelIR ? &rejectHits : nullptr, mSelIR ? &rejectTracks : nullptr, mSelIR ? &rejectTrackHits : nullptr, mSelIR ? &rejectTrackHitsReduced : nullptr);
293 pc.outputs().snapshot({"ctfrep", 0}, iosize);
294 mTimer.Stop();
295 if (mSelIR) {
296 mCTFCoder.getIRFramesSelector().clear();
297 }
 // Report the CPU time spent in this call only (difference to the value sampled before Start).
298 LOG(info) << iosize.asString() << " in " << mTimer.CpuTime() - cput << " s";
299}
300
// Body of the end-of-stream handler. Its signature (original line 301) is not part of this listing;
// presumably EntropyEncoderSpec::endOfStream(EndOfStreamContext& ec) - confirm against the class header.
// Logs the accumulated CPU and real time of mTimer together with a slot count.
302{
 // The slot count is reported as Counter() - 1; the reason for the -1 is not evident from this file.
303 LOGF(info, "TPC Entropy Encoding total timing: Cpu: %.3e Real: %.3e s in %d slots",
304 mTimer.CpuTime(), mTimer.RealTime(), mTimer.Counter() - 1);
305}
306
// Builds the DataProcessorSpec of the "tpc-entropy-encoder" device.
//  inputFromFile - selects the input description: "COMPCLUSTERS" when true, "COMPCLUSTERSFLAT" otherwise
//  selIR         - when true, additionally subscribes to the selected IR frames and creates a
//                  GRP/geometry request that is handed to the task
//  ctfdictOpt    - dictionary option forwarded to the task; an empty value or "ccdb" adds the
//                  CTF dictionary as a CCDB condition input
// Returns the spec with the CTFDATA and CTFENCREP ("ctfrep") outputs and the option list below.
307DataProcessorSpec getEntropyEncoderSpec(bool inputFromFile, bool selIR, const std::string& ctfdictOpt)
308{
309 std::vector<InputSpec> inputs;
310 header::DataDescription inputType = inputFromFile ? header::DataDescription("COMPCLUSTERS") : header::DataDescription("COMPCLUSTERSFLAT");
311 inputs.emplace_back("input", "TPC", inputType, 0, Lifetime::Timeframe);
312 inputs.emplace_back("trigger", "TPC", "TRIGGERWORDS", 0, Lifetime::Timeframe);
313
 // The dictionary is only requested from CCDB when no explicit dictionary source is given.
314 if (ctfdictOpt.empty() || ctfdictOpt == "ccdb") {
315 inputs.emplace_back("ctfdict", "TPC", "CTFDICT", 0, Lifetime::Condition, ccdbParamSpec("TPC/Calib/CTFDictionaryTree"));
316 }
317
 // ggreq stays empty unless IR-frame selection is requested; the request constructor is given
 // `inputs`, presumably so it can append its own CCDB inputs - confirm.
 // NOTE(review): original line 322 is absent from this listing (numbering jumps 321 -> 323), so a
 // further statement inside this branch is not visible here.
318 std::shared_ptr<o2::base::GRPGeomRequest> ggreq;
319 if (selIR) {
320 inputs.emplace_back("selIRFrames", "CTF", "SELIRFRAMES", 0, Lifetime::Timeframe);
321 ggreq = std::make_shared<o2::base::GRPGeomRequest>(false, true, false, true, false, o2::base::GRPGeomRequest::None, inputs, true);
323 }
324 return DataProcessorSpec{
325 "tpc-entropy-encoder", // process id
326 inputs,
327 Outputs{{"TPC", "CTFDATA", 0, Lifetime::Timeframe},
328 {{"ctfrep"}, "TPC", "CTFENCREP", 0, Lifetime::Timeframe}},
329 AlgorithmSpec{adaptFromTask<EntropyEncoderSpec>(inputFromFile, selIR, ggreq, ctfdictOpt)},
 // Of the options below, only no-ctf-columns-combining, irframe-clusters-maxeta,
 // irframe-clusters-maxz and nThreads-tpc-encoder are read directly in this file's init();
 // the remaining ones are presumably consumed inside the CTF coder - confirm.
 // NOTE(review): the nThreads-tpc-encoder help text says "decoding" although this is the
 // encoder device; confirm whether that wording is intended.
330 Options{{"no-ctf-columns-combining", VariantType::Bool, false, {"Do not combine correlated columns in CTF"}},
331 {"irframe-margin-bwd", VariantType::UInt32, 0u, {"margin in BC to add to the IRFrame lower boundary when selection is requested"}},
332 {"irframe-margin-fwd", VariantType::UInt32, 0u, {"margin in BC to add to the IRFrame upper boundary when selection is requested"}},
333 {"irframe-clusters-maxeta", VariantType::Float, 1.5f, {"Max eta for non-assigned clusters"}},
334 {"irframe-clusters-maxz", VariantType::Float, 25.f, {"Max z for non assigned clusters (combined with maxeta)"}},
335 {"mem-factor", VariantType::Float, 1.f, {"Memory allocation margin factor"}},
336 {"nThreads-tpc-encoder", VariantType::UInt32, 1u, {"number of threads to use for decoding"}},
337 {"ans-version", VariantType::String, {"version of ans entropy coder implementation to use"}}}};
338}
339} // namespace tpc
340} // namespace o2
Class of a TPC cluster in TPC-native coordinates (row, time)
Container to store compressed TPC cluster data.
uint64_t bc
Definition RawEventData.h:5
int32_t i
#define GPUCA_NSECTORS
#define GPUCA_ROW_COUNT
Helper for geometry and GRP related CCDB requests.
uint32_t j
Definition RawData.h:0
class to create TPC fast transformation
ProcessorSpec for the TPC cluster entropy encoding.
Helper class to extract VDrift from different sources.
Definitions of TPC Zero Suppression Data Headers.
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
void updateTimeDependentParams(o2::framework::ProcessingContext &pc, bool askTree=false)
void init(o2::framework::InitContext &ic)
void setSelectedIRFrames(const SPAN &sp)
o2::utils::IRFrameSelector & getIRFramesSelector()
bool finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj)
static constexpr ID TPC
Definition DetID.h:64
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ConfigParamRegistry const & options()
Definition InitContext.h:33
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static uint32_t getTpcMaxTimeBinFromNHbf(uint32_t nHbf)
static float getNominalGPUBz(T &src)
static std::unique_ptr< GPUParam > getFullParam(float solenoidBz, uint32_t nHbfPerTf=0, std::unique_ptr< GPUO2InterfaceConfiguration > *pConfiguration=nullptr, std::unique_ptr< GPUSettingsO2 > *pO2Settings=nullptr, bool *autoMaxTimeBin=nullptr)
o2::ctf::CTFIOSize encode(VEC &buff, const CompressedClusters &ccl, const CompressedClusters &cclFiltered, const detail::TriggerInfo &trigComp, std::vector< bool > *rejectHits=nullptr, std::vector< bool > *rejectTracks=nullptr, std::vector< bool > *rejectTrackHits=nullptr, std::vector< bool > *rejectTrackHitsReduced=nullptr)
entropy-encode compressed clusters to flat buffer
Definition CTFCoder.h:184
void setCombineColumns(bool v)
Definition CTFCoder.h:162
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
void run(o2::framework::ProcessingContext &pc) final
EntropyEncoderSpec(bool fromFile, bool selIR=false, std::shared_ptr< o2::base::GRPGeomRequest > pgg=std::shared_ptr< o2::base::GRPGeomRequest >(), const std::string &ctfdictOpt="none")
int updateCalibration(TPCFastTransform &transform, Long_t TimeStamp, float vDriftFactor=1.f, float vDriftRef=0.f, float driftTimeOffset=0.f)
Updates the transformation with the new time stamp.
static TPCFastTransformHelperO2 * instance()
Singleton.
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
long check(o2::dataformats::IRFrame fr, size_t bwd=0, size_t fwd=0)
GLint GLenum GLint x
Definition glcorearb.h:403
GLint GLsizei count
Definition glcorearb.h:399
GLuint buffer
Definition glcorearb.h:655
GLuint GLsizei const GLuint const GLintptr * offsets
Definition glcorearb.h:2595
GLuint GLuint end
Definition glcorearb.h:469
GLintptr offset
Definition glcorearb.h:660
GLboolean r
Definition glcorearb.h:1233
Defining PrimaryVertex explicitly as messageable.
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
std::vector< ConfigParamSpec > Options
std::vector< OutputSpec > Outputs
O2 data header classes and API, v0.1.
Definition DetID.h:49
Descriptor< gSizeDataDescriptionString > DataDescription
Definition DataHeader.h:551
constexpr int LHCBCPERTIMEBIN
Definition Constants.h:38
framework::DataProcessorSpec getEntropyEncoderSpec(bool inputFromFile, bool selIR=false, const std::string &ctfdictOpt="none")
create a processor spec
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
wrapper for the Entropy-encoded clusters of the TF
Definition CTF.h:36
static constexpr int scaleTimePacked
static constexpr uint16_t MaxTriggerEntries
Maximum number of trigger information.
std::vector< int32_t > deltaOrbit
Definition CTFCoder.h:45
std::vector< int16_t > deltaBC
Definition CTFCoder.h:46
std::vector< uint8_t > triggerType
Definition CTFCoder.h:47
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< Cluster > clusters