Project
Loading...
Searching...
No Matches
ClusterDecoderRawSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
18#include "Headers/DataHeader.h"
28#include <fairlogger/Logger.h>
29#include <set>
30#include <memory> // for make_shared
31#include <vector>
32#include <map>
33#include <cassert>
34#include <iomanip>
35#include <string>
36#include <numeric>
37
38using namespace o2::framework;
39using namespace o2::header;
40using namespace o2::dataformats;
41
42namespace o2
43{
44namespace tpc
45{
46
54{
55 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
57 std::string processorName = "tpc-cluster-decoder";
58 struct ProcessAttributes {
59 std::unique_ptr<HardwareClusterDecoder> decoder;
60 std::set<o2::header::DataHeader::SubSpecificationType> activeInputs;
61 int verbosity = 0;
62 bool sendMC = false;
63 };
64
65 auto initFunction = [sendMC](InitContext& ic) {
66 // there is nothing to init at the moment
67 auto processAttributes = std::make_shared<ProcessAttributes>();
68 processAttributes->decoder = std::make_unique<HardwareClusterDecoder>();
69 processAttributes->sendMC = sendMC;
70
71 auto processSectorFunction = [processAttributes](ProcessingContext& pc, DataRef const& ref, DataRef const& mclabelref) {
72 auto& decoder = processAttributes->decoder;
73 auto& verbosity = processAttributes->verbosity;
74 auto& activeInputs = processAttributes->activeInputs;
75 // this will return a span of TPC clusters
77 auto const* dataHeader = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
78 o2::header::DataHeader::SubSpecificationType fanSpec = dataHeader->subSpecification;
79
80 // init the stacks for forwarding the sector header
81 // FIXME check if there is functionality in the DPL to forward the stack
82 // FIXME make one function
83 o2::tpc::TPCSectorHeader const* sectorHeaderMC = nullptr;
84 if (DataRefUtils::isValid(mclabelref)) {
85 sectorHeaderMC = DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(mclabelref);
86 }
87
88 if (sectorHeaderMC && sectorHeaderMC->sector() < 0) {
89 pc.outputs().snapshot(Output{gDataOriginTPC, DataDescription("CLNATIVEMCLBL"), fanSpec, {*sectorHeaderMC}}, fanSpec);
90 }
91 auto const* sectorHeader = DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(ref);
92 if (sectorHeader && sectorHeader->sector() < 0) {
93 pc.outputs().snapshot(Output{gDataOriginTPC, DataDescription("CLUSTERNATIVE"), fanSpec, {*sectorHeader}}, fanSpec);
94 return;
95 }
96 assert(sectorHeaderMC == nullptr || sectorHeader->sector() == sectorHeaderMC->sector());
97
98 // input to the decoder is a vector of raw pages description ClusterHardwareContainer,
99 // each specified as a pair of pointer to ClusterHardwareContainer and the number
100 // of pages in that buffer
101 // FIXME: better description of the raw page
102 size_t nPages = size / 8192;
103 std::vector<std::pair<const ClusterHardwareContainer*, std::size_t>> inputList;
104 if (verbosity > 0 && !DataRefUtils::isValid(mclabelref)) {
105 LOG(info) << "Decoder input: " << size << ", " << nPages << " pages for sector " << sectorHeader->sector();
106 }
107
108 // MC labels are received as one container of labels in the sequence matching clusters
109 // in the raw pages
110 std::vector<ConstMCLabelContainer> mcinCopiesFlat;
111 std::vector<ConstMCLabelContainerView> mcinCopiesFlatView;
113 if (DataRefUtils::isValid(mclabelref)) {
114 mcin = pc.inputs().get<gsl::span<char>>(mclabelref);
115 mcinCopiesFlat.resize(nPages);
116 mcinCopiesFlatView.reserve(nPages);
117 if (verbosity > 0) {
118 LOG(info) << "Decoder input: " << size << ", " << nPages << " pages, " << mcin.getIndexedSize() << " MC label sets for sector " << sectorHeader->sector();
119 }
120 }
121
122 // FIXME: the decoder takes vector of MCLabelContainers as input and the retreived
123 // input can not be moved because the input is const, so we have to copy
124 // Furthermore, the current decoder implementation supports handling of MC labels
125 // only for single 8kb pages. So we have to add the raw pages individually and create
126 // MC label containers for the corresponding clusters.
127 size_t mcinPos = 0;
128 size_t totalNumberOfClusters = 0;
129 for (size_t page = 0; page < nPages; page++) {
130 MCLabelContainer mcinCopy;
131 inputList.emplace_back(reinterpret_cast<const ClusterHardwareContainer*>(ref.payload + page * 8192), 1);
132 const ClusterHardwareContainer& container = *(inputList.back().first);
133 if (verbosity > 1) {
134 LOG(info) << "Decoder input in page " << std::setw(2) << page << ": " //
135 << "CRU " << std::setw(3) << container.CRU << " " //
136 << std::setw(3) << container.numberOfClusters << " cluster(s)"; //
137 }
138 totalNumberOfClusters += container.numberOfClusters;
139 if (mcin.getBuffer().size()) {
140 for (size_t mccopyPos = 0;
141 mccopyPos < container.numberOfClusters && mcinPos < mcin.getIndexedSize();
142 mccopyPos++, mcinPos++) {
143 for (auto const& label : mcin.getLabels(mcinPos)) {
144 mcinCopy.addElement(mccopyPos, label);
145 }
146 }
147 }
148 mcinCopy.flatten_to(mcinCopiesFlat[page]);
149 mcinCopiesFlatView.emplace_back(mcinCopiesFlat[page]);
150 }
151 // FIXME: introduce error handling policy: throw, ignore, warn
152 //assert(!mcin || mcinPos == mcin->getIndexedSize());
153 if (mcin.getBuffer().size() && mcinPos != totalNumberOfClusters) {
154 LOG(error) << "inconsistent number of MC label objects processed"
155 << ", expecting MC label objects for " << totalNumberOfClusters << " cluster(s)"
156 << ", got " << mcin.getIndexedSize();
157 }
158 // output of the decoder is sorted in (sector,globalPadRow) coordinates, individual
159 // containers are created for clusters and MC labels per (sector,globalPadRow) address
160 char* outputBuffer = nullptr;
161 auto outputAllocator = [&pc, &fanSpec, &outputBuffer, sectorHeader](size_t size) -> char* {
162 outputBuffer = pc.outputs().newChunk(Output{gDataOriginTPC, DataDescription("CLUSTERNATIVE"), fanSpec, sectorHeader ? o2::header::Stack{*sectorHeader} : o2::header::Stack{}}, size).data();
163 return outputBuffer;
164 };
165 MCLabelContainer mcout;
166 decoder->decodeClusters(inputList, outputAllocator, (mcin.getBuffer().size() ? &mcinCopiesFlatView : nullptr), &mcout);
167
168 // TODO: reestablish the logging messages on the raw buffer
169 // if (verbosity > 1) {
170 // LOG(info) << "decoder " << std::setw(2) << sectorHeader->sector() //
171 // << ": decoded " << std::setw(4) << coll.clusters.size() << " clusters on sector " //
172 // << std::setw(2) << (int)coll.sector << "[" << (int)coll.globalPadRow << "]"; //
173 // }
174
175 if (DataRefUtils::isValid(mclabelref)) {
176 if (verbosity > 0) {
177 LOG(info) << "sending " << mcout.getIndexedSize()
178 << " label object(s)" << std::endl;
179 }
180 // serialize the complete list of MC label containers
181 ConstMCLabelContainer labelsFlat;
182 mcout.flatten_to(labelsFlat);
183 pc.outputs().snapshot(Output{gDataOriginTPC, DataDescription("CLNATIVEMCLBL"), fanSpec, sectorHeaderMC ? o2::header::Stack{*sectorHeaderMC} : o2::header::Stack{}}, labelsFlat);
184 }
185 };
186
187 auto processingFct = [processAttributes, processSectorFunction](ProcessingContext& pc) {
188 struct SectorInputDesc {
189 DataRef dataref;
190 DataRef mclabelref;
191 };
192 // loop over all inputs and their parts and associate data with corresponding mc truth data
193 // by the subspecification
194 std::map<int, SectorInputDesc> inputs;
195 std::vector<InputSpec> filter = {
196 {"check", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERHW"}, Lifetime::Timeframe},
197 {"check", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERHWMCLBL"}, Lifetime::Timeframe},
198 };
199 for (auto const& inputRef : InputRecordWalker(pc.inputs(), filter)) {
200 auto const* sectorHeader = DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(inputRef);
201 if (sectorHeader == nullptr) {
202 LOG(error) << "sector header missing on header stack for input on " << inputRef.spec->binding;
203 continue;
204 }
205 const int sector = sectorHeader->sector();
206 if (DataRefUtils::match(inputRef, {"check", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERHW"}})) {
207 inputs[sector].dataref = inputRef;
208 }
209 if (DataRefUtils::match(inputRef, {"check", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERHWMCLBL"}})) {
210 inputs[sector].mclabelref = inputRef;
211 }
212 }
213 for (auto const& input : inputs) {
214 if (processAttributes->sendMC && !DataRefUtils::isValid(input.second.mclabelref)) {
215 throw std::runtime_error("missing the required MC label data for sector " + std::to_string(input.first));
216 }
217 processSectorFunction(pc, input.second.dataref, input.second.mclabelref);
218 }
219 };
220
221 // return the actual processing function as a lambda function using variables
222 // of the init function
223 return processingFct;
224 };
225
226 auto createInputSpecs = [](bool makeMcInput) {
227 std::vector<InputSpec> inputSpecs{
228 InputSpec{{"rawin"}, gDataOriginTPC, "CLUSTERHW", 0, Lifetime::Timeframe},
229 };
230 if (makeMcInput) {
231 // FIXME: define common data type specifiers
232 constexpr o2::header::DataDescription datadesc("CLUSTERHWMCLBL");
233 inputSpecs.emplace_back(InputSpec{"mclblin", gDataOriginTPC, datadesc, 0, Lifetime::Timeframe});
234 }
235 return std::move(inputSpecs);
236 };
237
238 auto createOutputSpecs = [](bool makeMcOutput) {
239 std::vector<OutputSpec> outputSpecs{
240 OutputSpec{{"clusterout"}, gDataOriginTPC, "CLUSTERNATIVE", 0, Lifetime::Timeframe},
241 };
242 if (makeMcOutput) {
243 OutputLabel label{"mclblout"};
244 // have to use a new data description, routing is only based on origin and decsription
245 constexpr o2::header::DataDescription datadesc("CLNATIVEMCLBL");
246 outputSpecs.emplace_back(label, gDataOriginTPC, datadesc, 0, Lifetime::Timeframe);
247 }
248 return std::move(outputSpecs);
249 };
250
251 return DataProcessorSpec{processorName,
252 {createInputSpecs(sendMC)},
253 {createOutputSpecs(sendMC)},
254 AlgorithmSpec(initFunction)};
255}
256
257} // namespace tpc
258} // namespace o2
#define verbosity
Processor spec for decoder of TPC raw cluster data.
Class of a TPC cluster as produced by the hardware cluster finder (needs a postprocessing step to con...
A const (ready only) version of MCTruthContainer.
Helper class for memory management of TPC Data Formats, external from the actual data type classes to...
Decoder to convert TPC ClusterHardware to ClusterNative.
A helper class to iteratate over all parts of all input routes.
gsl::span< const TruthElement > getLabels(uint32_t dataindex) const
const gsl::span< const char > & getBuffer() const
void addElement(uint32_t dataindex, TruthElement const &element, bool noElement=false)
size_t flatten_to(ContainerType &container) const
void snapshot(const Output &spec, T const &object)
DataChunk & newChunk(const Output &, size_t)
A helper class to iteratate over all parts of all input routes.
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
static constexpr int MAXSECTOR
Definition Sector.h:44
GLsizeiptr size
Definition glcorearb.h:659
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
GLint ref
Definition glcorearb.h:291
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
Definition of a container to keep/associate and arbitrary number of labels associated to an index wit...
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
o2::header::DataDescription DataDescription
O2 data header classes and API, v0.1.
Definition DetID.h:49
Descriptor< gSizeDataDescriptionString > DataDescription
Definition DataHeader.h:551
framework::DataProcessorSpec getClusterDecoderRawSpec(bool sendMC=false)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool match(DataRef const &ref, const char *binding)
static bool isValid(DataRef const &ref)
uint32_t SubSpecificationType
Definition DataHeader.h:620
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"