26#include <boost/throw_exception.hpp>
37#define ASSERT_ERROR(condition) \
38 if ((condition) == false) { \
39 LOG(fatal) << R"(Test condition ")" #condition R"(" failed)"; \
42DataProcessorSpec generateIDCsCRU(
int lane,
const unsigned int maxTFs,
const std::vector<uint32_t>& crus,
const bool delay,
const bool loadFromFile,
const int dropTFsRandom,
const std::vector<int>& rangeTFsDrop);
48void customize(std::vector<ConfigParamSpec>& workflowOptions)
52 std::vector<ConfigParamSpec> options{
53 {
"crus", VariantType::String, cruDefault.c_str(), {
"List of CRUs, comma separated ranges, e.g. 0-3,7,9-15"}},
54 {
"timeframes", VariantType::Int, 20, {
"Number of TFs which will be produced"}},
55 {
"ion-drift-time", VariantType::Int, 50, {
"Ion drift time in ms. (Number of 1D-IDCs which will be used for the calculation of the fourier coefficients.)"}},
56 {
"nFourierCoeff", VariantType::Int, 20, {
"Number of fourier coefficients per TF (real+imag) which will be compared. The maximum can be 'ion-drift-time + 2'."}},
57 {
"use-naive-fft", VariantType::Bool,
false, {
"using naive fourier transform (true) or FFTW (false)"}},
58 {
"seed", VariantType::Int, 0, {
"Seed for the random IDC generator."}},
59 {
"only-idc-gen", VariantType::Bool,
false, {
"Start only the IDC generator device"}},
60 {
"load-from-file", VariantType::Bool,
false, {
"load from file"}},
61 {
"debug", VariantType::Bool,
false, {
"create debug for FT"}},
62 {
"idc-gen-lanes", VariantType::Int, 1, {
"number of parallel lanes for generation of IDCs"}},
63 {
"idc-gen-time-lanes", VariantType::Int, 1, {
"number of parallel lanes for generation of IDCs"}},
64 {
"delay", VariantType::Bool,
false, {
"Add delay for sending IDCs"}},
65 {
"dropTFsRandom", VariantType::Int, 0, {
"Drop randomly whole TFs every dropTFsRandom TFs (for all CRUs)"}},
66 {
"dropTFsRange", VariantType::String,
"", {
"Drop range of TFs"}},
67 {
"hbfutils-config", VariantType::String,
"hbfutils", {
"config file for HBFUtils (or none) to get number of orbits per TF"}},
68 {
"nthreads", VariantType::Int, 1, {
"Number of threads."}},
69 {
"iter", VariantType::Int, 0, {
"Iteration for testing the workflow (.....)"}},
70 {
"configKeyValues", VariantType::String,
"", {
"Semicolon separated key=value strings"}}};
72 std::swap(workflowOptions, options);
75void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
80void customize(std::vector<o2::framework::CompletionPolicy>& policies)
90 const auto tpcCRUs = o2::RangeTokenizer::tokenize<int>(config.
options().
get<std::string>(
"crus"));
91 const auto rangeTFsDrop = o2::RangeTokenizer::tokenize<int>(config.
options().
get<std::string>(
"dropTFsRange"));
93 const auto nCRUs = tpcCRUs.size();
94 const auto first = tpcCRUs.begin();
95 const auto last = std::min(tpcCRUs.end(),
first + nCRUs);
96 const std::vector<uint32_t> crus(
first, last);
97 const auto timeframes =
static_cast<unsigned int>(config.
options().
get<
int>(
"timeframes"));
98 const auto iondrifttime =
static_cast<unsigned int>(config.
options().
get<
int>(
"ion-drift-time"));
99 const auto nFourierCoefficients = std::clamp(
static_cast<unsigned int>(config.
options().
get<
int>(
"nFourierCoeff")),
static_cast<unsigned int>(0), iondrifttime + 2);
100 const auto nthreads =
static_cast<unsigned int>(config.
options().
get<
int>(
"nthreads"));
101 const auto iter =
static_cast<unsigned int>(config.
options().
get<
int>(
"iter"));
102 const auto seed =
static_cast<unsigned int>(config.
options().
get<
int>(
"seed"));
103 const auto idcgenlanes =
static_cast<unsigned int>(config.
options().
get<
int>(
"idc-gen-lanes"));
104 const auto idcgentimelanes =
static_cast<unsigned int>(config.
options().
get<
int>(
"idc-gen-time-lanes"));
105 const auto delay =
static_cast<unsigned int>(config.
options().
get<
bool>(
"delay"));
107 const bool fft = config.
options().
get<
bool>(
"use-naive-fft");
108 const bool onlyIDCGen = config.
options().
get<
bool>(
"only-idc-gen");
109 const bool debugFT = config.
options().
get<
bool>(
"debug");
110 const int dropTFsRandom = config.
options().
get<
int>(
"dropTFsRandom");
112 const unsigned int firstTF = 0;
113 const unsigned int nLanes = 1;
114 const bool loadFromFileGen = config.
options().
get<
bool>(
"load-from-file");
115 gRandom->SetSeed(seed);
118 for (
int ilane = 0; ilane < idcgenlanes; ++ilane) {
119 const auto crusPerLane = nCRUs / idcgenlanes + ((nCRUs % idcgenlanes) != 0);
120 const auto first = tpcCRUs.begin() + ilane * crusPerLane;
121 if (
first >= tpcCRUs.end()) {
124 const auto last = std::min(tpcCRUs.end(),
first + crusPerLane);
125 const std::vector<uint32_t> rangeCRUs(
first, last);
126 workflow.emplace_back(
timePipeline(
generateIDCsCRU(ilane, timeframes, rangeCRUs, delay, loadFromFileGen, dropTFsRandom, rangeTFsDrop), idcgentimelanes));
130 long startTime = hbfu.startTime > 0 ? hbfu.startTime : std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
134 if (onlyIDCGen ==
true) {
139 auto workflowTmp =
WorkflowSpec{
getTPCFLPIDCSpec(0, crus, iondrifttime,
false,
"",
true,
false),
getTPCDistributeIDCSpec(0, crus, timeframes, nLanes, firstTF,
false),
getTPCFactorizeIDCSpec(0, crus, timeframes, timeframes,
o2::tpc::IDCDeltaCompression::NO,
false,
true,
false)};
140 for (
auto& spec : workflowTmp) {
141 workflow.emplace_back(spec);
143 }
else if (iter == 1) {
145 const std::string idc0File = (
side == Side::A) ? fmt::format(
"IDCZero_A_{:02}.root", timeframes - 1) : fmt::format(
"IDCZero_C_{:02}.root", timeframes - 1);
146 auto workflowTmp =
WorkflowSpec{
getTPCFLPIDCSpec(0, crus, iondrifttime,
false, idc0File.data(),
true,
true),
getTPCFourierTransformEPNSpec(crus, iondrifttime, nFourierCoefficients),
getTPCDistributeIDCSpec(0, crus, timeframes, nLanes, firstTF,
false),
getTPCFactorizeIDCSpec(0, crus, timeframes, timeframes,
o2::tpc::IDCDeltaCompression::NO,
false,
true,
false),
getTPCFourierTransformAggregatorSpec(iondrifttime, nFourierCoefficients,
true,
false, nLanes),
receiveFourierCoeffEPN(timeframes, nFourierCoefficients),
compare_EPN_AGG()};
147 for (
auto& spec : workflowTmp) {
148 workflow.emplace_back(spec);
159DataProcessorSpec generateIDCsCRU(
int lane,
const unsigned int maxTFs,
const std::vector<uint32_t>& crus,
const bool delay,
const bool loadFromFile,
const int dropTFsRandom,
const std::vector<int>& rangeTFsDrop)
161 using timer = std::chrono::high_resolution_clock;
163 std::array<std::vector<float>,
CRU::MaxCRU> mIDCs{};
165 TFile fInp(
"IDCGroup.root",
"READ");
166 for (
TObject* keyAsObj : *fInp.GetListOfKeys()) {
167 const auto key =
dynamic_cast<TKey*
>(keyAsObj);
168 LOGP(info,
"Key name: {} Type: {}",
key->GetName(),
key->GetClassName());
169 std::vector<float>* idcData = (std::vector<float>*)fInp.Get(
key->GetName());
170 std::string
name =
key->GetName();
171 const auto pos =
name.find_last_of(
'_') + 1;
172 const int cru = std::stoi(
name.substr(
pos,
name.length()));
173 mIDCs[cru] = *idcData;
174 LOGP(info,
"Loaded {} IDCs from file for CRU {}", mIDCs[cru].
size(), cru);
180 const int nOrbitsPerTF = 128;
181 const int nOrbitsPerIDC = 12;
182 const int nIDCs = nOrbitsPerTF / nOrbitsPerIDC;
184 std::vector<OutputSpec> outputSpecs;
185 outputSpecs.reserve(crus.size());
186 for (
const auto& cru : crus) {
191 gRandom->SetSeed(42);
193 fmt::format(
"idc-generator-{:02}", lane).data(),
197 [maxTFs, nIDCs, delay, cruStart = crus.front(), cruEnde = crus.back(), mIDCs, dropTFsRandom, rangeTFsDrop](
ProcessingContext& ctx) {
200 if (!rangeTFsDrop.empty()) {
201 if (
tf >= rangeTFsDrop.front() &&
tf <= rangeTFsDrop.back()) {
202 LOGP(info,
"Dropping TF as specified from range: {}",
tf);
207 if (dropTFsRandom && !gRandom->Integer(dropTFsRandom)) {
208 LOGP(info,
"Dropping TF: {}",
tf);
212 auto start = timer::now();
213 const unsigned int additionalInterval = (
tf % 3) ? 1 : 0;
214 const unsigned int intervals = (nIDCs + additionalInterval);
215 const bool generateIDCs = mIDCs.front().empty();
216 const float irVar = 1 + 0.1 * std::sin(
tf * 0.0035);
218 std::vector<int> intervalsRand;
219 std::vector<float> normFac;
220 intervalsRand.reserve(intervals);
221 normFac.reserve(intervals);
223 const float globalScaling = gRandom->Gaus(1, 0.2);
224 for (
int i = 0;
i < intervals; ++
i) {
225 intervalsRand.emplace_back(gRandom->Integer(nIntervalsMax));
226 normFac.emplace_back(globalScaling + gRandom->Gaus(1, 0.02));
229 for (uint32_t icru = cruStart; icru <= cruEnde; ++icru) {
235 const int nIntervals = intervalsRand.size();
236 for (
int interval = 0; interval < nIntervals; ++interval) {
237 const int offset = intervalsRand[interval] * nPads;
238 for (
int iPad = 0; iPad < nPads; ++iPad) {
240 idcs.emplace_back(irVar * gRandom->Gaus(10, 20) / normFac[interval]);
242 idcs.emplace_back(irVar * mIDCs[cru][
offset + iPad] / normFac[interval]);
250 auto stop = timer::now();
251 auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(stop -
start);
252 float totalTime = milliseconds.count();
253 const int sleepTime = (totalTime < intervals) ? (intervals - totalTime) : 0;
255 LOGP(info,
"time: {} for {} intervals (ms): sleep for {} for TF: {}", totalTime, intervals,
sleepTime,
tf);
257 std::this_thread::sleep_for(std::chrono::milliseconds(
sleepTime));
260 if (
tf >= (maxTFs - 1)) {
261 ctx.services().get<
ControlService>().readyToQuit(QuitRequest::Me);
270 TPCReceiveEPNSpec(
const unsigned int nTFs,
const unsigned int nFourierCoefficients) : mFourierCoeffEPN(nTFs,
o2::tpc::
FourierCoeff(1, nFourierCoefficients)), mMaxTF{nTFs - 1} {};
277 auto const* tpcFourierCoeffHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
278 const int side = tpcFourierCoeffHeader->subSpecification;
279 mFourierCoeffEPN[
tf].mFourierCoefficients = ctx.inputs().get<std::vector<float>>(
ref);
283 ctx.outputs().snapshot(
Output{gDataOriginTPC, getDataDescriptionCoeffEPN()}, mFourierCoeffEPN);
288 std::vector<o2::tpc::FourierCoeff> mFourierCoeffEPN;
289 const unsigned int mMaxTF{};
299 LOGP(info,
"==== Comparing Fourier coefficients for EPN and Aggregator... ====");
300 const auto fourierCoeffAgg = ctx.inputs().get<
FourierCoeff*>(ctx.inputs().get(
"coeffAgg"));
301 const std::vector<o2::tpc::FourierCoeff> fourierCoeffEPN = ctx.inputs().get<std::vector<o2::tpc::FourierCoeff>>(ctx.inputs().get(
"coeffEPN"));
302 for (
int tf = 0;
tf < fourierCoeffEPN.size(); ++
tf) {
303 for (
int i = 0;
i < fourierCoeffEPN[
tf].getNCoefficientsPerTF(); ++
i) {
304 const float epnval = fourierCoeffEPN[
tf](
i);
305 const float aggval = (*fourierCoeffAgg)(fourierCoeffAgg->getIndex(
tf,
i));
306 ASSERT_ERROR((std::abs(std::min(epnval, aggval)) < 1.f) ? isSameZero(epnval, aggval) : isSame(epnval, aggval));
309 LOGP(info,
"==== Test finished successfull! Fourier coefficients for EPN and Aggregator are equal. ====");
311 ctx.services().get<
ControlService>().readyToQuit(QuitRequest::All);
315 const float mTolerance = 1e-4;
317 bool isSameZero(
const float epnval,
const float aggval)
const {
return (epnval - aggval) * (epnval - aggval) < mTolerance; }
318 bool isSame(
const float epnval,
const float aggval)
const {
return std::abs((epnval - aggval) / std::min(epnval, aggval)) < mTolerance; }
326 "idc-fouriercoeff-epn",
329 AlgorithmSpec{adaptFromTask<TPCReceiveEPNSpec>(maxTFs, nFourierCoefficients)},
335 std::vector<InputSpec> inputSpecs;
339 "idc-fouriercoeff-compare",
Helper function to tokenize sequences and ranges of integral numbers.
TPC aggregation of grouped IDCs and factorization.
TPC device for processing on FLPs.
TPC integration of IDCs processor.
void run(o2::framework::ProcessingContext &ctx) final
device for receiving the fourier coefficients from the EPN and the aggregator and comparing them
TPCReceiveEPNSpec(const unsigned int nTFs, const unsigned int nFourierCoefficients)
device for receiving and aggregating fourier coefficients from EPN device
void run(o2::framework::ProcessingContext &ctx) final
static const HBFUtils & Instance()
static void updateFromString(std::string const &)
ConfigParamRegistry & options() const
T get(const char *key) const
virtual void endOfStream(EndOfStreamContext &context)
This is invoked whenever we have an EndOfStream event.
unsigned char region() const
static void setNThreads(const int nThreads)
set the number of threads used for some of the calculations
static constexpr unsigned int NREGIONS
total number of regions in one sector
static constexpr unsigned int PADSPERREGION[NREGIONS]
number of pads per CRU
static constexpr header::DataDescription getDataDescription(const IDCFormat idcFormat)
GLuint const GLchar * name
constexpr o2::header::DataOrigin gDataOriginTPC
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
std::vector< T, o2::pmr::polymorphic_allocator< T > > vector
uint32_t getCurrentTF(o2::framework::ProcessingContext &pc)
DataProcessorSpec getTPCFourierTransformAggregatorSpec(const unsigned int rangeIDC, const unsigned int nFourierCoefficientsStore, const bool senddebug, const bool processSACs, const int inputLanes)
DataProcessorSpec getTPCFLPIDCSpec(const int ilane, const std::vector< uint32_t > &crus, const unsigned int rangeIDC, const bool loadStatusMap, const std::string idc0File, const bool disableIDC0CCDB, const bool enableSynchProc, const int nTFsBuffer=1)
DataProcessorSpec getTPCFourierTransformEPNSpec(const std::vector< uint32_t > &crus, const unsigned int rangeIDC, const unsigned int nFourierCoefficientsSend)
@ NO
no compression using floats
DataProcessorSpec getTPCFactorizeIDCSpec(const int lane, const std::vector< uint32_t > &crus, const unsigned int timeframes, const unsigned int timeframesDeltaIDC, const IDCDeltaCompression compression, const bool usePrecisetimeStamp, const bool sendOutputFFT, const bool sendCCDB, const int nTFsBuffer=1)
DataProcessorSpec getTPCDistributeIDCSpec(const int ilane, const std::vector< uint32_t > &crus, const unsigned int timeframes, const unsigned int outlanes, const int firstTF, const bool sendPrecisetimeStamp=false, const int nTFsBuffer=1)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
std::unique_ptr< GPUReconstructionTimeframe > tf
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static void addNewTimeSliceCallback(std::vector< o2::framework::CallbacksPolicy > &policies)
static void addConfigOption(std::vector< o2::framework::ConfigParamSpec > &opts, const std::string &defOpt=std::string(o2::base::NameConf::DIGITIZATIONCONFIGFILE))
struct containing the fourier coefficients calculated from IDC0 for n timeframes
DataProcessorSpec generateIDCsCRU(int lane, const unsigned int maxTFs, const std::vector< uint32_t > &crus, const bool delay, const bool loadFromFile, const int dropTFsRandom, const std::vector< int > &rangeTFsDrop)
#define ASSERT_ERROR(condition)
DataProcessorSpec receiveFourierCoeffEPN(const unsigned int maxTFs, const unsigned int nFourierCoefficients)
WorkflowSpec defineDataProcessing(ConfigContext const &config)
This function hooks up the the workflow specifications into the DPL driver.
void customize(std::vector< ConfigParamSpec > &workflowOptions)
DataProcessorSpec compare_EPN_AGG()