Project
Loading...
Searching...
No Matches
DataDecoderSpec2.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
18
19#include <random>
20#include <iostream>
21#include <fstream>
22#include <stdexcept>
23#include <array>
24#include <functional>
25#include <vector>
26
27#include "TTree.h"
28#include "TFile.h"
29
30#include <gsl/span>
31
36#include "Framework/Lifetime.h"
37#include "Framework/Output.h"
38#include "Framework/Task.h"
40#include "Framework/Logger.h"
43
47
50#include "HMPIDBase/Geo.h"
54
55namespace o2
56{
57namespace hmpid
58{
59
60using namespace o2;
61using namespace o2::framework;
63
64//=======================
65// Data decoder
67{
// Body of the task initialisation (the signature on listing line 66 is not visible in this view).
// It reads the workflow options, initialises the decoder, resets the running totals
// and starts the execution timer.
68
69 LOG(info) << "[HMPID Data Decoder - Init] ( create Raw Stream Decoder for " << Geo::MAXEQUIPMENTS << " equipments !";
70
// The three option names read here match the entries of the Options{...} list that the
// DataProcessorSpec at the end of this file declares.
71 mProduceResults = ic.options().get<bool>("get-results-statistics");
72 mRootStatFile = ic.options().get<std::string>("result-file");
73 mFastAlgorithm = ic.options().get<bool>("fast-decode");
// NOTE(review): listing line 74 is missing from this view; presumably mDeco is created there - confirm.
75 mDeco->init();
// Running totals, accumulated by decodeTF() and reported in the timer messages.
76 mTotalDigits = 0;
77 mTotalFrames = 0;
78
79 mExTimer.start();
80 return;
81}
82
84{
// Body of the per-invocation processing step (the signature on listing line 83 is not
// visible in this view). It empties the digit and trigger containers, decodes the inputs
// via decodeTF() and publishes both containers as HMP/DIGITS and HMP/INTRECORDS.
85 mDeco->mDigits.clear();
86 mTriggers.clear();
87 LOG(info) << "[HMPID Data Decoder - Run] !";
88
89 decodeTF(pc);
90 // TODO: accept other types of Raw Streams ...
91 // decodeReadout(pc);
92 // decodeRawFile(pc);
93
94 // Output the Digits/Triggers vector
// NOTE(review): listing line 95 is missing from this view - check the original file for
// a statement between the decoding and the two snapshots below.
96 pc.outputs().snapshot(o2::framework::Output{"HMP", "DIGITS", 0}, mDeco->mDigits);
97 pc.outputs().snapshot(o2::framework::Output{"HMP", "INTRECORDS", 0}, mTriggers);
98
// The message reports the cumulative totals kept in mTotalDigits / mTotalFrames.
99 mExTimer.elapseMes("Decoding... Digits decoded = " + std::to_string(mTotalDigits) + " Frames received = " + std::to_string(mTotalFrames));
100 return;
101}
102
104{
// Body of the end-of-stream handler (the signature on listing line 103 is not visible in
// this view). When mProduceResults is set it writes "<mRootStatFile>_stat.root" and
// "<mRootStatFile>_stat.txt"; in every case it logs the final totals and stops the timer.
105 // Records the statistics
// The floats below are the buffers whose addresses are handed to the TTree branches.
106 float avgEventSize; //[o2::hmpid::Geo::MAXEQUIPMENTS];
107 float avgBusyTime; //[o2::hmpid::Geo::MAXEQUIPMENTS];
108 float numOfSamples; //[o2::hmpid::Geo::N_MODULES][o2::hmpid::Geo::N_YCOLS][o2::hmpid::Geo::N_XROWS];
109 float sumOfCharges; //[o2::hmpid::Geo::N_MODULES][o2::hmpid::Geo::N_YCOLS][o2::hmpid::Geo::N_XROWS];
110 float squareOfCharges; //[o2::hmpid::Geo::N_MODULES][o2::hmpid::Geo::N_YCOLS][o2::hmpid::Geo::N_XROWS];
111 float xb;
112 float yb;
113
114 if (!mProduceResults) {
115 LOG(info) << "Skip the Stat file creation ! ";
116 } else {
117 TString filename = TString::Format("%s_stat.root", mRootStatFile.c_str());
118 LOG(info) << "Create the stat file " << filename.Data();
119 TFile mfileOut(TString::Format("%s", filename.Data()), "RECREATE");
// Slots 0 .. N_MODULES-1 hold one per-pad tree per module; slot N_MODULES holds the
// per-equipment summary tree.
120 TTree* theObj[Geo::N_MODULES + 1];
// NOTE(review): every tree, including the summary one, is created with the name "o2hmp"
// and all of them are written to the same file - confirm this is intended.
// NOTE(review): the leaf-list strings ("s", "i", "l", "F") carry no "/type" suffix while
// the bound variables are all float - check against the TTree::Branch leaf-list rules.
121 for (int i = 0; i < Geo::N_MODULES; i++) { // Create the TTree array
122 TString tit = TString::Format("HMPID Data Decoding Statistic results Mod=%d", i);
123 theObj[i] = new TTree("o2hmp", tit);
124 theObj[i]->Branch("x", &xb, "s");
125 theObj[i]->Branch("y", &yb, "s");
126 theObj[i]->Branch("Samples", &numOfSamples, "i");
127 theObj[i]->Branch("Sum_of_charges", &sumOfCharges, "l");
128 theObj[i]->Branch("Sum_of_square", &squareOfCharges, "l");
129 }
130 theObj[Geo::N_MODULES] = new TTree("o2hmp", "HMPID Data Decoding Statistic results");
131 theObj[Geo::N_MODULES]->Branch("Average_Event_Size", &avgEventSize, "F");
132 theObj[Geo::N_MODULES]->Branch("Average_Busy_Time", &avgBusyTime, "F");
133
134 // Update the Stat for the Decoding
135 int numEqui = mDeco->getNumberOfEquipments();
136 // cycle in order to update info for the last event
137 for (int i = 0; i < numEqui; i++) {
138 if (mDeco->mTheEquipments[i]->mNumberOfEvents > 0) {
139 mDeco->updateStatistics(mDeco->mTheEquipments[i]);
140 }
141 }
// snprintf truncates the summary file name if it does not fit in the 254-byte buffer.
142 char summaryFileName[254];
143 snprintf(summaryFileName, 254, "%s_stat.txt", mRootStatFile.c_str());
144 mDeco->writeSummaryFile(summaryFileName);
// One entry per equipment in the summary tree.
145 for (int e = 0; e < numEqui; e++) {
146 avgEventSize = mDeco->getAverageEventSize(e);
147 avgBusyTime = mDeco->getAverageBusyTime(e);
148 theObj[Geo::N_MODULES]->Fill();
149 }
// One entry per (module, y, x) combination in the tree of the corresponding module.
150 for (int m = 0; m < o2::hmpid::Geo::N_MODULES; m++) {
151 for (int y = 0; y < o2::hmpid::Geo::N_YCOLS; y++) {
152 for (int x = 0; x < o2::hmpid::Geo::N_XROWS; x++) {
153 xb = x;
154 yb = y;
155 numOfSamples = mDeco->getPadSamples(m, x, y);
156 sumOfCharges = mDeco->getPadSum(m, x, y);
157 squareOfCharges = mDeco->getPadSquares(m, x, y);
158 theObj[m]->Fill();
159 }
160 }
161 }
// "<=" on purpose: the summary tree stored at index N_MODULES is written as well.
162 for (int i = 0; i <= Geo::N_MODULES; i++) {
163 theObj[i]->Write();
164 }
165 }
166 mExTimer.logMes("End the Decoding ! Digits decoded = " + std::to_string(mTotalDigits) + " Frames received = " + std::to_string(mTotalFrames));
167 mExTimer.stop();
168 return;
169}
170//_________________________________________________________________________________________________
171// the decodeTF() function processes the messages generated by the (sub)TimeFrame builder
173{
// Body of decodeTF() (the signature on listing line 172 is not visible in this view).
// It returns early when an empty 0xDEADBEEF placeholder input is found; otherwise it
// decodes every raw page of the "TF:HMP/RAWDATA" selection, appends one Trigger per page
// to mTriggers and updates the mTotalFrames / mTotalDigits counters.
174 LOG(info) << "*********** In decodeTF **************";
175
176 // get the input buffer
177 auto& inputs = pc.inputs();
178
179 // if we see requested data type input with 0xDEADBEEF subspec and 0 payload this means that the "delayed message"
180 // mechanism created it in absence of real data from upstream. Processor should send empty output to not block the workflow
181 {
182 static size_t contDeadBeef = 0; // number of times 0xDEADBEEF was seen continuously
183 std::vector<InputSpec> dummy{InputSpec{"dummy", ConcreteDataMatcher{"HMP", "RAWDATA", 0xDEADBEEF}}};
184 for (const auto& ref : InputRecordWalker(inputs, dummy)) {
185 const auto dh = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
186 auto payloadSize = DataRefUtils::getPayloadSize(ref);
187 if (payloadSize == 0) {
// NOTE(review): maxWarn is used below but its declaration (listing line 188) is not
// visible in this view.
// The warning is emitted only for the first maxWarn consecutive occurrences.
189 if (++contDeadBeef <= maxWarn) {
190 LOGP(warning, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF{}",
191 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, payloadSize,
192 contDeadBeef == maxWarn ? fmt::format(". {} such inputs in row received, stopping reporting", contDeadBeef) : "");
193 }
// Nothing is decoded for this invocation.
194 return;
195 }
196 }
197 contDeadBeef = 0; // if good data, reset the counter
198 }
199
200 DPLRawParser parser(inputs, o2::framework::select("TF:HMP/RAWDATA"));
201 // mDeco->mDigits.clear();
202 for (auto it = parser.begin(), end = parser.end(); it != end; ++it) {
// Index in mDigits at which the digits decoded from this page will start.
203 int pointerToTheFirst = mDeco->mDigits.size();
204 uint32_t* theBuffer = (uint32_t*)it.raw();
205 mDeco->setUpStream(theBuffer, it.size() + it.offset());
206 try {
207 if (mFastAlgorithm) {
208 mDeco->decodePageFast(&theBuffer);
209 } else {
210 mDeco->decodePage(&theBuffer);
211 }
212 } catch (int e) {
// The thrown int is treated as the end-of-page signal; its value is not inspected.
213 // The stream end !
214 LOG(debug) << "End Page decoding !";
215 }
216 // std::cout << " fDigit=" << pointerToTheFirst << " lDigit=," << mDeco->mDigits.size() << " nDigit=" << mDeco->mDigits.size()-pointerToTheFirst << std::endl;
// One Trigger per page: the decoder's current interaction record, the first digit index
// and the number of digits appended while decoding this page.
217 mTriggers.push_back(o2::hmpid::Trigger(mDeco->mIntReco, pointerToTheFirst, mDeco->mDigits.size() - pointerToTheFirst));
218 mTotalFrames++;
219 }
220
221 mTotalDigits += mDeco->mDigits.size();
222 LOG(info) << "Writing Digitis=" << mDeco->mDigits.size() << "/" << mTotalDigits << " Frame=" << mTotalFrames << " IntRec " << mDeco->mIntReco;
223 return;
224}
225
226//_________________________________________________________________________________________________
227// the decodeReadout() function processes the messages generated by o2-mch-cru-page-reader-workflow
228// TODO: rearrange, test
230{
// Body of decodeReadout() (the signature on listing line 229 is not visible in this view).
// It decodes every raw page of the "readout:HMP/RAWDATA" selection with the configured
// algorithm. Unlike decodeTF(), it does not append to mTriggers and does not update the
// mTotalFrames / mTotalDigits counters.
231 LOG(info) << "*********** In decode readout **************";
232
233 // get the input buffer
234 auto& inputs = pc.inputs();
235 DPLRawParser parser(inputs, o2::framework::select("readout:HMP/RAWDATA"));
236 // DPLRawParser parser(inputs, o2::framework::select("HMP/readout"));
237
238 for (auto it = parser.begin(), end = parser.end(); it != end; ++it) {
239 uint32_t* theBuffer = (uint32_t*)it.raw();
240 mDeco->setUpStream(theBuffer, it.size() + it.offset());
241 try {
242 if (mFastAlgorithm) {
243 mDeco->decodePageFast(&theBuffer);
244 } else {
245 mDeco->decodePage(&theBuffer);
246 }
247 } catch (int e) {
// The thrown int is treated as the end-of-page signal; its value is not inspected.
248 // The stream end !
249 LOG(debug) << "End Page decoding !";
250 }
251 }
252 return;
253}
254
255// the decodeRawFile() function processes the payload of the input whose binding is "file"
256// TODO: rearrange, test
258{
// Body of the raw-file decoding routine (the signature on listing line 257 is not visible
// in this view). It walks all inputs and, for each one bound as "file", hands the whole
// payload to the decoder as a single stream.
259 LOG(info) << "*********** In decode rawfile **************";
260
261 for (auto&& input : pc.inputs()) {
262 if (input.spec->binding == "file") {
263
// NOTE(review): 'raw' is never read in this body; the buffer is taken again from
// input.payload a few lines below.
264 auto const* raw = input.payload;
265 size_t payloadSize = DataRefUtils::getPayloadSize(input);
266
267 LOG(info) << " payloadSize=" << payloadSize;
// An empty payload ends the whole function, so any remaining inputs are not examined.
268 if (payloadSize == 0) {
269 return;
270 }
271
272 uint32_t* theBuffer = (uint32_t*)input.payload;
// NOTE(review): size_t is narrowed to int here - confirm payloads always fit.
273 int pagesize = payloadSize;
274 mDeco->setUpStream(theBuffer, pagesize);
275 try {
276 if (mFastAlgorithm) {
277 mDeco->decodePageFast(&theBuffer);
278 } else {
279 mDeco->decodePage(&theBuffer);
280 }
281 } catch (int e) {
// The thrown int is treated as the end-of-page signal; its value is not inspected.
282 // The stream end !
283 LOG(debug) << "End Page decoding !";
284 }
285 }
286 }
287 return;
288}
289
291{
// Body of the trigger/digit reordering routine (the signature on listing line 290 is not
// visible in this view). It sorts mTriggers, merges consecutive triggers that share the
// same trigger ID into a single record, rebuilds the digit vector so that each merged
// record covers one contiguous range, sorts the digits of each range and finally swaps
// the rebuilt vectors into mTriggers and mDeco->mDigits.
292 std::vector<o2::hmpid::Digit> dig;
293 dig.clear();
294 std::vector<o2::hmpid::Trigger> trg;
295 trg.clear();
296
297 // first arrange the triggers in chronological order
298 std::sort(mTriggers.begin(), mTriggers.end());
299 // then build a new Digit Vector physically ordered for triggers
// i marks the first trigger of the current group; k scans forward to the end of the group.
300 int i = 0;
301 int k = i;
// NOTE(review): listing line 302 is missing from this view; 'tr' is used below, so it is
// presumably declared there - confirm.
303 int count = 0;
304 int firstEntry;
305 while (i < mTriggers.size()) {
306 tr = mTriggers[i];
307 count = 0;
308 firstEntry = dig.size();
309 while (k < mTriggers.size() && mTriggers[i].getTriggerID() == mTriggers[k].getTriggerID()) {
// getLastEntry() is used as an inclusive bound in this copy loop.
310 for (int j = mTriggers[k].getFirstEntry(); j <= mTriggers[k].getLastEntry(); j++) {
311 dig.push_back(mDeco->mDigits[j]);
312 count++;
313 }
314 k++;
315 }
// The merged record points at the range of 'dig' that was just filled.
316 tr.setDataRange(firstEntry, count);
317 trg.push_back(tr);
318 i = k;
319 }
320
321 // then arrange the triggers in chamber order
// This loop sorts the digits inside each trigger's range; the trigger order itself is
// left unchanged. Ranges with first > last are skipped.
322 for (int i = 0; i < trg.size(); i++) {
323 if (trg[i].getFirstEntry() > trg[i].getLastEntry()) {
324 continue;
325 }
// NOTE(review): std::sort takes an exclusive end iterator, while getLastEntry() is used
// as an inclusive index in the copy loop above; as written the last digit of each range
// appears to be left out of the sort - confirm whether "+ 1" is missing here.
326 std::sort(dig.begin() + trg[i].getFirstEntry(), dig.begin() + trg[i].getLastEntry());
327 }
328
329 mTriggers.swap(trg);
330 mDeco->mDigits.swap(dig);
331 trg.clear();
332 dig.clear();
333}
334
335//_________________________________________________________________________________________________
337{
// Body of the DataProcessorSpec factory (the signature on listing line 336 is not visible
// in this view). It builds the "HMP-RawStreamDecoder" processor: HMP/RAWDATA as input
// (any sub-specification), HMP/DIGITS and HMP/INTRECORDS as outputs, DataDecoderTask2 as
// the algorithm, and the three options that the task reads during initialisation.
338 std::vector<o2::framework::InputSpec> inputs;
339 inputs.emplace_back("TF", o2::framework::ConcreteDataTypeMatcher{"HMP", "RAWDATA"}, o2::framework::Lifetime::Timeframe);
// The FLP/DISTSUBTIMEFRAME input is added only when askDISTSTF is set.
340 if (askDISTSTF) {
341 inputs.emplace_back("stdDist", "FLP", "DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
342 }
343
344 std::vector<o2::framework::OutputSpec> outputs;
345 outputs.emplace_back("HMP", "DIGITS", 0, o2::framework::Lifetime::Timeframe);
346 outputs.emplace_back("HMP", "INTRECORDS", 0, o2::framework::Lifetime::Timeframe);
347
348 return DataProcessorSpec{
349 "HMP-RawStreamDecoder",
350 inputs,
351 outputs,
352 AlgorithmSpec{adaptFromTask<DataDecoderTask2>()},
// Option defaults: statistics output off, result base name "/tmp/hmpRawDecodeResults",
// fast decoding off.
353 Options{{"get-results-statistics", VariantType::Bool, false, {"Generate intermediat output results."}},
354 {"result-file", VariantType::String, "/tmp/hmpRawDecodeResults", {"Base name of the decoding results files."}},
355 {"fast-decode", VariantType::Bool, false, {"Use the fast algorithm. (error 0.8%)"}}}};
356}
357
358} // namespace hmpid
359} // end namespace o2
A raw page parser for DPL input.
int32_t i
A helper class to iterate over all parts of all input routes.
Definition of the RAW Data Header.
uint32_t j
Definition RawData.h:0
std::ostringstream debug
The parser handles transparently input in the format of raw pages.
const_iterator end() const
const_iterator begin() const
void snapshot(const Output &spec, T const &object)
ConfigParamRegistry const & options()
Definition InitContext.h:33
A helper class to iterate over all parts of all input routes.
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
void decodeTF(framework::ProcessingContext &pc)
void endOfStream(framework::EndOfStreamContext &ec) override
This is invoked whenever we have an EndOfStream event.
void decodeReadout(framework::ProcessingContext &pc)
void decodeRawFile(framework::ProcessingContext &pc)
void run(framework::ProcessingContext &pc) final
void init(framework::InitContext &ic) final
void stop()
stop : stops the timer
Definition Common.h:73
void elapseMes(std::string const message)
Definition Common.h:91
void start()
start : starts the timer
Definition Common.h:64
void logMes(std::string const message)
Definition Common.h:81
static constexpr int N_XROWS
Definition Geo.h:88
static constexpr int N_MODULES
Definition Geo.h:87
static constexpr int N_YCOLS
Definition Geo.h:89
static constexpr int MAXEQUIPMENTS
Definition Geo.h:79
float getAverageEventSize(int Equipment)
void decodePageFast(uint32_t **streamBuf)
std::vector< o2::hmpid::Digit > mDigits
void decodePage(uint32_t **streamBuffer)
o2::InteractionRecord mIntReco
bool setUpStream(void *Buffer, long BufferLen)
void writeSummaryFile(char *summaryFileName)
double getPadSum(int Module, int Row, int Column)
HmpidEquipment * mTheEquipments[Geo::MAXEQUIPMENTS]
uint16_t getPadSamples(int Module, int Row, int Column)
void updateStatistics(HmpidEquipment *eq)
float getAverageBusyTime(int Equipment)
double getPadSquares(int Module, int Row, int Column)
HMPID Trigger declaration.
Definition Trigger.h:32
void setDataRange(int firstentry, int nentries)
Definition Trigger.h:46
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLint GLsizei count
Definition glcorearb.h:399
GLuint GLuint end
Definition glcorearb.h:469
GLint y
Definition glcorearb.h:270
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > select(char const *matcher="")
o2::framework::DataProcessorSpec getDecodingSpec2(bool askSTFDist)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::string filename()
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"