Project
Loading...
Searching...
No Matches
CMVToVectorSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15
16#include <limits>
17#include <memory>
18#include <stdexcept>
19#include <vector>
20#include <string>
21#include <fstream>
22#include <algorithm>
23#include <fmt/format.h>
24#include <fmt/chrono.h>
25
26#include "TFile.h"
28#include "Framework/Task.h"
30#include "Framework/Logger.h"
35#include "DPLUtils/RawParser.h"
36#include "Headers/DataHeader.h"
39
40#include "DataFormatsTPC/CMV.h"
42#include "TPCBase/RDHUtils.h"
43#include "TPCBase/Mapper.h"
45
46using namespace o2::framework;
50
51namespace o2::tpc
52{
53
55{
56 public:
58 CMVToVectorDevice(const std::vector<uint32_t>& crus) : mCRUs(crus) {}
59
61 {
62 // set up ADC value filling
63 mWriteDebug = ic.options().get<bool>("write-debug");
64 mWriteDebugOnError = ic.options().get<bool>("write-debug-on-error");
65 mWriteRawDataOnError = ic.options().get<bool>("write-raw-data-on-error");
66 mRawDataType = ic.options().get<int>("raw-data-type");
67 o2::framework::RawParser<>::setCheckIncompleteHBF(ic.options().get<bool>("check-incomplete-hbf"));
68
69 mDebugStreamFileName = ic.options().get<std::string>("debug-file-name").data();
70 mRawOutputFileName = ic.options().get<std::string>("raw-file-name").data();
71
72 initCMV();
73 }
74
76 {
77 const auto runNumber = processing_helpers::getRunNumber(pc);
78 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
79 const auto& mapper = Mapper::instance();
80
81 // open files if necessary
82 if ((mWriteDebug || mWriteDebugOnError) && !mDebugStream) {
83 const auto debugFileName = fmt::format(fmt::runtime(mDebugStreamFileName), fmt::arg("run", runNumber));
84 LOGP(info, "Creating debug stream {}", debugFileName);
85 mDebugStream = std::make_unique<o2::utils::TreeStreamRedirector>(debugFileName.data(), "recreate");
86 }
87
88 if (mWriteRawDataOnError && !mRawOutputFile.is_open()) {
89 std::string_view rawType = (mRawDataType < 2) ? "tf" : "raw";
90 if (mRawDataType == 5) {
91 rawType = "cmv.raw";
92 }
93 const auto rawFileName = fmt::format(fmt::runtime(mRawOutputFileName), fmt::arg("run", runNumber), fmt::arg("raw_type", rawType));
94 LOGP(info, "Creating raw debug file {}", rawFileName);
95 mRawOutputFile.open(rawFileName, std::ios::binary);
96 }
97
98 uint32_t heartbeatOrbit = 0;
99 uint16_t heartbeatBC = 0;
100 uint32_t tfCounter = 0;
101 bool first = true;
102 bool hasErrors = false;
103
104 for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
105 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
106 tfCounter = dh->tfCounter;
107 const auto subSpecification = dh->subSpecification;
108 auto payloadSize = DataRefUtils::getPayloadSize(ref);
109 LOGP(debug, "Processing TF {}, subSpecification {}, payloadSize {}", tfCounter, subSpecification, payloadSize);
110
111 // ---| data loop |---
112 const gsl::span<const char> raw = pc.inputs().get<gsl::span<char>>(ref);
113 try {
114 o2::framework::RawParser parser(raw.data(), raw.size());
115 size_t lastErrorCount = 0;
116
117 for (auto it = parser.begin(), end = parser.end(); it != end; ++it) {
118 const auto size = it.size();
119
120 if (parser.getNErrors() > lastErrorCount) {
121 lastErrorCount = parser.getNErrors();
122 hasErrors = true;
123 }
124
125 // skip empty packages (HBF open)
126 if (size == 0) {
127 continue;
128 }
129
130 auto rdhPtr = reinterpret_cast<const o2::header::RDHAny*>(it.raw());
131 const auto rdhVersion = RDHUtils::getVersion(rdhPtr);
132 if (!rdhPtr || rdhVersion < 6) {
133 throw std::runtime_error(fmt::format("could not get RDH from packet, or version {} < 6", rdhVersion).data());
134 }
135
136 // ---| extract hardware information to do the processing |---
137 const auto feeId = (FEEIDType)RDHUtils::getFEEID(*rdhPtr);
138 const auto link = rdh_utils::getLink(feeId);
139 const uint32_t cruID = rdh_utils::getCRU(feeId);
140 const auto detField = RDHUtils::getDetectorField(*rdhPtr);
141
142 LOGP(debug, "Detected CMV packet: CRU {}, link {}, feeId {}", cruID, link, feeId);
143
144 if ((detField != (decltype(detField))RawDataType::CMV) || (link != rdh_utils::CMVLinkID)) {
145 LOGP(debug, "Skipping packet: detField {}, (expected RawDataType {}), link {}, (expected CMVLinkID {})", detField, (decltype(detField))RawDataType::CMV, link, rdh_utils::CMVLinkID);
146 continue;
147 }
148
149 LOGP(debug, "Processing firstTForbit {:9}, tfCounter {:5}, run {:6}, feeId {:6}, cruID {:3}, link {:2}", dh->firstTForbit, dh->tfCounter, dh->runNumber, feeId, cruID, link);
150
151 if (std::find(mCRUs.begin(), mCRUs.end(), cruID) == mCRUs.end()) {
152 LOGP(warning, "CMV CRU {:3} not configured in CRUs, skipping", cruID);
153 continue;
154 }
155
156 auto& cmvVec = mCMVvectors[cruID];
157 auto& infoVec = mCMVInfos[cruID];
158
159 if (size != sizeof(cmv::Container)) {
160 LOGP(warning, "CMV packet size mismatch: got {} bytes, expected {} bytes (sizeof cmv::Container). Skipping package.", size, sizeof(cmv::Container));
161 hasErrors = true;
162 continue;
163 }
164 auto data = it.data();
165 auto& cmvs = *((cmv::Container*)(data));
166 const uint32_t orbit = cmvs.header.heartbeatOrbit;
167 const uint16_t bc = cmvs.header.heartbeatBC;
168
169 // record packet meta and append its CMV vector (3564 TB)
170 infoVec.emplace_back(orbit, bc);
171 cmvVec.reserve(cmvVec.size() + cmv::NTimeBinsPerPacket);
172 for (uint32_t tb = 0; tb < cmv::NTimeBinsPerPacket; ++tb) {
173 cmvVec.push_back(cmvs.getCMV(tb));
174 // LOGP(debug, "Appended CMV {} for timebin {}, CRU {}, orbit {}, bc {}", cmvs.getCMV(tb), tb, cruID, orbit, bc);
175 }
176 }
177 } catch (const std::exception& e) {
178 // error message throtteling
179 using namespace std::literals::chrono_literals;
180 static std::unordered_map<uint32_t, size_t> nErrorPerSubspec;
181 static std::chrono::time_point<std::chrono::steady_clock> lastReport = std::chrono::steady_clock::now();
182 const auto now = std::chrono::steady_clock::now();
183 static size_t reportedErrors = 0;
184 const size_t MAXERRORS = 10;
185 const auto sleepTime = 10min;
186 ++nErrorPerSubspec[subSpecification];
187
188 if ((now - lastReport) < sleepTime) {
189 if (reportedErrors < MAXERRORS) {
190 ++reportedErrors;
191 std::string sleepInfo;
192 if (reportedErrors == MAXERRORS) {
193 sleepInfo = fmt::format(", maximum error count ({}) reached, not reporting for the next {}", MAXERRORS, sleepTime);
194 }
195 LOGP(alarm, "EXCEPTION in processRawData: {} -> skipping part:{}/{} of spec:{}/{}/{}, size:{}, error count for subspec: {}{}", e.what(), dh->splitPayloadIndex, dh->splitPayloadParts,
196 dh->dataOrigin, dh->dataDescription, subSpecification, payloadSize, nErrorPerSubspec.at(subSpecification), sleepInfo);
197 lastReport = now;
198 }
199 } else {
200 lastReport = now;
201 reportedErrors = 0;
202 }
203 continue;
204 }
205 }
206
207 hasErrors |= snapshotCMVs(pc.outputs(), tfCounter);
208
209 if (mWriteDebug || (mWriteDebugOnError && hasErrors)) {
210 writeDebugOutput(tfCounter);
211 }
212
213 if (mWriteRawDataOnError && hasErrors) {
214 writeRawData(pc.inputs());
215 }
216
217 // clear output
218 initCMV();
219 }
220
222 {
223 LOGP(info, "closeFiles");
224
225 if (mDebugStream) {
226 // set some default aliases
227 auto& stream = (*mDebugStream) << "cmvs";
228 auto& tree = stream.getTree();
229 tree.SetAlias("sector", "int(cru/10)");
230 mDebugStream->Close();
231 mDebugStream.reset(nullptr);
232 mRawOutputFile.close();
233 }
234 }
235
236 void stop() final
237 {
238 LOGP(info, "stop");
239 closeFiles();
240 }
241
243 {
244 LOGP(info, "endOfStream");
245 // ec.services().get<ControlService>().readyToQuit(QuitRequest::Me);
246 closeFiles();
247 }
248
249 private:
251 struct CMVInfo {
252 CMVInfo() = default;
253 CMVInfo(const CMVInfo&) = default;
254 CMVInfo(uint32_t orbit, uint16_t bc) : heartbeatOrbit(orbit), heartbeatBC(bc) {}
255
256 uint32_t heartbeatOrbit{0};
257 uint16_t heartbeatBC{0};
258
259 bool operator==(const uint32_t orbit) const { return (heartbeatOrbit == orbit); }
260 bool operator==(const CMVInfo& inf) const { return (inf.heartbeatOrbit == heartbeatOrbit) && (inf.heartbeatBC == heartbeatBC); }
261 bool matches(uint32_t orbit, int16_t bc) const { return ((heartbeatOrbit == orbit) && (heartbeatBC == bc)); }
262 };
263
264 int mRawDataType{0};
265 bool mWriteDebug{false};
266 bool mWriteDebugOnError{false};
267 bool mWriteRawDataOnError{false};
268 std::vector<uint32_t> mCRUs;
269 std::unordered_map<uint32_t, std::vector<uint16_t>> mCMVvectors;
270 std::unordered_map<uint32_t, std::vector<CMVInfo>> mCMVInfos;
271 std::string mDebugStreamFileName;
272 std::unique_ptr<o2::utils::TreeStreamRedirector> mDebugStream;
273 std::ofstream mRawOutputFile;
274 std::string mRawOutputFileName;
275
276 //____________________________________________________________________________
277 bool snapshotCMVs(DataAllocator& output, uint32_t tfCounter)
278 {
279 bool hasErrors = false;
280
281 // send data per CRU with its own orbit/BC vector
282 for (auto& [cru, cmvVec] : mCMVvectors) {
283 const header::DataHeader::SubSpecificationType subSpec{cru << 7};
284 const auto& infVec = mCMVInfos[cru];
285
286 if (infVec.size() != 4) {
287 // LOGP(error, "CRU {:3}: expected 4 packets per TF, got {}", cru, infVec.size());
288 hasErrors = true;
289 }
290 if (cmvVec.size() != cmv::NTimeBinsPerPacket * infVec.size()) {
291 // LOGP(error, "CRU {:3}: vector size {} does not match expected {}", cru, cmvVec.size(), cmv::NTimeBinsPerPacket * infVec.size());
292 hasErrors = true;
293 }
294
295 std::vector<uint64_t> orbitBCInfo;
296 orbitBCInfo.reserve(infVec.size());
297 for (const auto& inf : infVec) {
298 orbitBCInfo.emplace_back((uint64_t(inf.heartbeatOrbit) << 32) + uint64_t(inf.heartbeatBC));
299 }
300
301 LOGP(debug, "Sending CMVs for CRU {} of size {} ({} packets)", cru, cmvVec.size(), infVec.size());
302 output.snapshot(Output{gDataOriginTPC, "CMVVECTOR", subSpec}, cmvVec);
303 output.snapshot(Output{gDataOriginTPC, "CMVORBITS", subSpec}, orbitBCInfo);
304 }
305
306 return hasErrors;
307 }
308
309 //____________________________________________________________________________
310 void initCMV()
311 {
312 for (const auto cruID : mCRUs) {
313 auto& cmvVec = mCMVvectors[cruID];
314 cmvVec.clear();
315
316 auto& infosCRU = mCMVInfos[cruID];
317 infosCRU.clear();
318 }
319 }
320
321 //____________________________________________________________________________
322 void writeDebugOutput(uint32_t tfCounter)
323 {
324 const auto& mapper = Mapper::instance();
325
326 mDebugStream->GetFile()->cd();
327 auto& stream = (*mDebugStream) << "cmvs";
328 uint32_t seen = 0;
329 static uint32_t firstOrbit = std::numeric_limits<uint32_t>::max();
330
331 for (auto cru : mCRUs) {
332 if (mCMVInfos.find(cru) == mCMVInfos.end()) {
333 continue;
334 }
335
336 auto& infos = mCMVInfos[cru];
337 auto& cmvVec = mCMVvectors[cru];
338
339 stream << "cru=" << cru
340 << "tfCounter=" << tfCounter
341 << "nCMVs=" << cmvVec.size()
342 << "cmvs=" << cmvVec
343 << "\n";
344 }
345 }
346
347 void writeRawData(InputRecord& inputs)
348 {
349 if (!mRawOutputFile.is_open()) {
350 return;
351 }
352
354
355 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
356 for (auto const& ref : InputRecordWalker(inputs, filter)) {
357 auto dh = DataRefUtils::getHeader<header::DataHeader*>(ref);
358 // LOGP(info, "write header: {}/{}/{}, payload size: {} / {}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dh->payloadSize, ref.payloadSize);
359 if (((mRawDataType == 1) || (mRawDataType == 3)) && (dh->payloadSize == 2 * sizeof(o2::header::RAWDataHeader))) {
360 continue;
361 }
362
363 if (mRawDataType < 2) {
364 mRawOutputFile.write(ref.header, sizeof(DataHeader));
365 }
366 if (mRawDataType < 5) {
367 mRawOutputFile.write(ref.payload, ref.payloadSize);
368 }
369
370 if (mRawDataType == 5) {
371 const gsl::span<const char> raw = inputs.get<gsl::span<char>>(ref);
372 try {
373 o2::framework::RawParser parser(raw.data(), raw.size());
374 for (auto it = parser.begin(), end = parser.end(); it != end; ++it) {
375 const auto size = it.size();
376 // skip empty packages (HBF open)
377 if (size == 0) {
378 continue;
379 }
380
381 auto rdhPtr = reinterpret_cast<const o2::header::RDHAny*>(it.raw());
382 const auto rdhVersion = RDHUtils::getVersion(rdhPtr);
383 if (!rdhPtr || rdhVersion < 6) {
384 throw std::runtime_error(fmt::format("could not get RDH from packet, or version {} < 6", rdhVersion).data());
385 }
386
387 // ---| extract hardware information to do the processing |---
388 const auto feeId = (FEEIDType)RDHUtils::getFEEID(*rdhPtr);
389 const auto link = rdh_utils::getLink(feeId);
390 const auto detField = RDHUtils::getDetectorField(*rdhPtr);
391
392 // only select CMVs
393 if ((detField != (decltype(detField))RawDataType::CMV) || (link != rdh_utils::CMVLinkID)) {
394 continue;
395 }
396
397 // write out raw data
398 mRawOutputFile.write((const char*)it.raw(), RDHUtils::getMemorySize(rdhPtr));
399 }
400 } catch (...) {
401 }
402 }
403 }
404 }
405};
406
407o2::framework::DataProcessorSpec getCMVToVectorSpec(const std::string inputSpec, std::vector<uint32_t> const& crus)
408{
409 using device = o2::tpc::CMVToVectorDevice;
410
411 std::vector<OutputSpec> outputs;
412 for (const uint32_t cru : crus) {
413 const header::DataHeader::SubSpecificationType subSpec{cru << 7};
414 outputs.emplace_back(gDataOriginTPC, "CMVVECTOR", subSpec, Lifetime::Timeframe);
415 outputs.emplace_back(gDataOriginTPC, "CMVORBITS", subSpec, Lifetime::Timeframe);
416 }
417
418 return DataProcessorSpec{
419 fmt::format("tpc-cmv-to-vector"),
420 select(inputSpec.data()),
421 outputs,
422 AlgorithmSpec{adaptFromTask<device>(crus)},
423 Options{
424 {"write-debug", VariantType::Bool, false, {"write a debug output tree"}},
425 {"write-debug-on-error", VariantType::Bool, false, {"write a debug output tree in case errors occurred"}},
426 {"debug-file-name", VariantType::String, "/tmp/cmv_vector_debug.{run}.root", {"name of the debug output file"}},
427 {"write-raw-data-on-error", VariantType::Bool, false, {"dump raw data in case errors occurred"}},
428 {"raw-file-name", VariantType::String, "/tmp/cmv_debug.{run}.{raw_type}", {"name of the raw output file"}},
429 {"raw-data-type", VariantType::Int, 0, {"Which raw data to dump: 0-full TPC with DH, 1-full TPC with DH skip empty, 2-full TPC no DH, 3-full TPC no DH skip empty, 4-IDC raw only 5-CMV raw only"}},
430 {"check-incomplete-hbf", VariantType::Bool, false, {"false: don't check; true: check and report"}},
431 } // end Options
432 }; // end DataProcessorSpec
433}
434} // namespace o2::tpc
Common mode values data format definition.
std::ostringstream debug
uint64_t orbit
Definition RawEventData.h:6
uint64_t bc
Definition RawEventData.h:5
A helper class to iteratate over all parts of all input routes.
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
Generic parser for consecutive raw pages.
A helper class to iteratate over all parts of all input routes.
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
decltype(auto) get(R binding, int part=0) const
size_t getNErrors() const
Definition RawParser.h:642
const_iterator begin() const
Definition RawParser.h:618
const_iterator end() const
Definition RawParser.h:623
static void setCheckIncompleteHBF(bool v)
Definition RawParser.h:647
rdh_utils::FEEIDType FEEIDType
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void stop() final
This is invoked on stop.
void run(o2::framework::ProcessingContext &pc) final
CMVToVectorDevice(const std::vector< uint32_t > &crus)
void init(o2::framework::InitContext &ic) final
static Mapper & instance(const std::string mappingDir="")
Definition Mapper.h:44
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLboolean * data
Definition glcorearb.h:298
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
GLuint GLuint stream
Definition glcorearb.h:1806
GLint ref
Definition glcorearb.h:291
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > select(char const *matcher="")
bool operator==(const DsChannelId &a, const DsChannelId &b)
Definition DsChannelId.h:65
uint64_t getRunNumber(o2::framework::ProcessingContext &pc)
uint16_t FEEIDType
Definition RDHUtils.h:26
Global TPC definitions and constants.
Definition SimTraits.h:168
o2::framework::DataProcessorSpec getCMVToVectorSpec(const std::string inputSpec, std::vector< uint32_t > const &crus)
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
the main header struct
Definition DataHeader.h:620
uint32_t SubSpecificationType
Definition DataHeader.h:622
static constexpr int getVersion()
get numeric version of the RDH
Definition RDHUtils.h:60
constexpr size_t min
const int sleepTime
Definition test_Fifo.cxx:28
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))