51static const size_t SOLAR_ID_MAX = 100 * 8;
65static std::string readFileContent(std::string&
filename)
70 while (std::getline(in, s)) {
77static bool isValidDeID(
int deId)
79 for (
auto id :
o2::mch::constants::deIdsForAllMCH) {
88static void patchPage(gsl::span<const std::byte> rdhBuffer,
bool verbose)
90 static int sNrdhs = 0;
91 auto& rdhAny = *
reinterpret_cast<RDH*
>(
const_cast<std::byte*
>(&(rdhBuffer[0])));
94 auto existingFeeId = o2::raw::RDHUtils::getFEEID(rdhAny);
95 if (existingFeeId == 0) {
98 auto cruId = o2::raw::RDHUtils::getCRUID(rdhAny) & 0xFF;
99 auto flags = o2::raw::RDHUtils::getCRUID(rdhAny) & 0xFF00;
100 auto endpoint = o2::raw::RDHUtils::getEndPointID(rdhAny);
106 std::cout <<
"RDH number " << sNrdhs <<
"--\n";
120 LOG(info) <<
"[initElec2DetMapper] filename=" <<
filename;
131 LOG(info) <<
"[initFee2SolarMapper] filename=" <<
filename;
144 mLoggingInterval = ic.
options().
get<
int>(
"logging-interval") * 1000;
146 auto mapCRUfile = ic.
options().
get<std::string>(
"cru-map");
147 auto mapFECfile = ic.
options().
get<std::string>(
"fec-map");
150 auto stop = [
this]() {
152 LOG(info) <<
"time spent for decoding (ms): min=" << mTimeDecoderMin->count() <<
", max="
153 << mTimeDecoderMax->count() <<
", mean=" << mTimeDecoder.count() / mTFcount;
171 static int Nrdhs = 0;
176 auto tStart = std::chrono::high_resolution_clock::now();
178 auto heartBeatHandler = [&](
DsElecId dsElecId, uint8_t chip, uint32_t bunchCrossing) {
180 auto solar = dsElecId.
solarId();
184 LOGP(info,
"HeartBeat: {}-CHIP{}", s, chip);
187 mHBPackets.emplace_back(solar,
ds, chip, bunchCrossing);
191 auto solarId = dsElecId.
solarId();
192 auto dsId = dsElecId.
elinkId();
195 LOGP(info,
"Digit: {}-CH{}", s, (
int)channel);
202 auto errorHandler = [&](
DsElecId dsElecId, int8_t chip, uint32_t error) {
203 auto solarId = dsElecId.
solarId();
204 auto dsId = dsElecId.
elinkId();
209 patchPage(page, mDebug);
212 auto& rdhAny = *
reinterpret_cast<RDH*
>(
const_cast<std::byte*
>(&(page[0])));
214 LOGP(info,
"{}--", Nrdhs);
228 }
catch (std::exception& e) {
229 LOGP(error,
"{}", e.what());
237 LOGP(info,
"\n\n============================\nStart of new buffer");
240 size_t pageStart = 0;
242 RDH* rdh =
reinterpret_cast<RDH*
>(
const_cast<std::byte*
>(&(
buf[pageStart])));
244 auto rdhHeaderSize = o2::raw::RDHUtils::getHeaderSize(rdh);
245 if (rdhHeaderSize != 64) {
248 auto pageSize = o2::raw::RDHUtils::getOffsetToNext(rdh);
250 gsl::span<const std::byte> page(
reinterpret_cast<const std::byte*
>(rdh), pageSize);
253 pageStart += pageSize;
262 auto& inputs = pc.
inputs();
266 auto tStart = std::chrono::high_resolution_clock::now();
267 size_t totPayloadSize = 0;
268 for (
auto it = parser.
begin(),
end = parser.
end(); it !=
end; ++it) {
269 auto const* raw = it.raw();
273 size_t payloadSize = it.size();
274 totPayloadSize += payloadSize;
276 gsl::span<const std::byte>
buffer(
reinterpret_cast<const std::byte*
>(raw),
sizeof(
RDH) + payloadSize);
279 auto tEnd = std::chrono::high_resolution_clock::now();
281 if (totPayloadSize > 0) {
282 std::chrono::duration<double, std::milli> elapsed = tEnd - tStart;
283 mTimeDecoder += elapsed;
284 if (!mTimeDecoderMin || (elapsed < mTimeDecoderMin)) {
285 mTimeDecoderMin = elapsed;
287 if (!mTimeDecoderMax || (elapsed > mTimeDecoderMax)) {
288 mTimeDecoderMax = elapsed;
300 auto const* raw = input.
payload;
303 gsl::span<const std::byte>
buffer(
reinterpret_cast<const std::byte*
>(raw), payloadSize);
310 static auto loggerStart = std::chrono::high_resolution_clock::now();
311 static auto loggerEnd = loggerStart;
312 static uint64_t nDigits = 0;
313 static uint64_t nTF = 0;
315 if (mLoggingInterval == 0) {
319 nDigits += mDigits.size();
322 loggerEnd = std::chrono::high_resolution_clock::now();
323 std::chrono::duration<double, std::milli> loggerElapsed = loggerEnd - loggerStart;
324 if (loggerElapsed.count() > mLoggingInterval) {
325 LOGP(info,
"Processed {} digits in {} time frames", nDigits, nTF);
328 loggerStart = std::chrono::high_resolution_clock::now();
336 size =
vec.empty() ? 0 :
sizeof(*(
vec.begin())) *
vec.size();
342 size_t sizeofElement =
sizeof(*(
vec.begin()));
343 for (
auto& element :
vec) {
344 memcpy(p, &element, sizeofElement);
353 for (
auto&& input : pc.
inputs()) {
354 if (input.spec->binding ==
"TF") {
357 if (input.spec->binding ==
"readout") {
372 auto freefct = [](
void*
data,
void*) { free(
data); };
383 std::vector<o2::mch::calibration::PedestalDigit> mDigits;
384 std::vector<o2::mch::HeartBeatPacket> mHBPackets;
385 std::vector<o2::mch::DecoderError> mErrors;
389 std::string mMapCRUfile;
390 std::string mMapFECfile;
392 std::string mInputSpec;
393 bool mDebug = {
false};
394 int mLoggingInterval = {0};
396 std::chrono::duration<double, std::milli> mTimeDecoder{};
397 std::optional<std::chrono::duration<double, std::milli>> mTimeDecoderMin{};
398 std::optional<std::chrono::duration<double, std::milli>> mTimeDecoderMax{};
410void customize(std::vector<ConfigParamSpec>& workflowOptions)
412 workflowOptions.push_back(
ConfigParamSpec{
"input-spec", VariantType::String,
"TF:MCH/RAWDATA", {
"selection string for the input data"}});
413 workflowOptions.push_back(
ConfigParamSpec{
"configKeyValues", VariantType::String,
"", {
"Semicolon separated key=value strings"}});
430 AlgorithmSpec{adaptFromTask<o2::mch::raw::PedestalsTask>(inputSpec)},
431 Options{{
"mch-debug", VariantType::Bool,
false, {
"enable verbose output"}},
432 {
"logging-interval", VariantType::Int, 0, {
"time interval in seconds between logging messages (set to zero to disable)"}},
433 {
"noise-threshold", VariantType::Float, (float)2.0, {
"maximum acceptable noise value"}},
434 {
"pedestal-threshold", VariantType::Float, (float)150, {
"maximum acceptable pedestal value"}},
435 {
"cru-map", VariantType::String,
"", {
"custom CRU mapping"}},
436 {
"fec-map", VariantType::String,
"", {
"custom FEC mapping"}}}};
441 auto inputSpec = config.
options().
get<std::string>(
"input-spec");
447 specs.push_back(producer);
A raw page parser for DPL input.
static void updateFromString(std::string const &)
ConfigParamRegistry & options() const
T get(const char *key) const
The parser handles transparently input in the format of raw pages.
const_iterator end() const
const_iterator begin() const
void adoptChunk(const Output &, char *, size_t, fair::mq::FreeFn *, void *)
ServiceRegistryRef services()
ConfigParamRegistry const & options()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
MCH decoder error implementation.
"Fat" digit for pedestal data.
constexpr uint8_t elinkId() const
constexpr uint16_t solarId() const
solarId is an identifier that uniquely identify a solar board
PedestalsTask(std::string spec)
void initElec2DetMapper(std::string filename)
void init(framework::InitContext &ic)
void decodeTF(framework::ProcessingContext &pc)
void run(framework::ProcessingContext &pc)
void decodeReadout(const o2::framework::DataRef &input)
void decodeBuffer(gsl::span< const std::byte > buf)
void initFee2SolarMapper(std::string filename)
void decodePage(gsl::span< const std::byte > page)
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
GLenum GLuint GLenum GLsizei const GLchar * buf
constexpr o2::header::DataOrigin gDataOriginMCH
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > select(char const *matcher="")
std::vector< OutputSpec > Outputs
std::function< std::optional< DsDetId >(DsElecId)> createElec2DetMapper< ElectronicMapperGenerated >(uint64_t)
FeeLink2SolarMapper createFeeLink2SolarMapper< ElectronicMapperString >()
std::function< std::optional< DsDetId >(DsElecId)> Elec2DetMapper
std::function< std::optional< uint16_t >(FeeLinkId)> createFeeLink2SolarMapper< ElectronicMapperGenerated >()
std::function< void(Page buffer)> PageDecoder
Elec2DetMapper createElec2DetMapper< ElectronicMapperString >(uint64_t)
std::function< void(DsElecId dsId, DualSampaChannelId channel, SampaCluster)> SampaChannelHandler
std::function< std::optional< uint16_t >(FeeLinkId id)> FeeLink2SolarMapper
From (feeId,linkId) to solarId.
PageDecoder createPageDecoder(RawBuffer rdhBuffer, DecodedDataHandlers decodedDataHandlers)
will be called for each decoded Sampa packet and in case of decoding errors
std::string asString(const SampaCluster &sc)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
WorkflowSpec defineDataProcessing(const ConfigContext &config)
void customize(std::vector< ConfigParamSpec > &workflowOptions)
o2::framework::DataProcessorSpec getMCHPedestalDecodingSpec(const char *specName, std::string inputSpec)
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
SampaChannelHandler sampaChannelHandler
SampaErrorHandler sampaErrorHandler
SampaHeartBeatHandler sampaHeartBeatHandler
static std::string sCruMap
static std::string sFecMap
Piece of data for one Sampa channel.
static void printRDH(const RDHv4 &rdh)
static void setFEEID(RDHv4 &rdh, uint16_t v)
static constexpr int getVersion()
get numeric version of the RDH
std::vector< std::byte > createBuffer(gsl::span< std::string > data, uint32_t orbit=12345, uint16_t bc=678)
std::vector< o2::ctf::BufferType > vec
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"