Project
Loading...
Searching...
No Matches
DataDecoderSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
18
19#include <random>
20#include <iostream>
21#include <fstream>
22#include <stdexcept>
23#include <array>
24#include <functional>
25#include <vector>
26
27#include "TTree.h"
28#include "TFile.h"
29
30#include <gsl/span>
31
36#include "Framework/Lifetime.h"
37#include "Framework/Output.h"
38#include "Framework/Task.h"
40#include "Framework/Logger.h"
43
47
50#include "HMPIDBase/Geo.h"
54
55namespace o2
56{
57namespace hmpid
58{
59
60using namespace o2;
61using namespace o2::framework;
63
64//=======================
65// Data decoder
67{
68
69 LOG(info) << "[HMPID Data Decoder - Init] ( create Raw Stream Decoder for " << Geo::MAXEQUIPMENTS << " equipments !";
70
71 mRootStatFile = ic.options().get<std::string>("result-file");
72 mFastAlgorithm = ic.options().get<bool>("fast-decode");
74 mDeco->init();
75 mTotalDigits = 0;
76 mTotalFrames = 0;
77
78 mExTimer.start();
79 return;
80}
81
83{
84 mDeco->mDigits.clear();
85 decodeTF(pc);
86 // TODO: accept other types of Raw Streams ...
87 // decodeReadout(pc);
88 // decodeRawFile(pc);
89
90 pc.outputs().snapshot(o2::framework::Output{"HMP", "DIGITS", 0}, mDeco->mDigits);
91 pc.outputs().snapshot(o2::framework::Output{"HMP", "INTRECORDS", 0}, mDeco->mIntReco);
92
93 LOG(debug) << "Writing Digitis=" << mDeco->mDigits.size() << "/" << mTotalDigits << " Frame=" << mTotalFrames << " IntRec " << mDeco->mIntReco;
94 mExTimer.elapseMes("Decoding... Digits decoded = " + std::to_string(mTotalDigits) + " Frames received = " + std::to_string(mTotalFrames));
95}
96
98{
99 // Records the statistics
100 float avgEventSize; //[o2::hmpid::Geo::MAXEQUIPMENTS];
101 float avgBusyTime; //[o2::hmpid::Geo::MAXEQUIPMENTS];
102 float numOfSamples; //[o2::hmpid::Geo::N_MODULES][o2::hmpid::Geo::N_YCOLS][o2::hmpid::Geo::N_XROWS];
103 float sumOfCharges; //[o2::hmpid::Geo::N_MODULES][o2::hmpid::Geo::N_YCOLS][o2::hmpid::Geo::N_XROWS];
104 float squareOfCharges; //[o2::hmpid::Geo::N_MODULES][o2::hmpid::Geo::N_YCOLS][o2::hmpid::Geo::N_XROWS];
105 float xb;
106 float yb;
107
108 TString filename = TString::Format("%s_stat.root", mRootStatFile.c_str());
109 LOG(info) << "Create the stat file " << filename.Data();
110 TFile mfileOut(TString::Format("%s", filename.Data()), "RECREATE");
111 TTree* theObj[Geo::N_MODULES + 1];
112 for (int i = 0; i < Geo::N_MODULES; i++) { // Create the TTree array
113 TString tit = TString::Format("HMPID Data Decoding Statistic results Mod=%d", i);
114 theObj[i] = new TTree("o2hmp", tit);
115 theObj[i]->Branch("x", &xb, "s");
116 theObj[i]->Branch("y", &yb, "s");
117 theObj[i]->Branch("Samples", &numOfSamples, "i");
118 theObj[i]->Branch("Sum_of_charges", &sumOfCharges, "l");
119 theObj[i]->Branch("Sum_of_square", &squareOfCharges, "l");
120 }
121 theObj[Geo::N_MODULES] = new TTree("o2hmp", "HMPID Data Decoding Statistic results");
122 theObj[Geo::N_MODULES]->Branch("Average_Event_Size", &avgEventSize, "F");
123 theObj[Geo::N_MODULES]->Branch("Average_Busy_Time", &avgBusyTime, "F");
124
125 // Update the Stat for the Decoding
126 int numEqui = mDeco->getNumberOfEquipments();
127 // cycle in order to update info for the last event
128 for (int i = 0; i < numEqui; i++) {
129 if (mDeco->mTheEquipments[i]->mNumberOfEvents > 0) {
130 mDeco->updateStatistics(mDeco->mTheEquipments[i]);
131 }
132 }
133 char summaryFileName[254];
134 snprintf(summaryFileName, 254, "%s_stat.txt", mRootStatFile.c_str());
135 mDeco->writeSummaryFile(summaryFileName);
136 for (int e = 0; e < numEqui; e++) {
137 avgEventSize = mDeco->getAverageEventSize(e);
138 avgBusyTime = mDeco->getAverageBusyTime(e);
139 theObj[Geo::N_MODULES]->Fill();
140 }
141 for (int m = 0; m < o2::hmpid::Geo::N_MODULES; m++) {
142 for (int y = 0; y < o2::hmpid::Geo::N_YCOLS; y++) {
143 for (int x = 0; x < o2::hmpid::Geo::N_XROWS; x++) {
144 xb = x;
145 yb = y;
146 numOfSamples = mDeco->getPadSamples(m, x, y);
147 sumOfCharges = mDeco->getPadSum(m, x, y);
148 squareOfCharges = mDeco->getPadSquares(m, x, y);
149 theObj[m]->Fill();
150 }
151 }
152 }
153 for (int i = 0; i <= Geo::N_MODULES; i++) {
154 theObj[i]->Write();
155 }
156
157 mExTimer.logMes("End the Decoding ! Digits decoded = " + std::to_string(mTotalDigits) + " Frames received = " + std::to_string(mTotalFrames));
158 mExTimer.stop();
159 return;
160}
161//_________________________________________________________________________________________________
162// the decodeTF() function processes the the messages generated by the (sub)TimeFrame builder
164{
165 LOG(debug) << "*********** In decodeTF **************";
166
167 // get the input buffer
168 auto& inputs = pc.inputs();
169
170 // if we see requested data type input with 0xDEADBEEF subspec and 0 payload this means that the "delayed message"
171 // mechanism created it in absence of real data from upstream. Processor should send empty output to not block the workflow
172 {
173 static size_t contDeadBeef = 0; // number of times 0xDEADBEEF was seen continuously
174 std::vector<InputSpec> dummy{InputSpec{"dummy", ConcreteDataMatcher{"HMP", "RAWDATA", 0xDEADBEEF}}};
175 for (const auto& ref : InputRecordWalker(inputs, dummy)) {
176 const auto dh = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
178 if (payloadSize == 0) {
180 if (++contDeadBeef <= maxWarn) {
181 LOGP(warning, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF{}",
182 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, payloadSize,
183 contDeadBeef == maxWarn ? fmt::format(". {} such inputs in row received, stopping reporting", contDeadBeef) : "");
184 }
185 return;
186 }
187 }
188 contDeadBeef = 0; // if good data, reset the counter
189 }
190
191 DPLRawParser parser(inputs, o2::framework::select("TF:HMP/RAWDATA"));
192 mDeco->mDigits.clear();
193 for (auto it = parser.begin(), end = parser.end(); it != end; ++it) {
194 uint32_t* theBuffer = (uint32_t*)it.raw();
195 mDeco->setUpStream(theBuffer, it.size() + it.offset());
196 try {
197 if (mFastAlgorithm) {
198 mDeco->decodePageFast(&theBuffer);
199 } else {
200 mDeco->decodePage(&theBuffer);
201 }
202 } catch (int e) {
203 // The stream end !
204 LOG(debug) << "End Page decoding !";
205 }
206 mTotalFrames++;
207 }
208 mTotalDigits += mDeco->mDigits.size();
209}
210
211//_________________________________________________________________________________________________
212// the decodeReadout() function processes the messages generated by o2-mch-cru-page-reader-workflow
213// TODO: rearrange, test
215{
216 LOG(info) << "*********** In decode readout **************";
217
218 // get the input buffer
219 auto& inputs = pc.inputs();
220 DPLRawParser parser(inputs, o2::framework::select("readout:HMP/RAWDATA"));
221 // DPLRawParser parser(inputs, o2::framework::select("HMP/readout"));
222
223 for (auto it = parser.begin(), end = parser.end(); it != end; ++it) {
224 uint32_t* theBuffer = (uint32_t*)it.raw();
225 mDeco->setUpStream(theBuffer, it.size() + it.offset());
226 try {
227 if (mFastAlgorithm) {
228 mDeco->decodePageFast(&theBuffer);
229 } else {
230 mDeco->decodePage(&theBuffer);
231 }
232 } catch (int e) {
233 // The stream end !
234 LOG(debug) << "End Page decoding !";
235 }
236 }
237 return;
238}
239
240// the decodeReadout() function processes the messages generated by o2-mch-cru-page-reader-workflow
241// TODO: rearrange, test
243{
244 LOG(info) << "*********** In decode rawfile **************";
245
246 for (auto&& input : pc.inputs()) {
247 if (input.spec->binding == "file") {
248 const header::DataHeader* header = o2::header::get<header::DataHeader*>(input.header);
249 if (!header) {
250 return;
251 }
252
253 auto const* raw = input.payload;
254 size_t payloadSize = DataRefUtils::getPayloadSize(input);
255
256 LOG(info) << " payloadSize=" << payloadSize;
257 if (payloadSize == 0) {
258 return;
259 }
260
261 uint32_t* theBuffer = (uint32_t*)input.payload;
262 int pagesize = payloadSize;
263 mDeco->setUpStream(theBuffer, pagesize);
264 try {
265 if (mFastAlgorithm) {
266 mDeco->decodePageFast(&theBuffer);
267 } else {
268 mDeco->decodePage(&theBuffer);
269 }
270 } catch (int e) {
271 // The stream end !
272 LOG(debug) << "End Page decoding !";
273 }
274 }
275 }
276 return;
277}
278
279//_________________________________________________________________________________________________
281{
282 std::vector<o2::framework::InputSpec> inputs;
283 inputs.emplace_back("TF", o2::framework::ConcreteDataTypeMatcher{"HMP", "RAWDATA"}, o2::framework::Lifetime::Timeframe);
284 if (askDISTSTF) {
285 inputs.emplace_back("stdDist", "FLP", "DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
286 }
287
288 std::vector<o2::framework::OutputSpec> outputs;
289 outputs.emplace_back("HMP", "DIGITS", 0, o2::framework::Lifetime::Timeframe);
290 outputs.emplace_back("HMP", "INTRECORDS", 0, o2::framework::Lifetime::Timeframe);
291
292 return DataProcessorSpec{
293 "HMP-RawStreamDecoder",
294 inputs,
295 outputs,
296 AlgorithmSpec{adaptFromTask<DataDecoderTask>()},
297 Options{{"result-file", VariantType::String, "/tmp/hmpRawDecodeResults", {"Base name of the decoding results files."}},
298 {"fast-decode", VariantType::Bool, false, {"Use the fast algorithm. (error 0.8%)"}}}};
299}
300
301} // namespace hmpid
302} // end namespace o2
A raw page parser for DPL input.
int32_t i
A helper class to iteratate over all parts of all input routes.
Definition of the RAW Data Header.
std::ostringstream debug
The parser handles transparently input in the format of raw pages.
const_iterator end() const
const_iterator begin() const
void snapshot(const Output &spec, T const &object)
ConfigParamRegistry const & options()
Definition InitContext.h:33
A helper class to iteratate over all parts of all input routes.
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
void run(framework::ProcessingContext &pc) final
void init(framework::InitContext &ic) final
void decodeReadout(framework::ProcessingContext &pc)
void endOfStream(framework::EndOfStreamContext &ec) override
This is invoked whenever we have an EndOfStream event.
void decodeTF(framework::ProcessingContext &pc)
void decodeRawFile(framework::ProcessingContext &pc)
void stop()
stop : stops the timer
Definition Common.h:73
void elapseMes(std::string const message)
Definition Common.h:91
void start()
start : starts the timer
Definition Common.h:64
void logMes(std::string const message)
Definition Common.h:81
static constexpr int N_XROWS
Definition Geo.h:88
static constexpr int N_MODULES
Definition Geo.h:87
static constexpr int N_YCOLS
Definition Geo.h:89
static constexpr int MAXEQUIPMENTS
Definition Geo.h:79
float getAverageEventSize(int Equipment)
void decodePageFast(uint32_t **streamBuf)
std::vector< o2::hmpid::Digit > mDigits
void decodePage(uint32_t **streamBuffer)
o2::InteractionRecord mIntReco
bool setUpStream(void *Buffer, long BufferLen)
void writeSummaryFile(char *summaryFileName)
double getPadSum(int Module, int Row, int Column)
HmpidEquipment * mTheEquipments[Geo::MAXEQUIPMENTS]
uint16_t getPadSamples(int Module, int Row, int Column)
void updateStatistics(HmpidEquipment *eq)
float getAverageBusyTime(int Equipment)
double getPadSquares(int Module, int Row, int Column)
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLuint GLuint end
Definition glcorearb.h:469
GLint y
Definition glcorearb.h:270
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > select(char const *matcher="")
o2::framework::DataProcessorSpec getDecodingSpec(bool askSTFDist)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::string filename()
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
the main header struct
Definition DataHeader.h:618
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"