14#ifndef O2_FITDATAREADERDPLSPEC_H
15#define O2_FITDATAREADERDPLSPEC_H
42template <
typename RawReaderType>
57 auto ccdbUrl = ic.options().get<std::string>(
"ccdb-path");
58 auto lutPath = ic.options().get<std::string>(
"lut-path");
59 mDumpMetrics = ic.options().get<
bool>(
"dump-raw-data-metric");
60 if (!ic.options().get<
bool>(
"disable-empty-tf-protection")) {
64 RawReader_t::LookupTable_t::setCCDBurl(
ccdbUrl);
67 RawReader_t::LookupTable_t::setLUTpath(lutPath);
70 RawReader_t::LookupTable_t::Instance().printFullMap();
72 const auto nReserveVecDig = ic.options().get<
int>(
"reserve-vec-dig");
73 const auto nReserveVecChData = ic.options().get<
int>(
"reserve-vec-chdata");
74 const auto nReserveVecBuffer = ic.options().get<
int>(
"reserve-vec-buffer");
75 const auto nReserveMapDig = ic.options().get<
int>(
"reserve-map-dig");
76 if (nReserveVecDig || nReserveVecChData) {
77 mRawReader.reserveVecDPL(nReserveVecDig, nReserveVecChData);
79 if (nReserveVecBuffer || nReserveMapDig) {
80 mRawReader.reserve(nReserveVecBuffer, nReserveMapDig);
88 static size_t contDeadBeef = 0;
91 const auto dh = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
93 if (payloadSize == 0) {
95 if (++contDeadBeef <= maxWarn) {
96 LOGP(alarm,
"Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF{}",
97 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, payloadSize,
98 contDeadBeef == maxWarn ? fmt::format(
". {} such inputs in row received, stopping reporting", contDeadBeef) :
"");
108 std::size_t cntDF0{0};
109 std::size_t cntDF2{0};
110 std::size_t cntDF_unknown{0};
111 auto start = std::chrono::high_resolution_clock::now();
113 pc.inputs().get<
typename RawReader_t::LookupTable_t::Table_t*>(
"channel_map");
115 auto stop = std::chrono::high_resolution_clock::now();
116 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
stop -
start);
117 LOG(
debug) <<
"Channel map upload delay: " << duration.count();
119 for (
auto it = parser.
begin(),
end = parser.
end(); it !=
end; ++it) {
124 gsl::span<const uint8_t> payload(it.data(), it.size());
125 const auto rdhDataFormat = o2::raw::RDHUtils::getDataFormat(rdhPtr);
126 if (rdhDataFormat == 0) {
128 mRawReader.process(
true, payload, o2::raw::RDHUtils::getFEEID(rdhPtr), o2::raw::RDHUtils::getLinkID(rdhPtr), o2::raw::RDHUtils::getEndPointID(rdhPtr));
129 }
else if (rdhDataFormat == 2) {
131 mRawReader.process(
false, payload, o2::raw::RDHUtils::getFEEID(rdhPtr), o2::raw::RDHUtils::getLinkID(rdhPtr), o2::raw::RDHUtils::getEndPointID(rdhPtr));
136 }
catch (std::exception& e) {
137 LOG(error) <<
"Failed to extract RDH, abandoning TF sending dummy output, exception was: " << e.what();
149 if ((cntDF0 > 0 && cntDF2 > 0) || cntDF_unknown > 0) {
150 LOG(error) <<
"Strange RDH::dataFormat in TF. Number of pages: DF=0 - " << cntDF0 <<
" , DF=2 - " << cntDF2 <<
" , DF=unknown - " << cntDF_unknown;
152 auto stop = std::chrono::high_resolution_clock::now();
153 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
stop -
start);
154 LOG(
debug) <<
"TF delay: " << duration.count();
158 LOG(info) <<
"finaliseCCDB";
160 LOG(
debug) <<
"Channel map is updated";
161 RawReader_t::LookupTable_t::Instance((
const typename RawReader_t::LookupTable_t::Table_t*)obj);
167template <
typename RawReaderType>
170 std::vector<OutputSpec> outputSpec;
171 std::vector<InputSpec> inputSpec{};
172 ConcreteDataMatcher matcherChMapCCDB{rawReader.mDataOrigin, RawReaderType::LookupTable_t::sObjectName, 0};
173 rawReader.configureOutputSpec(outputSpec);
175 inputSpec.push_back({
"STF",
ConcreteDataTypeMatcher{rawReader.mDataOrigin,
"SUB_RAWDATA"}, Lifetime::Sporadic});
181 inputSpec.emplace_back(
"STFDist",
"FLP",
"DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
183 const bool updateCCDB = !disableDplCcdbFetcher;
185 inputSpec.emplace_back(
"channel_map", matcherChMapCCDB, Lifetime::Condition,
ccdbParamSpec(RawReaderType::LookupTable_t::sDefaultLUTpath));
187 std::string dataProcName = rawReader.mDataOrigin.template as<std::string>();
188 std::for_each(dataProcName.begin(), dataProcName.end(), [](
char&
c) { c = ::tolower(c); });
189 dataProcName +=
"-datareader-dpl";
190 LOG(info) << dataProcName;
195 adaptFromTask<FITDataReaderDPLSpec<RawReaderType>>(rawReader, matcherChMapCCDB, isSubSampled, updateCCDB),
203 o2::framework::ConfigParamSpec{
"disable-empty-tf-protection", VariantType::Bool,
false, {
"Disable empty TF protection. In case of empty payload within TF, only dummy ChannelData object will be sent."}}}};
A raw page parser for DPL input.
Type wrappers for enforcing a specific serialization method.
static const VerbosityConfig & Instance()
RawReaderType RawReader_t
ConcreteDataMatcher mMatcherChMapCCDB
FITDataReaderDPLSpec(const RawReaderType &rawReader, const ConcreteDataMatcher &matcherChMapCCDB, bool isSampledRawData, bool updateCCDB)
~FITDataReaderDPLSpec() override=default
void init(InitContext &ic) final
void finaliseCCDB(ConcreteDataMatcher &matcher, void *obj) final
void run(ProcessingContext &pc) final
FITDataReaderDPLSpec()=delete
The parser handles transparently input in the format of raw pages.
const_iterator end() const
const_iterator begin() const
virtual void stop()
This is invoked on stop.
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
constexpr o2::header::DataDescription gDataDescriptionRawData
framework::DataProcessorSpec getFITDataReaderDPLSpec(const RawReaderType &rawReader, bool askSTFDist, bool isSubSampled, bool disableDplCcdbFetcher)
Defining PrimaryVertex explicitly as messageable.
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
TFitResultPtr fit(const size_t nBins, const T *arr, const T xMin, const T xMax, TF1 &func, std::string_view option="")
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"