Project
Loading...
Searching...
No Matches
test_ft_EPN_Aggregator.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
25
26#include <boost/throw_exception.hpp>
30#include "TRandom.h"
31#include "TKey.h"
32
33#include <unistd.h>
34
35using namespace o2::framework;
36
// Test assertion helper: when `condition` evaluates to false, a fatal log message
// containing the stringified expression is emitted.
// The body is wrapped in do { ... } while (0) so that the macro expands to exactly
// one statement; it must be followed by a semicolon and is then safe to use in
// unbraced if/else branches (no dangling-else capture).
#define ASSERT_ERROR(condition)                                     \
  do {                                                              \
    if ((condition) == false) {                                     \
      LOG(fatal) << R"(Test condition ")" #condition R"(" failed)"; \
    }                                                               \
  } while (0)
41
// Forward declarations of the device factories that are defined further down in this file.
// generateIDCsCRU: source device publishing IDC vectors for the given CRUs.
42DataProcessorSpec generateIDCsCRU(int lane, const unsigned int maxTFs, const std::vector<uint32_t>& crus, const bool delay, const bool loadFromFile, const int dropTFsRandom, const std::vector<int>& rangeTFsDrop);
// receiveFourierCoeffEPN: device collecting the Fourier coefficients sent by the EPN device.
43DataProcessorSpec receiveFourierCoeffEPN(const unsigned int maxTFs, const unsigned int nFourierCoefficients);
45static constexpr o2::header::DataDescription getDataDescriptionCoeffEPN() { return o2::header::DataDescription{"COEFFEPNALL"}; }
46
47// we need to add workflow options before including Framework/runDataProcessing
// Builds the list of workflow-level command line options of this test and hands it to the
// framework by swapping it into `workflowOptions`.
// \param workflowOptions option list provided by the framework; replaced by the list built here
48void customize(std::vector<ConfigParamSpec>& workflowOptions)
49{
// default CRU selection: the range "0-<MaxCRU - 1>"
50 const std::string cruDefault = "0-" + std::to_string(o2::tpc::CRU::MaxCRU - 1);
51
// each entry: {option name, type, default value, {help text}}
52 std::vector<ConfigParamSpec> options{
53 {"crus", VariantType::String, cruDefault.c_str(), {"List of CRUs, comma separated ranges, e.g. 0-3,7,9-15"}},
54 {"timeframes", VariantType::Int, 20, {"Number of TFs which will be produced"}},
55 {"ion-drift-time", VariantType::Int, 50, {"Ion drift time in ms. (Number of 1D-IDCs which will be used for the calculation of the fourier coefficients.)"}},
56 {"nFourierCoeff", VariantType::Int, 20, {"Number of fourier coefficients per TF (real+imag) which will be compared. The maximum can be 'ion-drift-time + 2'."}},
57 {"use-naive-fft", VariantType::Bool, false, {"using naive fourier transform (true) or FFTW (false)"}},
58 {"seed", VariantType::Int, 0, {"Seed for the random IDC generator."}},
59 {"only-idc-gen", VariantType::Bool, false, {"Start only the IDC generator device"}},
60 {"load-from-file", VariantType::Bool, false, {"load from file"}},
61 {"debug", VariantType::Bool, false, {"create debug for FT"}},
62 {"idc-gen-lanes", VariantType::Int, 1, {"number of parallel lanes for generation of IDCs"}},
// NOTE(review): help text is identical to "idc-gen-lanes"; this value is passed to timePipeline() in the workflow definition
63 {"idc-gen-time-lanes", VariantType::Int, 1, {"number of parallel lanes for generation of IDCs"}},
64 {"delay", VariantType::Bool, false, {"Add delay for sending IDCs"}},
65 {"dropTFsRandom", VariantType::Int, 0, {"Drop randomly whole TFs every dropTFsRandom TFs (for all CRUs)"}},
66 {"dropTFsRange", VariantType::String, "", {"Drop range of TFs"}},
67 {"hbfutils-config", VariantType::String, "hbfutils", {"config file for HBFUtils (or none) to get number of orbits per TF"}},
68 {"nthreads", VariantType::Int, 1, {"Number of threads."}},
69 {"iter", VariantType::Int, 0, {"Iteration for testing the workflow (.....)"}},
70 {"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}}};
// the previous content of workflowOptions ends up in `options` and is discarded at scope exit
72 std::swap(workflowOptions, options);
73}
74
// Customization hook for the callbacks policies of the workflow.
// \param policies list of callbacks policies provided by the framework
// NOTE(review): no statement of the body is visible here (source line 77 is not shown) —
// confirm the actual content against the repository file.
75void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
76{
78}
79
// Customization hook for the completion policies of the workflow.
// \param policies list of completion policies; one entry is appended
80void customize(std::vector<o2::framework::CompletionPolicy>& policies)
81{
// devices whose name matches "tpc-factorize-idc.*" get the CompletionOp::Consume policy
83 policies.push_back(CompletionPolicyHelpers::defineByName("tpc-factorize-idc.*", CompletionPolicy::CompletionOp::Consume));
84}
85
87
// Body of the workflow definition (the signature line is not shown here).
// It reads the workflow options from `config`, creates the IDC generator devices and, depending
// on the "iter" option, appends the processing devices (FLP, distribute, factorize; for iter == 1
// additionally the Fourier transform on EPN and aggregator side plus the two comparison devices).
89{
// "crus" and "dropTFsRange" are given as range strings and are expanded to lists of integers
90 const auto tpcCRUs = o2::RangeTokenizer::tokenize<int>(config.options().get<std::string>("crus"));
91 const auto rangeTFsDrop = o2::RangeTokenizer::tokenize<int>(config.options().get<std::string>("dropTFsRange"));
92 o2::conf::ConfigurableParam::updateFromString(config.options().get<std::string>("configKeyValues"));
93 const auto nCRUs = tpcCRUs.size();
94 const auto first = tpcCRUs.begin();
// first + nCRUs is the end of the list, so `crus` is a uint32_t copy of all requested CRUs
95 const auto last = std::min(tpcCRUs.end(), first + nCRUs);
96 const std::vector<uint32_t> crus(first, last);
97 const auto timeframes = static_cast<unsigned int>(config.options().get<int>("timeframes"));
98 const auto iondrifttime = static_cast<unsigned int>(config.options().get<int>("ion-drift-time"));
// number of coefficients is limited to the range [0, iondrifttime + 2]
99 const auto nFourierCoefficients = std::clamp(static_cast<unsigned int>(config.options().get<int>("nFourierCoeff")), static_cast<unsigned int>(0), iondrifttime + 2);
100 const auto nthreads = static_cast<unsigned int>(config.options().get<int>("nthreads"));
101 const auto iter = static_cast<unsigned int>(config.options().get<int>("iter"));
102 const auto seed = static_cast<unsigned int>(config.options().get<int>("seed"));
103 const auto idcgenlanes = static_cast<unsigned int>(config.options().get<int>("idc-gen-lanes"));
104 const auto idcgentimelanes = static_cast<unsigned int>(config.options().get<int>("idc-gen-time-lanes"));
// NOTE(review): the bool option is stored as unsigned int and converted back to bool when passed to generateIDCsCRU
105 const auto delay = static_cast<unsigned int>(config.options().get<bool>("delay"));
106
// NOTE(review): nthreads, fft and debugFT are not used in the lines visible here
107 const bool fft = config.options().get<bool>("use-naive-fft");
108 const bool onlyIDCGen = config.options().get<bool>("only-idc-gen");
109 const bool debugFT = config.options().get<bool>("debug");
110 const int dropTFsRandom = config.options().get<int>("dropTFsRandom");
111
112 const unsigned int firstTF = 0;
113 const unsigned int nLanes = 1;
114 const bool loadFromFileGen = config.options().get<bool>("load-from-file");
115 gRandom->SetSeed(seed);
116
// one generator device per lane; each lane gets a consecutive chunk of ceil(nCRUs / idcgenlanes) CRUs
117 WorkflowSpec workflow;
118 for (int ilane = 0; ilane < idcgenlanes; ++ilane) {
119 const auto crusPerLane = nCRUs / idcgenlanes + ((nCRUs % idcgenlanes) != 0);
120 const auto first = tpcCRUs.begin() + ilane * crusPerLane;
// more lanes than needed: all CRUs are already assigned
121 if (first >= tpcCRUs.end()) {
122 break;
123 }
124 const auto last = std::min(tpcCRUs.end(), first + crusPerLane);
125 const std::vector<uint32_t> rangeCRUs(first, last);
126 workflow.emplace_back(timePipeline(generateIDCsCRU(ilane, timeframes, rangeCRUs, delay, loadFromFileGen, dropTFsRandom, rangeTFsDrop), idcgentimelanes));
127 }
128
// HBFUtils.startTime: keep the configured value if it is > 0, otherwise use the current system time in ms
129 auto& hbfu = o2::raw::HBFUtils::Instance();
130 long startTime = hbfu.startTime > 0 ? hbfu.startTime : std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
131 o2::conf::ConfigurableParam::updateFromString(fmt::format("HBFUtils.startTime={}", startTime).data());
132 o2::raw::HBFUtilsInitializer hbfIni(config, workflow);
133
// "only-idc-gen": return the workflow containing only the generator devices
134 if (onlyIDCGen == true) {
135 return workflow;
136 }
137
// iter == 0: FLP IDC device, distribute device and factorize device
138 if (iter == 0) {
139 auto workflowTmp = WorkflowSpec{getTPCFLPIDCSpec(0, crus, iondrifttime, false, "", true, false), getTPCDistributeIDCSpec(0, crus, timeframes, nLanes, firstTF, false), getTPCFactorizeIDCSpec(0, crus, timeframes, timeframes, o2::tpc::IDCDeltaCompression::NO, false, true, false)};
140 for (auto& spec : workflowTmp) {
141 workflow.emplace_back(spec);
142 }
// iter == 1: the FLP device is given an IDCZero file name built from the side of the first CRU and
// (timeframes - 1); the EPN and aggregator Fourier transform devices and the devices comparing
// their output are added as well
143 } else if (iter == 1) {
144 const Side side = CRU(crus.front()).side();
145 const std::string idc0File = (side == Side::A) ? fmt::format("IDCZero_A_{:02}.root", timeframes - 1) : fmt::format("IDCZero_C_{:02}.root", timeframes - 1);
146 auto workflowTmp = WorkflowSpec{getTPCFLPIDCSpec(0, crus, iondrifttime, false, idc0File.data(), true, true), getTPCFourierTransformEPNSpec(crus, iondrifttime, nFourierCoefficients), getTPCDistributeIDCSpec(0, crus, timeframes, nLanes, firstTF, false), getTPCFactorizeIDCSpec(0, crus, timeframes, timeframes, o2::tpc::IDCDeltaCompression::NO, false, true, false), getTPCFourierTransformAggregatorSpec(iondrifttime, nFourierCoefficients, true, false, nLanes), receiveFourierCoeffEPN(timeframes, nFourierCoefficients), compare_EPN_AGG()};
147 for (auto& spec : workflowTmp) {
148 workflow.emplace_back(spec);
149 }
150 }
151
156 return workflow;
157}
158
// Creates the source device "idc-generator-<lane>" (no inputs) which publishes, for every processed
// TF, one vector of IDC values per CRU. The values are either drawn randomly or taken from the
// content of the file "IDCGroup.root".
// \param lane index used in the device name
// \param maxTFs the device requests to quit once the current TF is >= maxTFs - 1
// \param crus CRUs for which outputs are declared (sub-specification: cru << 7)
// \param delay if true, sleep after sending so that one TF takes at least `intervals` milliseconds
// \param loadFromFile if true, read the IDCs from "IDCGroup.root" instead of generating them
// \param dropTFsRandom if non-zero, a TF is dropped whenever gRandom->Integer(dropTFsRandom) returns 0
// \param rangeTFsDrop if non-empty, all TFs in [front(), back()] are dropped
159DataProcessorSpec generateIDCsCRU(int lane, const unsigned int maxTFs, const std::vector<uint32_t>& crus, const bool delay, const bool loadFromFile, const int dropTFsRandom, const std::vector<int>& rangeTFsDrop)
160{
161 using timer = std::chrono::high_resolution_clock;
162
// IDC values per CRU; stays empty unless loadFromFile is set
163 std::array<std::vector<float>, CRU::MaxCRU> mIDCs{};
164 if (loadFromFile) {
165 TFile fInp("IDCGroup.root", "READ");
// every key of the file is read as a std::vector<float>; the CRU index is parsed from the
// characters following the last '_' of the key name
166 for (TObject* keyAsObj : *fInp.GetListOfKeys()) {
// NOTE(review): the results of dynamic_cast and of fInp.Get() are dereferenced without a null check,
// and `cru` is used as array index without a range check
167 const auto key = dynamic_cast<TKey*>(keyAsObj);
168 LOGP(info, "Key name: {} Type: {}", key->GetName(), key->GetClassName());
169 std::vector<float>* idcData = (std::vector<float>*)fInp.Get(key->GetName());
170 std::string name = key->GetName();
171 const auto pos = name.find_last_of('_') + 1;
172 const int cru = std::stoi(name.substr(pos, name.length()));
173 mIDCs[cru] = *idcData;
174 LOGP(info, "Loaded {} IDCs from file for CRU {}", mIDCs[cru].size(), cru);
175 delete idcData;
176 }
177 }
178
179 // generate random IDCs per CRU
180 const int nOrbitsPerTF = 128; // number of orbits per TF
181 const int nOrbitsPerIDC = 12; // integration interval in units of orbits per IDC
// integer division: 128 / 12 = 10
182 const int nIDCs = nOrbitsPerTF / nOrbitsPerIDC;
183
// one output per CRU, distinguished by the sub-specification cru << 7
184 std::vector<OutputSpec> outputSpecs;
185 outputSpecs.reserve(crus.size());
186 for (const auto& cru : crus) {
187 const o2::header::DataHeader::SubSpecificationType subSpec{cru << 7};
188 outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCIntegrateIDCDevice::getDataDescription(TPCIntegrateIDCDevice::IDCFormat::Sim), subSpec});
189 }
190
// NOTE(review): fixed seed; it replaces any seed set on gRandom before this function is called
191 gRandom->SetSeed(42);
192 return DataProcessorSpec{
193 fmt::format("idc-generator-{:02}", lane).data(),
194 Inputs{},
195 outputSpecs,
// processing callback; all state (including the loaded IDCs) is captured by value
197 [maxTFs, nIDCs, delay, cruStart = crus.front(), cruEnde = crus.back(), mIDCs, dropTFsRandom, rangeTFsDrop](ProcessingContext& ctx) {
198 const auto tf = processing_helpers::getCurrentTF(ctx);
199
// only the first and last entry of rangeTFsDrop are used as inclusive bounds
200 if (!rangeTFsDrop.empty()) {
201 if (tf >= rangeTFsDrop.front() && tf <= rangeTFsDrop.back()) {
202 LOGP(info, "Dropping TF as specified from range: {}", tf);
203 return;
204 }
205 }
206
// random dropping: the TF is skipped when the drawn integer is 0
207 if (dropTFsRandom && !gRandom->Integer(dropTFsRandom)) {
208 LOGP(info, "Dropping TF: {}", tf);
209 return;
210 }
211
212 auto start = timer::now();
213 const unsigned int additionalInterval = (tf % 3) ? 1 : 0; // in each integration interval are either 10 or 11 values when having 128 orbits per TF and 12 orbits integration length
214 const unsigned int intervals = (nIDCs + additionalInterval);
// values are generated randomly when nothing was loaded for the first CRU
215 const bool generateIDCs = mIDCs.front().empty();
216 const float irVar = 1 + 0.1 * std::sin(tf * 0.0035); // IR variations
217
// per integration interval: a randomly chosen source interval and a normalisation factor,
// shared by all CRUs of this TF
218 std::vector<int> intervalsRand;
219 std::vector<float> normFac;
220 intervalsRand.reserve(intervals);
221 normFac.reserve(intervals);
// with loaded data: number of intervals stored for CRU 0 (its size divided by the pads of region 0)
222 const int nIntervalsMax = generateIDCs ? intervals : (mIDCs[0].size() / o2::tpc::Mapper::PADSPERREGION[0]);
223 const float globalScaling = gRandom->Gaus(1, 0.2);
224 for (int i = 0; i < intervals; ++i) {
225 intervalsRand.emplace_back(gRandom->Integer(nIntervalsMax));
226 normFac.emplace_back(globalScaling + gRandom->Gaus(1, 0.02));
227 }
228
// loops over every CRU number from crus.front() to crus.back() inclusive
// NOTE(review): presumably `crus` is a contiguous ascending range, since outputs are declared only for the listed CRUs — confirm
229 for (uint32_t icru = cruStart; icru <= cruEnde; ++icru) {
230 o2::tpc::CRU cruTmp(icru);
231 const unsigned int nPads = o2::tpc::Mapper::PADSPERREGION[cruTmp.region()];
232 const int cru = (icru + tf * Mapper::NREGIONS) % o2::tpc::CRU::MaxCRU; // shuffle CRUs
// `idcs` (the output container) is declared on a source line not shown here
234 idcs.reserve(generateIDCs ? o2::tpc::Mapper::PADSPERREGION[cruTmp.region()] : mIDCs[cru].size());
235 const int nIntervals = intervalsRand.size();
// for each interval nPads values are appended: either random values or the loaded values of the
// shuffled CRU starting at the randomly chosen interval, scaled by irVar / normFac[interval]
236 for (int interval = 0; interval < nIntervals; ++interval) {
237 const int offset = intervalsRand[interval] * nPads;
238 for (int iPad = 0; iPad < nPads; ++iPad) {
239 if (generateIDCs) {
240 idcs.emplace_back(irVar * gRandom->Gaus(10, 20) / normFac[interval]);
241 } else {
242 idcs.emplace_back(irVar * mIDCs[cru][offset + iPad] / normFac[interval]);
243 }
244 }
245 }
246 ctx.outputs().adoptContainer(Output{gDataOriginTPC, TPCIntegrateIDCDevice::getDataDescription(TPCIntegrateIDCDevice::IDCFormat::Sim), o2::header::DataHeader::SubSpecificationType{icru << 7}}, std::move(idcs));
247 }
248
// optional throttling: sleep for the remainder of `intervals` milliseconds not used by the generation
249 if (delay) {
250 auto stop = timer::now();
251 auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(stop - start);
252 float totalTime = milliseconds.count();
253 const int sleepTime = (totalTime < intervals) ? (intervals - totalTime) : 0;
254 if (!(tf % 100)) {
255 LOGP(info, "time: {} for {} intervals (ms): sleep for {} for TF: {}", totalTime, intervals, sleepTime, tf);
256 }
257 std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime));
258 }
259
// NOTE(review): maxTFs is unsigned, so maxTFs == 0 makes (maxTFs - 1) wrap around
260 if (tf >= (maxTFs - 1)) {
261 ctx.services().get<ControlService>().readyToQuit(QuitRequest::Me);
262 }
263 }}};
264}
265
// Class body of TPCReceiveEPNSpec (the class header line is not shown here): stores the Fourier
// coefficients received on the "coeffEPN" input per TF and publishes all of them in one message
// when the TF with index mMaxTF is processed.
267{
269 public:
// \param nTFs number of TFs for which coefficients are stored; the last expected TF is nTFs - 1
// \param nFourierCoefficients second constructor argument of every stored FourierCoeff object
270 TPCReceiveEPNSpec(const unsigned int nTFs, const unsigned int nFourierCoefficients) : mFourierCoeffEPN(nTFs, o2::tpc::FourierCoeff(1, nFourierCoefficients)), mMaxTF{nTFs - 1} {};
271
// processing function; its signature is on a source line not shown here
273 {
274 const auto tf = processing_helpers::getCurrentTF(ctx);
275
// copy the coefficients of every matching input into the slot of the current TF
// NOTE(review): `side` is read from the header but not used; mFourierCoeffEPN[tf] is accessed
// without checking tf against the container size
276 for (auto const& ref : InputRecordWalker(ctx.inputs(), mFilter)) {
277 auto const* tpcFourierCoeffHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
278 const int side = tpcFourierCoeffHeader->subSpecification;
279 mFourierCoeffEPN[tf].mFourierCoefficients = ctx.inputs().get<std::vector<float>>(ref);
280 }
281
// last expected TF: send the collected coefficients of all TFs
282 if (tf == mMaxTF) {
283 ctx.outputs().snapshot(Output{gDataOriginTPC, getDataDescriptionCoeffEPN()}, mFourierCoeffEPN);
284 }
285 }
286
287 private:
288 std::vector<o2::tpc::FourierCoeff> mFourierCoeffEPN; // one entry per TF
289 const unsigned int mMaxTF{}; // TF index at which the collected coefficients are sent
290 const std::vector<InputSpec> mFilter = {{"coeffEPN", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFourierTransformEPNSpec::getDataDescription()}, Lifetime::Sporadic}}; // selects the inputs walked in the processing function
291};
292
// Class body of the comparison device (the class header line is not shown here): compares the
// Fourier coefficients from the "coeffEPN" input with those from the "coeffAgg" input value by
// value and then ends the workflow.
294{
296 public:
// processing function; its signature is on a source line not shown here
298 {
299 LOGP(info, "==== Comparing Fourier coefficients for EPN and Aggregator... ====");
300 const auto fourierCoeffAgg = ctx.inputs().get<FourierCoeff*>(ctx.inputs().get("coeffAgg"));
301 const std::vector<o2::tpc::FourierCoeff> fourierCoeffEPN = ctx.inputs().get<std::vector<o2::tpc::FourierCoeff>>(ctx.inputs().get("coeffEPN"));
// EPN side: one FourierCoeff object per TF; aggregator side: one object addressed via getIndex(tf, i)
302 for (int tf = 0; tf < fourierCoeffEPN.size(); ++tf) {
303 for (int i = 0; i < fourierCoeffEPN[tf].getNCoefficientsPerTF(); ++i) {
304 const float epnval = fourierCoeffEPN[tf](i);
305 const float aggval = (*fourierCoeffAgg)(fourierCoeffAgg->getIndex(tf, i));
// if the smaller of the two values has magnitude below 1 the absolute criterion (isSameZero) is used,
// otherwise the relative one (isSame)
306 ASSERT_ERROR((std::abs(std::min(epnval, aggval)) < 1.f) ? isSameZero(epnval, aggval) : isSame(epnval, aggval));
307 }
308 }
309 LOGP(info, "==== Test finished successfull! Fourier coefficients for EPN and Aggregator are equal. ====");
// signal end of stream and request all devices to quit
310 ctx.services().get<ControlService>().endOfStream();
311 ctx.services().get<ControlService>().readyToQuit(QuitRequest::All);
312 }
313
314 private:
315 const float mTolerance = 1e-4; // threshold used by both comparison helpers
316
// absolute comparison: squared difference below mTolerance (i.e. |difference| < sqrt(mTolerance))
317 bool isSameZero(const float epnval, const float aggval) const { return (epnval - aggval) * (epnval - aggval) < mTolerance; }
// relative comparison: |difference / min(epnval, aggval)| below mTolerance
318 bool isSame(const float epnval, const float aggval) const { return std::abs((epnval - aggval) / std::min(epnval, aggval)) < mTolerance; }
319};
320
321DataProcessorSpec receiveFourierCoeffEPN(const unsigned int maxTFs, const unsigned int nFourierCoefficients)
322{
323 std::vector<InputSpec> inputSpecs{{"coeffEPN", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFourierTransformEPNSpec::getDataDescription()}, Lifetime::Sporadic}};
324 std::vector<OutputSpec> outputSpecs{ConcreteDataTypeMatcher{gDataOriginTPC, getDataDescriptionCoeffEPN()}};
325 return DataProcessorSpec{
326 "idc-fouriercoeff-epn",
327 inputSpecs,
328 outputSpecs,
329 AlgorithmSpec{adaptFromTask<TPCReceiveEPNSpec>(maxTFs, nFourierCoefficients)},
330 };
331}
332
// Body of the factory for the device "idc-fouriercoeff-compare" (the signature line is not shown
// here): the device has two inputs and no outputs and runs a TPCCompareFourierCoeffSpec task.
334{
335 std::vector<InputSpec> inputSpecs;
// "coeffEPN": uses the data description returned by getDataDescriptionCoeffEPN()
336 inputSpecs.emplace_back(InputSpec("coeffEPN", ConcreteDataTypeMatcher{gDataOriginTPC, getDataDescriptionCoeffEPN()}));
// "coeffAgg": uses the Fourier data description of the aggregator spec
337 inputSpecs.emplace_back(InputSpec("coeffAgg", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFourierTransformAggregatorSpec::getDataDescriptionFourier()}));
338 return DataProcessorSpec{
339 "idc-fouriercoeff-compare",
340 inputSpecs,
341 Outputs{},
342 AlgorithmSpec{adaptFromTask<TPCCompareFourierCoeffSpec>()},
343 };
344}
int32_t i
Helper function to tokenize sequences and ranges of integral numbers.
uint16_t pos
Definition RawData.h:3
uint32_t side
Definition RawData.h:0
TPC aggregation of grouped IDCs and factorization.
TPC device for processing on FLPs.
TPC aggregation of 1D-IDCs and fourier transform.
fourier transform of 1D-IDCs used during synchronous reconstruction
TPC integration of IDCs processor.
StringRef key
void run(o2::framework::ProcessingContext &ctx) final
device for receiving the fourier coefficients from the EPN and the aggregator and comparing them
TPCReceiveEPNSpec(const unsigned int nTFs, const unsigned int nFourierCoefficients)
device for receiving and aggregating fourier coefficients from EPN device
void run(o2::framework::ProcessingContext &ctx) final
static void updateFromString(std::string const &)
ConfigParamRegistry & options() const
A helper class to iterate over all parts of all input routes.
virtual void endOfStream(EndOfStreamContext &context)
This is invoked whenever we have an EndOfStream event.
Definition Task.h:43
unsigned char region() const
Definition CRU.h:64
@ MaxCRU
Definition CRU.h:31
Side side() const
Definition CRU.h:62
static void setNThreads(const int nThreads)
set the number of threads used for some of the calculations
static void setNThreads(const int nThreads)
static constexpr unsigned int NREGIONS
total number of regions in one sector
Definition Mapper.h:527
static constexpr unsigned int PADSPERREGION[NREGIONS]
number of pads per CRU
Definition Mapper.h:530
static constexpr header::DataDescription getDataDescriptionFourier()
static constexpr header::DataDescription getDataDescription()
static constexpr header::DataDescription getDataDescription(const IDCFormat idcFormat)
GLsizeiptr size
Definition glcorearb.h:659
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean * data
Definition glcorearb.h:298
GLintptr offset
Definition glcorearb.h:660
GLuint start
Definition glcorearb.h:469
GLint ref
Definition glcorearb.h:291
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
std::vector< T, o2::pmr::polymorphic_allocator< T > > vector
uint32_t getCurrentTF(o2::framework::ProcessingContext &pc)
DataProcessorSpec getTPCFourierTransformAggregatorSpec(const unsigned int rangeIDC, const unsigned int nFourierCoefficientsStore, const bool senddebug, const bool processSACs, const int inputLanes)
DataProcessorSpec getTPCFLPIDCSpec(const int ilane, const std::vector< uint32_t > &crus, const unsigned int rangeIDC, const bool loadStatusMap, const std::string idc0File, const bool disableIDC0CCDB, const bool enableSynchProc, const int nTFsBuffer=1)
DataProcessorSpec getTPCFourierTransformEPNSpec(const std::vector< uint32_t > &crus, const unsigned int rangeIDC, const unsigned int nFourierCoefficientsSend)
Side
TPC readout side.
Definition Defs.h:35
@ NO
no compression using floats
DataProcessorSpec getTPCFactorizeIDCSpec(const int lane, const std::vector< uint32_t > &crus, const unsigned int timeframes, const unsigned int timeframesDeltaIDC, const IDCDeltaCompression compression, const bool usePrecisetimeStamp, const bool sendOutputFFT, const bool sendCCDB, const int nTFsBuffer=1)
DataProcessorSpec getTPCDistributeIDCSpec(const int ilane, const std::vector< uint32_t > &crus, const unsigned int timeframes, const unsigned int outlanes, const int firstTF, const bool sendPrecisetimeStamp=false, const int nTFsBuffer=1)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::unique_ptr< GPUReconstructionTimeframe > tf
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
uint32_t SubSpecificationType
Definition DataHeader.h:620
static void addNewTimeSliceCallback(std::vector< o2::framework::CallbacksPolicy > &policies)
static void addConfigOption(std::vector< o2::framework::ConfigParamSpec > &opts, const std::string &defOpt=std::string(o2::base::NameConf::DIGITIZATIONCONFIGFILE))
struct containing the fourier coefficients calculated from IDC0 for n timeframes
const int sleepTime
Definition test_Fifo.cxx:28
DataProcessorSpec generateIDCsCRU(int lane, const unsigned int maxTFs, const std::vector< uint32_t > &crus, const bool delay, const bool loadFromFile, const int dropTFsRandom, const std::vector< int > &rangeTFsDrop)
#define ASSERT_ERROR(condition)
DataProcessorSpec receiveFourierCoeffEPN(const unsigned int maxTFs, const unsigned int nFourierCoefficients)
WorkflowSpec defineDataProcessing(ConfigContext const &config)
This function hooks up the workflow specifications into the DPL driver.
void customize(std::vector< ConfigParamSpec > &workflowOptions)
DataProcessorSpec compare_EPN_AGG()