Project
Loading...
Searching...
No Matches
ClusterDecoderRawSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
18#include "Headers/DataHeader.h"
28#include <fairlogger/Logger.h>
29#include <set>
30#include <memory> // for make_shared
31#include <vector>
32#include <map>
33#include <cassert>
34#include <iomanip>
35#include <string>
36#include <numeric>
37
38using namespace o2::framework;
39using namespace o2::header;
40using namespace o2::dataformats;
41
42namespace o2
43{
44namespace tpc
45{
53{
54 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
56 std::string processorName = "tpc-cluster-decoder";
57 struct ProcessAttributes {
58 std::unique_ptr<HardwareClusterDecoder> decoder;
59 std::set<o2::header::DataHeader::SubSpecificationType> activeInputs;
60 int verbosity = 0;
61 bool sendMC = false;
62 };
63
64 auto initFunction = [sendMC](InitContext& ic) {
65 // there is nothing to init at the moment
66 auto processAttributes = std::make_shared<ProcessAttributes>();
67 processAttributes->decoder = std::make_unique<HardwareClusterDecoder>();
68 processAttributes->sendMC = sendMC;
69
70 auto processSectorFunction = [processAttributes](ProcessingContext& pc, DataRef const& ref, DataRef const& mclabelref) {
71 auto& decoder = processAttributes->decoder;
72 auto& verbosity = processAttributes->verbosity;
73 auto& activeInputs = processAttributes->activeInputs;
74 // this will return a span of TPC clusters
76 auto const* dataHeader = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
77 o2::header::DataHeader::SubSpecificationType fanSpec = dataHeader->subSpecification;
78
79 // init the stacks for forwarding the sector header
80 // FIXME check if there is functionality in the DPL to forward the stack
81 // FIXME make one function
82 o2::header::Stack rawHeaderStack;
83 o2::header::Stack mcHeaderStack;
84 o2::tpc::TPCSectorHeader const* sectorHeaderMC = nullptr;
85 if (DataRefUtils::isValid(mclabelref)) {
86 sectorHeaderMC = DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(mclabelref);
87 if (sectorHeaderMC) {
88 o2::header::Stack actual{*sectorHeaderMC};
89 std::swap(mcHeaderStack, actual);
90 if (sectorHeaderMC->sector() < 0) {
91 pc.outputs().snapshot(Output{gDataOriginTPC, DataDescription("CLNATIVEMCLBL"), fanSpec, std::move(mcHeaderStack)}, fanSpec);
92 }
93 }
94 }
95 auto const* sectorHeader = DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(ref);
96 if (sectorHeader) {
97 o2::header::Stack actual{*sectorHeader};
98 std::swap(rawHeaderStack, actual);
99 if (sectorHeader->sector() < 0) {
100 pc.outputs().snapshot(Output{gDataOriginTPC, DataDescription("CLUSTERNATIVE"), fanSpec, std::move(rawHeaderStack)}, fanSpec);
101 return;
102 }
103 }
104 assert(sectorHeaderMC == nullptr || sectorHeader->sector() == sectorHeaderMC->sector());
105
106 // input to the decoder is a vector of raw pages description ClusterHardwareContainer,
107 // each specified as a pair of pointer to ClusterHardwareContainer and the number
108 // of pages in that buffer
109 // FIXME: better description of the raw page
110 size_t nPages = size / 8192;
111 std::vector<std::pair<const ClusterHardwareContainer*, std::size_t>> inputList;
112 if (verbosity > 0 && !DataRefUtils::isValid(mclabelref)) {
113 LOG(info) << "Decoder input: " << size << ", " << nPages << " pages for sector " << sectorHeader->sector();
114 }
115
116 // MC labels are received as one container of labels in the sequence matching clusters
117 // in the raw pages
118 std::vector<ConstMCLabelContainer> mcinCopiesFlat;
119 std::vector<ConstMCLabelContainerView> mcinCopiesFlatView;
121 if (DataRefUtils::isValid(mclabelref)) {
122 mcin = pc.inputs().get<gsl::span<char>>(mclabelref);
123 mcinCopiesFlat.resize(nPages);
124 mcinCopiesFlatView.reserve(nPages);
125 if (verbosity > 0) {
126 LOG(info) << "Decoder input: " << size << ", " << nPages << " pages, " << mcin.getIndexedSize() << " MC label sets for sector " << sectorHeader->sector();
127 }
128 }
129
130 // FIXME: the decoder takes vector of MCLabelContainers as input and the retreived
131 // input can not be moved because the input is const, so we have to copy
132 // Furthermore, the current decoder implementation supports handling of MC labels
133 // only for single 8kb pages. So we have to add the raw pages individually and create
134 // MC label containers for the corresponding clusters.
135 size_t mcinPos = 0;
136 size_t totalNumberOfClusters = 0;
137 for (size_t page = 0; page < nPages; page++) {
138 MCLabelContainer mcinCopy;
139 inputList.emplace_back(reinterpret_cast<const ClusterHardwareContainer*>(ref.payload + page * 8192), 1);
140 const ClusterHardwareContainer& container = *(inputList.back().first);
141 if (verbosity > 1) {
142 LOG(info) << "Decoder input in page " << std::setw(2) << page << ": " //
143 << "CRU " << std::setw(3) << container.CRU << " " //
144 << std::setw(3) << container.numberOfClusters << " cluster(s)"; //
145 }
146 totalNumberOfClusters += container.numberOfClusters;
147 if (mcin.getBuffer().size()) {
148 for (size_t mccopyPos = 0;
149 mccopyPos < container.numberOfClusters && mcinPos < mcin.getIndexedSize();
150 mccopyPos++, mcinPos++) {
151 for (auto const& label : mcin.getLabels(mcinPos)) {
152 mcinCopy.addElement(mccopyPos, label);
153 }
154 }
155 }
156 mcinCopy.flatten_to(mcinCopiesFlat[page]);
157 mcinCopiesFlatView.emplace_back(mcinCopiesFlat[page]);
158 }
159 // FIXME: introduce error handling policy: throw, ignore, warn
160 //assert(!mcin || mcinPos == mcin->getIndexedSize());
161 if (mcin.getBuffer().size() && mcinPos != totalNumberOfClusters) {
162 LOG(error) << "inconsistent number of MC label objects processed"
163 << ", expecting MC label objects for " << totalNumberOfClusters << " cluster(s)"
164 << ", got " << mcin.getIndexedSize();
165 }
166 // output of the decoder is sorted in (sector,globalPadRow) coordinates, individual
167 // containers are created for clusters and MC labels per (sector,globalPadRow) address
168 char* outputBuffer = nullptr;
169 auto outputAllocator = [&pc, &fanSpec, &outputBuffer, &rawHeaderStack](size_t size) -> char* {
170 outputBuffer = pc.outputs().newChunk(Output{gDataOriginTPC, DataDescription("CLUSTERNATIVE"), fanSpec, std::move(rawHeaderStack)}, size).data();
171 return outputBuffer;
172 };
173 MCLabelContainer mcout;
174 decoder->decodeClusters(inputList, outputAllocator, (mcin.getBuffer().size() ? &mcinCopiesFlatView : nullptr), &mcout);
175
176 // TODO: reestablish the logging messages on the raw buffer
177 // if (verbosity > 1) {
178 // LOG(info) << "decoder " << std::setw(2) << sectorHeader->sector() //
179 // << ": decoded " << std::setw(4) << coll.clusters.size() << " clusters on sector " //
180 // << std::setw(2) << (int)coll.sector << "[" << (int)coll.globalPadRow << "]"; //
181 // }
182
183 if (DataRefUtils::isValid(mclabelref)) {
184 if (verbosity > 0) {
185 LOG(info) << "sending " << mcout.getIndexedSize()
186 << " label object(s)" << std::endl;
187 }
188 // serialize the complete list of MC label containers
189 ConstMCLabelContainer labelsFlat;
190 mcout.flatten_to(labelsFlat);
191 pc.outputs().snapshot(Output{gDataOriginTPC, DataDescription("CLNATIVEMCLBL"), fanSpec, std::move(mcHeaderStack)}, labelsFlat);
192 }
193 };
194
195 auto processingFct = [processAttributes, processSectorFunction](ProcessingContext& pc) {
196 struct SectorInputDesc {
197 DataRef dataref;
198 DataRef mclabelref;
199 };
200 // loop over all inputs and their parts and associate data with corresponding mc truth data
201 // by the subspecification
202 std::map<int, SectorInputDesc> inputs;
203 std::vector<InputSpec> filter = {
204 {"check", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERHW"}, Lifetime::Timeframe},
205 {"check", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERHWMCLBL"}, Lifetime::Timeframe},
206 };
207 for (auto const& inputRef : InputRecordWalker(pc.inputs(), filter)) {
208 auto const* sectorHeader = DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(inputRef);
209 if (sectorHeader == nullptr) {
210 LOG(error) << "sector header missing on header stack for input on " << inputRef.spec->binding;
211 continue;
212 }
213 const int sector = sectorHeader->sector();
214 if (DataRefUtils::match(inputRef, {"check", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERHW"}})) {
215 inputs[sector].dataref = inputRef;
216 }
217 if (DataRefUtils::match(inputRef, {"check", ConcreteDataTypeMatcher{gDataOriginTPC, "CLUSTERHWMCLBL"}})) {
218 inputs[sector].mclabelref = inputRef;
219 }
220 }
221 for (auto const& input : inputs) {
222 if (processAttributes->sendMC && !DataRefUtils::isValid(input.second.mclabelref)) {
223 throw std::runtime_error("missing the required MC label data for sector " + std::to_string(input.first));
224 }
225 processSectorFunction(pc, input.second.dataref, input.second.mclabelref);
226 }
227 };
228
229 // return the actual processing function as a lambda function using variables
230 // of the init function
231 return processingFct;
232 };
233
234 auto createInputSpecs = [](bool makeMcInput) {
235 std::vector<InputSpec> inputSpecs{
236 InputSpec{{"rawin"}, gDataOriginTPC, "CLUSTERHW", 0, Lifetime::Timeframe},
237 };
238 if (makeMcInput) {
239 // FIXME: define common data type specifiers
240 constexpr o2::header::DataDescription datadesc("CLUSTERHWMCLBL");
241 inputSpecs.emplace_back(InputSpec{"mclblin", gDataOriginTPC, datadesc, 0, Lifetime::Timeframe});
242 }
243 return std::move(inputSpecs);
244 };
245
246 auto createOutputSpecs = [](bool makeMcOutput) {
247 std::vector<OutputSpec> outputSpecs{
248 OutputSpec{{"clusterout"}, gDataOriginTPC, "CLUSTERNATIVE", 0, Lifetime::Timeframe},
249 };
250 if (makeMcOutput) {
251 OutputLabel label{"mclblout"};
252 // have to use a new data description, routing is only based on origin and decsription
253 constexpr o2::header::DataDescription datadesc("CLNATIVEMCLBL");
254 outputSpecs.emplace_back(label, gDataOriginTPC, datadesc, 0, Lifetime::Timeframe);
255 }
256 return std::move(outputSpecs);
257 };
258
// Assemble the processor spec from the processor name, the input specs, the output specs
// and the init callback; createInputSpecs/createOutputSpecs add the MC label
// input/output only when sendMC is true.
259 return DataProcessorSpec{processorName,
260 {createInputSpecs(sendMC)},
261 {createOutputSpecs(sendMC)},
262 AlgorithmSpec(initFunction)};
263}
264
265} // namespace tpc
266} // namespace o2
#define verbosity
Processor spec for decoder of TPC raw cluster data.
Class of a TPC cluster as produced by the hardware cluster finder (needs a postprocessing step to con...
A const (read-only) version of MCTruthContainer.
Helper class for memory management of TPC Data Formats, external from the actual data type classes to...
Decoder to convert TPC ClusterHardware to ClusterNative.
A helper class to iterate over all parts of all input routes.
gsl::span< const TruthElement > getLabels(uint32_t dataindex) const
const gsl::span< const char > & getBuffer() const
void addElement(uint32_t dataindex, TruthElement const &element, bool noElement=false)
size_t flatten_to(ContainerType &container) const
void snapshot(const Output &spec, T const &object)
DataChunk & newChunk(const Output &, size_t)
A helper class to iterate over all parts of all input routes.
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
static constexpr int MAXSECTOR
Definition Sector.h:44
GLsizeiptr size
Definition glcorearb.h:659
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
GLint ref
Definition glcorearb.h:291
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
Definition of a container to keep/associate and arbitrary number of labels associated to an index wit...
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
o2::header::DataDescription DataDescription
O2 data header classes and API, v0.1.
Definition DetID.h:49
Descriptor< gSizeDataDescriptionString > DataDescription
Definition DataHeader.h:551
framework::DataProcessorSpec getClusterDecoderRawSpec(bool sendMC=false)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool match(DataRef const &ref, const char *binding)
static bool isValid(DataRef const &ref)
uint32_t SubSpecificationType
Definition DataHeader.h:620
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"