Project
Loading...
Searching...
No Matches
CMVToVectorSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15
16#include <limits>
17#include <memory>
18#include <stdexcept>
19#include <vector>
20#include <string>
21#include <fstream>
22#include <algorithm>
23#include <fmt/format.h>
24#include <fmt/chrono.h>
25
26#include "TFile.h"
28#include "Framework/Task.h"
30#include "Framework/Logger.h"
35#include "DPLUtils/RawParser.h"
36#include "Headers/DataHeader.h"
39
40#include "DataFormatsTPC/CMV.h"
42#include "TPCBase/RDHUtils.h"
43#include "TPCBase/Mapper.h"
45
46using namespace o2::framework;
50
51namespace o2::tpc
52{
53
55{
56 public:
58 CMVToVectorDevice(const std::vector<uint32_t>& crus) : mCRUs(crus) {}
59
61 {
62 // set up ADC value filling
63 mWriteDebug = ic.options().get<bool>("write-debug");
64 mWriteDebugOnError = ic.options().get<bool>("write-debug-on-error");
65 mWriteRawDataOnError = ic.options().get<bool>("write-raw-data-on-error");
66 mRawDataType = ic.options().get<int>("raw-data-type");
67 o2::framework::RawParser<>::setCheckIncompleteHBF(ic.options().get<bool>("check-incomplete-hbf"));
68
69 mDebugStreamFileName = ic.options().get<std::string>("debug-file-name").data();
70 mRawOutputFileName = ic.options().get<std::string>("raw-file-name").data();
71
72 initCMV();
73 }
74
76 {
77 const auto runNumber = processing_helpers::getRunNumber(pc);
78 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
79
80 // open files if necessary
81 if ((mWriteDebug || mWriteDebugOnError) && !mDebugStream) {
82 const auto debugFileName = fmt::format(fmt::runtime(mDebugStreamFileName), fmt::arg("run", runNumber));
83 LOGP(info, "Creating debug stream {}", debugFileName);
84 mDebugStream = std::make_unique<o2::utils::TreeStreamRedirector>(debugFileName.data(), "recreate");
85 }
86
87 if (mWriteRawDataOnError && !mRawOutputFile.is_open()) {
88 std::string_view rawType = (mRawDataType < 2) ? "tf" : "raw";
89 if (mRawDataType == 5) {
90 rawType = "cmv.raw";
91 }
92 const auto rawFileName = fmt::format(fmt::runtime(mRawOutputFileName), fmt::arg("run", runNumber), fmt::arg("raw_type", rawType));
93 LOGP(info, "Creating raw debug file {}", rawFileName);
94 mRawOutputFile.open(rawFileName, std::ios::binary);
95 }
96
97 uint32_t tfCounter = 0;
98 bool hasErrors = false;
99
100 for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
101 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
102 tfCounter = dh->tfCounter;
103 const auto subSpecification = dh->subSpecification;
104 auto payloadSize = DataRefUtils::getPayloadSize(ref);
105 LOGP(debug, "Processing TF {}, subSpecification {}, payloadSize {}", tfCounter, subSpecification, payloadSize);
106
107 // ---| data loop |---
108 const gsl::span<const char> raw = pc.inputs().get<gsl::span<char>>(ref);
109 try {
110 o2::framework::RawParser parser(raw.data(), raw.size());
111 size_t lastErrorCount = 0;
112
113 for (auto it = parser.begin(), end = parser.end(); it != end; ++it) {
114 const auto size = it.size();
115
116 if (parser.getNErrors() > lastErrorCount) {
117 lastErrorCount = parser.getNErrors();
118 hasErrors = true;
119 }
120
121 // skip empty packages (HBF open)
122 if (size == 0) {
123 continue;
124 }
125
126 auto rdhPtr = reinterpret_cast<const o2::header::RDHAny*>(it.raw());
127 const auto rdhVersion = RDHUtils::getVersion(rdhPtr);
128 if (!rdhPtr || rdhVersion < 6) {
129 throw std::runtime_error(fmt::format("could not get RDH from packet, or version {} < 6", rdhVersion).data());
130 }
131
132 // ---| extract hardware information to do the processing |---
133 const auto feeId = (FEEIDType)RDHUtils::getFEEID(*rdhPtr);
134 const auto link = rdh_utils::getLink(feeId);
135 const uint32_t cruID = rdh_utils::getCRU(feeId);
136 const auto detField = RDHUtils::getDetectorField(*rdhPtr);
137
138 LOGP(debug, "Detected CMV packet: CRU {}, link {}, feeId {}", cruID, link, feeId);
139
140 if ((detField != (decltype(detField))RawDataType::CMV) || (link != rdh_utils::CMVLinkID)) {
141 LOGP(debug, "Skipping packet: detField {}, (expected RawDataType {}), link {}, (expected CMVLinkID {})", detField, (decltype(detField))RawDataType::CMV, link, rdh_utils::CMVLinkID);
142 continue;
143 }
144
145 LOGP(debug, "Processing firstTForbit {:9}, tfCounter {:5}, run {:6}, feeId {:6}, cruID {:3}, link {:2}", dh->firstTForbit, dh->tfCounter, dh->runNumber, feeId, cruID, link);
146
147 if (std::find(mCRUs.begin(), mCRUs.end(), cruID) == mCRUs.end()) {
148 // LOGP(debug, "CMV CRU {:3} not configured in CRUs, skipping", cruID);
149 continue;
150 }
151
152 auto& cmvVec = mCMVvectors[cruID];
153 auto& infoVec = mCMVInfos[cruID];
154
155 if (size != sizeof(cmv::Container)) {
156 LOGP(warning, "CMV packet size mismatch: got {} bytes, expected {} bytes (sizeof cmv::Container). Skipping package.", size, sizeof(cmv::Container));
157 hasErrors = true;
158 continue;
159 }
160 auto data = it.data();
161 auto& cmvs = *((cmv::Container*)(data));
162 const uint32_t orbit = cmvs.header.heartbeatOrbit;
163 const uint16_t bc = cmvs.header.heartbeatBC;
164
165 // record packet meta and append its CMV vector (3564 TB)
166 infoVec.emplace_back(orbit, bc);
167 cmvVec.reserve(cmvVec.size() + cmv::NTimeBinsPerPacket);
168 for (uint32_t tb = 0; tb < cmv::NTimeBinsPerPacket; ++tb) {
169 cmvVec.push_back(cmvs.getCMV(tb));
170 // LOGP(debug, "For CRU {}, timebin {}, orbit {}, bc {}, appended CMV {} float: {}", cruID, tb, orbit, bc, cmvs.getCMV(tb), cmvs.getCMVFloat(tb));
171 }
172 }
173 } catch (const std::exception& e) {
174 // error message throttling
175 using namespace std::literals::chrono_literals;
176 static std::unordered_map<uint32_t, size_t> nErrorPerSubspec;
177 static std::chrono::time_point<std::chrono::steady_clock> lastReport = std::chrono::steady_clock::now();
178 const auto now = std::chrono::steady_clock::now();
179 static size_t reportedErrors = 0;
180 const size_t MAXERRORS = 10;
181 const auto sleepTime = 10min;
182 ++nErrorPerSubspec[subSpecification];
183
184 if ((now - lastReport) < sleepTime) {
185 if (reportedErrors < MAXERRORS) {
186 ++reportedErrors;
187 std::string sleepInfo;
188 if (reportedErrors == MAXERRORS) {
189 sleepInfo = fmt::format(", maximum error count ({}) reached, not reporting for the next {}", MAXERRORS, sleepTime);
190 }
191 LOGP(alarm, "EXCEPTION in processRawData: {} -> skipping part:{}/{} of spec:{}/{}/{}, size:{}, error count for subspec: {}{}", e.what(), dh->splitPayloadIndex, dh->splitPayloadParts,
192 dh->dataOrigin, dh->dataDescription, subSpecification, payloadSize, nErrorPerSubspec.at(subSpecification), sleepInfo);
193 lastReport = now;
194 }
195 } else {
196 lastReport = now;
197 reportedErrors = 0;
198 }
199 continue;
200 }
201 }
202
203 hasErrors |= snapshotCMVs(pc.outputs());
204
205 if (mWriteDebug || (mWriteDebugOnError && hasErrors)) {
206 writeDebugOutput(tfCounter);
207 }
208
209 if (mWriteRawDataOnError && hasErrors) {
210 writeRawData(pc.inputs());
211 }
212
213 // clear output
214 initCMV();
215 }
216
218 {
219 LOGP(info, "closeFiles");
220
221 if (mDebugStream) {
222 // set some default aliases
223 auto& stream = (*mDebugStream) << "cmvs";
224 auto& tree = stream.getTree();
225 tree.SetAlias("sector", "int(cru/10)");
226 mDebugStream->Close();
227 mDebugStream.reset(nullptr);
228 mRawOutputFile.close();
229 }
230 }
231
232 void stop() final
233 {
234 LOGP(info, "stop");
235 closeFiles();
236 }
237
239 {
240 LOGP(info, "endOfStream");
241 // ec.services().get<ControlService>().readyToQuit(QuitRequest::Me);
242 closeFiles();
243 }
244
245 private:
/// Heartbeat orbit / bunch-crossing stamp of one received CMV packet.
struct CMVInfo {
  CMVInfo() = default;
  CMVInfo(const CMVInfo&) = default;
  CMVInfo(uint32_t orbit, uint16_t bc) : heartbeatOrbit(orbit), heartbeatBC(bc) {}

  uint32_t heartbeatOrbit{0}; ///< heartbeat orbit of the packet
  uint16_t heartbeatBC{0};    ///< heartbeat bunch crossing of the packet

  /// Orbit-only comparison; the bunch crossing is not taken into account.
  bool operator==(const uint32_t orbit) const { return heartbeatOrbit == orbit; }

  /// Full comparison of orbit and bunch crossing.
  bool operator==(const CMVInfo& inf) const { return (inf.heartbeatOrbit == heartbeatOrbit) && (inf.heartbeatBC == heartbeatBC); }

  /// \return true if both the orbit and the bunch crossing agree with this entry.
  /// `bc` has the same unsigned type as the member: with a signed 16-bit parameter any value
  /// >= 32768 was converted to a negative number and could never compare equal.
  bool matches(uint32_t orbit, uint16_t bc) const { return (heartbeatOrbit == orbit) && (heartbeatBC == bc); }
};
259
260 int mRawDataType{0};
261 bool mWriteDebug{false};
262 bool mWriteDebugOnError{false};
263 bool mWriteRawDataOnError{false};
264 std::vector<uint32_t> mCRUs;
265 std::unordered_map<uint32_t, std::vector<uint16_t>> mCMVvectors;
266 std::unordered_map<uint32_t, std::vector<CMVInfo>> mCMVInfos;
267 std::string mDebugStreamFileName;
268 std::unique_ptr<o2::utils::TreeStreamRedirector> mDebugStream;
269 std::ofstream mRawOutputFile;
270 std::string mRawOutputFileName;
271
272 //____________________________________________________________________________
273 bool snapshotCMVs(DataAllocator& output)
274 {
275 bool hasErrors = false;
276
277 // send data per CRU with its own orbit/BC vector
278 for (auto& [cru, cmvVec] : mCMVvectors) {
279 const header::DataHeader::SubSpecificationType subSpec{cru << 7};
280 const auto& infVec = mCMVInfos[cru];
281
282 if (infVec.size() != 4) {
283 // LOGP(error, "CRU {:3}: expected 4 packets per TF, got {}", cru, infVec.size());
284 hasErrors = true;
285 }
286 if (cmvVec.size() != cmv::NTimeBinsPerPacket * infVec.size()) {
287 // LOGP(error, "CRU {:3}: vector size {} does not match expected {}", cru, cmvVec.size(), cmv::NTimeBinsPerPacket * infVec.size());
288 hasErrors = true;
289 }
290
291 std::vector<uint64_t> orbitBCInfo;
292 orbitBCInfo.reserve(infVec.size());
293 for (const auto& inf : infVec) {
294 orbitBCInfo.emplace_back((uint64_t(inf.heartbeatOrbit) << 32) + uint64_t(inf.heartbeatBC));
295 }
296
297 LOGP(debug, "Sending CMVs for CRU {} of size {} ({} packets)", cru, cmvVec.size(), infVec.size());
298 output.snapshot(Output{gDataOriginTPC, "CMVVECTOR", subSpec}, cmvVec);
299 output.snapshot(Output{gDataOriginTPC, "CMVORBITS", subSpec}, orbitBCInfo);
300 }
301
302 return hasErrors;
303 }
304
305 //____________________________________________________________________________
306 void initCMV()
307 {
308 for (const auto cruID : mCRUs) {
309 auto& cmvVec = mCMVvectors[cruID];
310 cmvVec.clear();
311
312 auto& infosCRU = mCMVInfos[cruID];
313 infosCRU.clear();
314 }
315 }
316
317 //____________________________________________________________________________
318 void writeDebugOutput(uint32_t tfCounter)
319 {
320 mDebugStream->GetFile()->cd();
321 auto& stream = (*mDebugStream) << "cmvs";
322
323 for (auto cru : mCRUs) {
324 if (mCMVInfos.find(cru) == mCMVInfos.end()) {
325 continue;
326 }
327
328 auto& infos = mCMVInfos[cru];
329 auto& cmvVec = mCMVvectors[cru];
330
331 stream << "cru=" << cru
332 << "tfCounter=" << tfCounter
333 << "nCMVs=" << cmvVec.size()
334 << "cmvs=" << cmvVec
335 << "\n";
336 }
337 }
338
339 void writeRawData(InputRecord& inputs)
340 {
341 if (!mRawOutputFile.is_open()) {
342 return;
343 }
344
346
347 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
348 for (auto const& ref : InputRecordWalker(inputs, filter)) {
349 auto dh = DataRefUtils::getHeader<header::DataHeader*>(ref);
350 // LOGP(info, "write header: {}/{}/{}, payload size: {} / {}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dh->payloadSize, ref.payloadSize);
351 if (((mRawDataType == 1) || (mRawDataType == 3)) && (dh->payloadSize == 2 * sizeof(o2::header::RAWDataHeader))) {
352 continue;
353 }
354
355 if (mRawDataType < 2) {
356 mRawOutputFile.write(ref.header, sizeof(DataHeader));
357 }
358 if (mRawDataType < 5) {
359 mRawOutputFile.write(ref.payload, ref.payloadSize);
360 }
361
362 if (mRawDataType == 5) {
363 const gsl::span<const char> raw = inputs.get<gsl::span<char>>(ref);
364 try {
365 o2::framework::RawParser parser(raw.data(), raw.size());
366 for (auto it = parser.begin(), end = parser.end(); it != end; ++it) {
367 const auto size = it.size();
368 // skip empty packages (HBF open)
369 if (size == 0) {
370 continue;
371 }
372
373 auto rdhPtr = reinterpret_cast<const o2::header::RDHAny*>(it.raw());
374 const auto rdhVersion = RDHUtils::getVersion(rdhPtr);
375 if (!rdhPtr || rdhVersion < 6) {
376 throw std::runtime_error(fmt::format("could not get RDH from packet, or version {} < 6", rdhVersion).data());
377 }
378
379 // ---| extract hardware information to do the processing |---
380 const auto feeId = (FEEIDType)RDHUtils::getFEEID(*rdhPtr);
381 const auto link = rdh_utils::getLink(feeId);
382 const auto detField = RDHUtils::getDetectorField(*rdhPtr);
383
384 // only select CMVs
385 if ((detField != (decltype(detField))RawDataType::CMV) || (link != rdh_utils::CMVLinkID)) {
386 continue;
387 }
388
389 // write out raw data
390 mRawOutputFile.write((const char*)it.raw(), RDHUtils::getMemorySize(rdhPtr));
391 }
392 } catch (...) {
393 }
394 }
395 }
396 }
397};
398
399o2::framework::DataProcessorSpec getCMVToVectorSpec(std::string const& inputSpec, std::vector<uint32_t> const& crus)
400{
401 using device = o2::tpc::CMVToVectorDevice;
402
403 std::vector<OutputSpec> outputs;
404 for (const uint32_t cru : crus) {
405 const header::DataHeader::SubSpecificationType subSpec{cru << 7};
406 outputs.emplace_back(gDataOriginTPC, "CMVVECTOR", subSpec, Lifetime::Timeframe);
407 outputs.emplace_back(gDataOriginTPC, "CMVORBITS", subSpec, Lifetime::Timeframe);
408 }
409
410 return DataProcessorSpec{
411 fmt::format("tpc-cmv-to-vector"),
412 select(inputSpec.data()),
413 outputs,
414 AlgorithmSpec{adaptFromTask<device>(crus)},
415 Options{
416 {"write-debug", VariantType::Bool, false, {"write a debug output tree"}},
417 {"write-debug-on-error", VariantType::Bool, false, {"write a debug output tree in case errors occurred"}},
418 {"debug-file-name", VariantType::String, "/tmp/cmv_vector_debug.{run}.root", {"name of the debug output file"}},
419 {"write-raw-data-on-error", VariantType::Bool, false, {"dump raw data in case errors occurred"}},
420 {"raw-file-name", VariantType::String, "/tmp/cmv_debug.{run}.{raw_type}", {"name of the raw output file"}},
421 {"raw-data-type", VariantType::Int, 0, {"Which raw data to dump: 0-full TPC with DH, 1-full TPC with DH skip empty, 2-full TPC no DH, 3-full TPC no DH skip empty, 4-IDC raw only 5-CMV raw only"}},
422 {"check-incomplete-hbf", VariantType::Bool, false, {"false: don't check; true: check and report"}},
423 } // end Options
424 }; // end DataProcessorSpec
425}
426} // namespace o2::tpc
Common mode values data format definition.
std::ostringstream debug
uint64_t orbit
Definition RawEventData.h:6
uint64_t bc
Definition RawEventData.h:5
o2::raw::RawFileWriter * raw
A helper class to iterate over all parts of all input routes.
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
Generic parser for consecutive raw pages.
A helper class to iterate over all parts of all input routes.
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
decltype(auto) get(R binding, int part=0) const
size_t getNErrors() const
Definition RawParser.h:642
const_iterator begin() const
Definition RawParser.h:618
const_iterator end() const
Definition RawParser.h:623
static void setCheckIncompleteHBF(bool v)
Definition RawParser.h:647
rdh_utils::FEEIDType FEEIDType
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void stop() final
This is invoked on stop.
void run(o2::framework::ProcessingContext &pc) final
CMVToVectorDevice(const std::vector< uint32_t > &crus)
void init(o2::framework::InitContext &ic) final
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLboolean * data
Definition glcorearb.h:298
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
GLuint GLuint stream
Definition glcorearb.h:1806
GLint ref
Definition glcorearb.h:291
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > select(char const *matcher="")
bool operator==(const DsChannelId &a, const DsChannelId &b)
Definition DsChannelId.h:65
uint64_t getRunNumber(o2::framework::ProcessingContext &pc)
uint16_t FEEIDType
Definition RDHUtils.h:26
Global TPC definitions and constants.
Definition SimTraits.h:168
o2::framework::DataProcessorSpec getCMVToVectorSpec(std::string const &inputSpec, std::vector< uint32_t > const &crus)
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
the main header struct
Definition DataHeader.h:620
uint32_t SubSpecificationType
Definition DataHeader.h:622
static constexpr int getVersion()
get numeric version of the RDH
Definition RDHUtils.h:60
constexpr size_t min
const int sleepTime
Definition test_Fifo.cxx:28
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))