Project
Loading...
Searching...
No Matches
FITDataReaderDPLSpec.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#ifndef O2_FITDATAREADERDPLSPEC_H
15#define O2_FITDATAREADERDPLSPEC_H
17#include "Framework/Task.h"
22#include "Framework/Lifetime.h"
23#include "Framework/Output.h"
29#include <string>
30#include <iostream>
31#include <vector>
32#include <gsl/span>
33#include <chrono>
35
36using namespace o2::framework;
37
38namespace o2
39{
40namespace fit
41{
42template <typename RawReaderType>
44{
45 public:
46 FITDataReaderDPLSpec(const RawReaderType& rawReader, const ConcreteDataMatcher& matcherChMapCCDB, bool isSampledRawData, bool updateCCDB) : mRawReader(rawReader), mMatcherChMapCCDB(matcherChMapCCDB), mIsSampledRawData(isSampledRawData), mUpdateCCDB(updateCCDB) {}
48 ~FITDataReaderDPLSpec() override = default;
49 typedef RawReaderType RawReader_t;
51 ConcreteDataMatcher mMatcherChMapCCDB; // matcher for Channel map(LUT) from CCDB
53 bool mUpdateCCDB{true};
54 bool mDumpMetrics{false};
55 void init(InitContext& ic) final
56 {
57 auto ccdbUrl = ic.options().get<std::string>("ccdb-path");
58 auto lutPath = ic.options().get<std::string>("lut-path");
59 mDumpMetrics = ic.options().get<bool>("dump-raw-data-metric");
60 if (!ic.options().get<bool>("disable-empty-tf-protection")) {
61 mRawReader.enableEmptyTFprotection();
62 }
63 if (ccdbUrl != "") {
64 RawReader_t::LookupTable_t::setCCDBurl(ccdbUrl);
65 }
66 if (lutPath != "") {
67 RawReader_t::LookupTable_t::setLUTpath(lutPath);
68 }
69 if (!mUpdateCCDB) {
70 RawReader_t::LookupTable_t::Instance().printFullMap();
71 }
72 const auto nReserveVecDig = ic.options().get<int>("reserve-vec-dig");
73 const auto nReserveVecChData = ic.options().get<int>("reserve-vec-chdata");
74 const auto nReserveVecBuffer = ic.options().get<int>("reserve-vec-buffer");
75 const auto nReserveMapDig = ic.options().get<int>("reserve-map-dig");
76 if (nReserveVecDig || nReserveVecChData) {
77 mRawReader.reserveVecDPL(nReserveVecDig, nReserveVecChData);
78 }
79 if (nReserveVecBuffer || nReserveMapDig) {
80 mRawReader.reserve(nReserveVecBuffer, nReserveMapDig);
81 }
82 }
83 void run(ProcessingContext& pc) final
84 {
85 // if we see requested data type input with 0xDEADBEEF subspec and 0 payload this means that the "delayed message"
86 // mechanism created it in absence of real data from upstream. Processor should send empty output to not block the workflow
87 if (!mIsSampledRawData) { // do not check 0xDEADBEEF if raw data is sampled
88 static size_t contDeadBeef = 0; // number of times 0xDEADBEEF was seen continuously
89 std::vector<InputSpec> dummy{InputSpec{"dummy", ConcreteDataMatcher{mRawReader.mDataOrigin, o2::header::gDataDescriptionRawData, 0xDEADBEEF}}};
90 for (const auto& ref : InputRecordWalker(pc.inputs(), dummy)) {
91 const auto dh = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
92 auto payloadSize = DataRefUtils::getPayloadSize(ref);
93 if (payloadSize == 0) {
95 if (++contDeadBeef <= maxWarn) {
96 LOGP(alarm, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF{}",
97 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, payloadSize,
98 contDeadBeef == maxWarn ? fmt::format(". {} such inputs in row received, stopping reporting", contDeadBeef) : "");
99 }
100 mRawReader.makeSnapshot(pc); // send empty output
101 return;
102 }
103 }
104 contDeadBeef = 0; // if good data, reset the counter
105 }
106 std::vector<InputSpec> filter{InputSpec{"filter", ConcreteDataTypeMatcher{mRawReader.mDataOrigin, mIsSampledRawData ? "SUB_RAWDATA" : o2::header::gDataDescriptionRawData}, Lifetime::Timeframe}};
107 DPLRawParser parser(pc.inputs(), filter);
108 std::size_t cntDF0{0}; // number of pages with DataFormat=0, padded
109 std::size_t cntDF2{0}; // number of pages with DataFormat=2, no padding
110 std::size_t cntDF_unknown{0}; // number of pages with unknown DataFormat
111 auto start = std::chrono::high_resolution_clock::now();
112 if (mUpdateCCDB) {
113 pc.inputs().get<typename RawReader_t::LookupTable_t::Table_t*>("channel_map");
114 mUpdateCCDB = false;
115 auto stop = std::chrono::high_resolution_clock::now();
116 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(stop - start);
117 LOG(debug) << "Channel map upload delay: " << duration.count();
118 }
119 for (auto it = parser.begin(), end = parser.end(); it != end; ++it) {
120 const o2::header::RDHAny* rdhPtr = nullptr;
121 // Proccessing each page
122 try {
123 rdhPtr = reinterpret_cast<const o2::header::RDHAny*>(it.raw());
124 gsl::span<const uint8_t> payload(it.data(), it.size());
125 const auto rdhDataFormat = o2::raw::RDHUtils::getDataFormat(rdhPtr);
126 if (rdhDataFormat == 0) { // padded
127 cntDF0++;
128 mRawReader.process(true, payload, o2::raw::RDHUtils::getFEEID(rdhPtr), o2::raw::RDHUtils::getLinkID(rdhPtr), o2::raw::RDHUtils::getEndPointID(rdhPtr));
129 } else if (rdhDataFormat == 2) { // no padding
130 cntDF2++;
131 mRawReader.process(false, payload, o2::raw::RDHUtils::getFEEID(rdhPtr), o2::raw::RDHUtils::getLinkID(rdhPtr), o2::raw::RDHUtils::getEndPointID(rdhPtr));
132 } else {
133 cntDF_unknown++;
134 continue; // or break?
135 }
136 } catch (std::exception& e) {
137 LOG(error) << "Failed to extract RDH, abandoning TF sending dummy output, exception was: " << e.what();
138 mRawReader.makeSnapshot(pc); // send empty output
139 return;
140 }
141 }
142 mRawReader.accumulateDigits();
143 mRawReader.emptyTFprotection();
144 mRawReader.makeSnapshot(pc);
145 if (mDumpMetrics) {
146 mRawReader.dumpRawDataMetrics();
147 }
148 mRawReader.clear();
149 if ((cntDF0 > 0 && cntDF2 > 0) || cntDF_unknown > 0) {
150 LOG(error) << "Strange RDH::dataFormat in TF. Number of pages: DF=0 - " << cntDF0 << " , DF=2 - " << cntDF2 << " , DF=unknown - " << cntDF_unknown;
151 }
152 auto stop = std::chrono::high_resolution_clock::now();
153 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(stop - start);
154 LOG(debug) << "TF delay: " << duration.count();
155 }
156 void finaliseCCDB(ConcreteDataMatcher& matcher, void* obj) final
157 {
158 LOG(info) << "finaliseCCDB";
159 if (matcher == mMatcherChMapCCDB) {
160 LOG(debug) << "Channel map is updated";
161 RawReader_t::LookupTable_t::Instance((const typename RawReader_t::LookupTable_t::Table_t*)obj);
162 return;
163 }
164 }
165};
166
167template <typename RawReaderType>
168framework::DataProcessorSpec getFITDataReaderDPLSpec(const RawReaderType& rawReader, bool askSTFDist, bool isSubSampled, bool disableDplCcdbFetcher)
169{
170 std::vector<OutputSpec> outputSpec;
171 std::vector<InputSpec> inputSpec{};
172 ConcreteDataMatcher matcherChMapCCDB{rawReader.mDataOrigin, RawReaderType::LookupTable_t::sObjectName, 0};
173 rawReader.configureOutputSpec(outputSpec);
174 if (isSubSampled) {
175 inputSpec.push_back({"STF", ConcreteDataTypeMatcher{rawReader.mDataOrigin, "SUB_RAWDATA"}, Lifetime::Sporadic}); // in case if one need to use DataSampler
176 askSTFDist = false;
177 } else {
178 inputSpec.push_back({"STF", ConcreteDataTypeMatcher{rawReader.mDataOrigin, "RAWDATA"}, Lifetime::Timeframe});
179 }
180 if (askSTFDist) {
181 inputSpec.emplace_back("STFDist", "FLP", "DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
182 }
183 const bool updateCCDB = !disableDplCcdbFetcher;
184 if (updateCCDB) {
185 inputSpec.emplace_back("channel_map", matcherChMapCCDB, Lifetime::Condition, ccdbParamSpec(RawReaderType::LookupTable_t::sDefaultLUTpath));
186 }
187 std::string dataProcName = rawReader.mDataOrigin.template as<std::string>();
188 std::for_each(dataProcName.begin(), dataProcName.end(), [](char& c) { c = ::tolower(c); });
189 dataProcName += "-datareader-dpl";
190 LOG(info) << dataProcName;
191 return DataProcessorSpec{
192 dataProcName,
193 inputSpec,
194 outputSpec,
195 adaptFromTask<FITDataReaderDPLSpec<RawReaderType>>(rawReader, matcherChMapCCDB, isSubSampled, updateCCDB),
196 {o2::framework::ConfigParamSpec{"ccdb-path", VariantType::String, "", {"CCDB url which contains LookupTable"}},
197 o2::framework::ConfigParamSpec{"lut-path", VariantType::String, "", {"LookupTable path, e.g. FT0/LookupTable"}},
198 o2::framework::ConfigParamSpec{"reserve-vec-dig", VariantType::Int, 0, {"Reserve memory for Digit vector, to DPL channel"}},
199 o2::framework::ConfigParamSpec{"reserve-vec-chdata", VariantType::Int, 0, {"Reserve memory for ChannelData vector, to DPL channel"}},
200 o2::framework::ConfigParamSpec{"reserve-vec-buffer", VariantType::Int, 0, {"Reserve memory for DataBlock vector, buffer for each page"}},
201 o2::framework::ConfigParamSpec{"reserve-map-dig", VariantType::Int, 0, {"Reserve memory for Digit map, mapping in RawReader"}},
202 o2::framework::ConfigParamSpec{"dump-raw-data-metric", VariantType::Bool, false, {"Dump raw data metrics, for debugging"}},
203 o2::framework::ConfigParamSpec{"disable-empty-tf-protection", VariantType::Bool, false, {"Disable empty TF protection. In case of empty payload within TF, only dummy ChannelData object will be sent."}}}};
204}
205
206} // namespace fit
207} // namespace o2
208
209#endif /* O2_FITDATAREADERDPLSPEC_H */
A raw page parser for DPL input.
A helper class to iterate over all parts of all input routes.
uint32_t c
Definition RawData.h:2
Type wrappers for enforcing a specific serialization method.
std::ostringstream debug
FITDataReaderDPLSpec(const RawReaderType &rawReader, const ConcreteDataMatcher &matcherChMapCCDB, bool isSampledRawData, bool updateCCDB)
~FITDataReaderDPLSpec() override=default
void init(InitContext &ic) final
void finaliseCCDB(ConcreteDataMatcher &matcher, void *obj) final
void run(ProcessingContext &pc) final
The parser handles transparently input in the format of raw pages.
const_iterator end() const
const_iterator begin() const
A helper class to iterate over all parts of all input routes.
virtual void stop()
This is invoked on stop.
Definition Task.h:53
GLuint GLuint end
Definition glcorearb.h:469
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
GLuint start
Definition glcorearb.h:469
constexpr o2::header::DataDescription gDataDescriptionRawData
Definition DataHeader.h:597
framework::DataProcessorSpec getFITDataReaderDPLSpec(const RawReaderType &rawReader, bool askSTFDist, bool isSubSampled, bool disableDplCcdbFetcher)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
TFitResultPtr fit(const size_t nBins, const T *arr, const T xMin, const T xMax, TF1 &func, std::string_view option="")
Definition fit.h:59
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
std::string ccdbUrl
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"