Project
Loading...
Searching...
No Matches
LinkZSToDigitsSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <fmt/format.h>
18#include "Framework/Lifetime.h"
19#include "Framework/Logger.h"
20#include "DPLUtils/RawParser.h"
21#include "Headers/DataHeader.h"
26#include "TPCBase/CRU.h"
27#include "TPCBase/PadSecPos.h"
28#include "TPCBase/Mapper.h"
31#include <vector>
32#include <string>
34
35using namespace o2::framework;
37
38namespace o2
39{
40namespace tpc
41{
42
43o2::framework::DataProcessorSpec getLinkZSToDigitsSpec(int channel, const std::string_view inputDef, std::vector<int> const& tpcSectors)
44{
45
46 static constexpr int MaxNumberOfBunches = 3564;
47
48 struct ProcessAttributes {
49 uint32_t lastOrbit{0};
50 uint32_t firstOrbit{0};
51 uint32_t firstBC{0};
52 uint32_t maxEvents{100};
53 uint32_t processedEvents{0};
54 uint64_t activeSectors{0};
55 bool isContinuous{false};
56 bool quit{false};
57 std::vector<int> tpcSectors{};
58 std::array<std::vector<Digit>, Sector::MAXSECTOR> digitsAll{};
59
61 void clearDigits()
62 {
63 for (auto& digits : digitsAll) {
64 digits.clear();
65 }
66 }
67
69 void sortDigits()
70 {
71 // sort digits
72 for (auto& digits : digitsAll) {
73 std::sort(digits.begin(), digits.end(), [](const auto& a, const auto& b) {
74 if (a.getTimeStamp() < b.getTimeStamp()) {
75 return true;
76 }
77 if ((a.getTimeStamp() == b.getTimeStamp()) && (a.getRow() < b.getRow())) {
78 return true;
79 }
80 return false;
81 });
82 }
83 }
84 };
85
86 // ===| stateful initialization |=============================================
87 //
88 auto initFunction = [channel, tpcSectors](InitContext& ic) {
89 auto processAttributes = std::make_shared<ProcessAttributes>();
90 // ===| create and set up processing attributes |===
91 {
92 processAttributes->maxEvents = static_cast<uint32_t>(ic.options().get<int>("max-events"));
93 processAttributes->tpcSectors = tpcSectors;
94 }
95
96 // ===| data processor |====================================================
97 //
98 auto processingFct = [processAttributes, channel](ProcessingContext& pc) {
99 if (processAttributes->quit) {
100 return;
101 }
102
103 // ===| digit snapshot |===
104 //
105 // lambda that snapshots digits to be sent out;
106 // prepares and attaches header with sector information
107 //
108 auto snapshotDigits = [&pc, processAttributes, channel](std::vector<o2::tpc::Digit> const& digits, int sector) {
109 o2::tpc::TPCSectorHeader header{sector};
110 header.activeSectors = processAttributes->activeSectors;
111 // digit for now are transported per sector, not per lane
112 // pc.outputs().snapshot(Output{"TPC", "DIGITS", static_cast<SubSpecificationType>(channel), header},
113 pc.outputs().snapshot(Output{"TPC", "DIGITS", static_cast<SubSpecificationType>(sector), header},
114 const_cast<std::vector<o2::tpc::Digit>&>(digits));
115 };
116
117 auto& mapper = Mapper::instance();
118
119 // loop over all inputs
120 for (auto& input : pc.inputs()) {
121 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(input);
122 auto payloadSize = DataRefUtils::getPayloadSize(input);
123
124 // select only RAW data
125 if (dh->dataDescription != o2::header::gDataDescriptionRawData) {
126 continue;
127 }
128
129 // ===| extract electronics mapping information |===
130 const auto subSpecification = dh->subSpecification;
131 const auto cruID = subSpecification >> 16;
132 const auto linkID = ((subSpecification + (subSpecification >> 8)) & 0xFF) - 1;
133 const auto dataWrapperID = ((subSpecification >> 8) & 0xFF) > 0;
134 const auto globalLinkID = linkID + dataWrapperID * 12;
135 const auto sector = cruID / 10;
136 const CRU cru(cruID);
137 const int fecLinkOffsetCRU = (mapper.getPartitionInfo(cru.partition()).getNumberOfFECs() + 1) / 2;
138 const int fecInPartition = (globalLinkID % 12) + (globalLinkID > 11) * fecLinkOffsetCRU;
139 const int regionIter = cruID % 2;
140
141 const int sampaMapping[10] = {0, 0, 1, 1, 2, 3, 3, 4, 4, 2};
142 const int channelOffset[10] = {0, 16, 0, 16, 0, 0, 16, 0, 16, 16};
143
144 processAttributes->activeSectors |= (0x1 << sector);
145
146 LOGP(debug, "Specifier: {}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
147 LOGP(debug, "Payload size: {}", payloadSize);
148 LOGP(debug, "CRU: {}; linkID: {}; dataWrapperID: {}; globalLinkID: {}", cruID, linkID, dataWrapperID, globalLinkID);
149
150 try {
151 o2::framework::RawParser parser(input.payload, payloadSize);
152
153 for (auto it = parser.begin(), end = parser.end(); it != end; ++it) {
154 auto* rdhPtr = reinterpret_cast<const o2::header::RDHAny*>(it.raw());
155 if (!rdhPtr) {
156 break;
157 }
158
159 const auto& rdh = *rdhPtr;
160
161 // ===| only accept physics triggers |===
162 if (o2::raw::RDHUtils::getTriggerType(rdhPtr) != 0x10) {
163 continue;
164 }
165
166 // ===| event handling |===
167 //
168 // really ugly, better treatment required extension in DPL
169 // events are are detected by close by orbit numbers
170 //
171 const auto hbOrbit = o2::raw::RDHUtils::getHeartBeatOrbit(rdhPtr);
172 const auto lastOrbit = processAttributes->lastOrbit;
173
174 if (!processAttributes->firstOrbit) {
175 processAttributes->firstOrbit = hbOrbit;
176 if (!processAttributes->isContinuous) {
177 processAttributes->firstBC = o2::raw::RDHUtils::getHeartBeatBC(rdhPtr);
178 }
179 }
180
181 const auto globalBCoffset = ((hbOrbit - processAttributes->firstOrbit) * MaxNumberOfBunches - processAttributes->firstBC); // To be calculated
182
183 if ((lastOrbit > 0) && (hbOrbit > (lastOrbit + 3))) {
184 ++processAttributes->processedEvents;
185 LOG(info) << fmt::format("Number of processed events: {} ({})", processAttributes->processedEvents, processAttributes->maxEvents);
186 processAttributes->sortDigits();
187
188 // publish digits of all configured sectors
189 for (auto isector : processAttributes->tpcSectors) {
190 snapshotDigits(processAttributes->digitsAll[isector], isector);
191 }
192 processAttributes->clearDigits();
193
194 processAttributes->activeSectors = 0;
195 if (processAttributes->processedEvents >= processAttributes->maxEvents) {
196 LOGP(info, "Maximum number of events reached ({}), no more processing will be done", processAttributes->maxEvents);
197 processAttributes->quit = true;
198 pc.services().get<ControlService>().endOfStream();
199 //pc.services().get<ControlService>().readyToQuit(QuitRequest::All);
200 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
201 break;
202 }
203 }
204
205 processAttributes->lastOrbit = hbOrbit;
206 const auto size = it.size();
207 auto data = it.data();
208 LOGP(debug, "Raw data block payload size: {}", size);
209
210 // cast raw data pointer to link based zero suppression definition
213
214 while (zsdata < zsdataEnd) {
215 const auto channelBits = zsdata->cont.header.getChannelBits();
216 const uint32_t numberOfWords = zsdata->cont.header.numWordsPayload;
217 assert((channelBits.count() - 1) / 10 == numberOfWords - 1);
218
219 std::size_t processedChannels = 0;
220 for (std::size_t ichannel = 0; ichannel < channelBits.size(); ++ichannel) {
221 if (!channelBits[ichannel]) {
222 continue;
223 }
224
225 // adc value
226 const auto adcValue = zsdata->getADCValueFloat(processedChannels);
227
228 // pad mapping
229 // TODO: verify the assumptions of the channel mapping!
230 // assumes the following sorting (s_chn is the channel on the sampa),
231 // in this case for even regiona (lower half fec)
232 // chn# SAMPA s_chn
233 // 0 0 0
234 // 1 0 1
235 // 2 0 16
236 // 3 0 17
237 // 4 1 0
238 // 5 1 1
239 // 6 1 16
240 // 7 1 17
241 // 8 2 0
242 // 9 2 1
243 //
244 // 10 0 2
245 // 11 0 3
246 // 12 0 18
247 // 13 0 19
248 // 14 1 2
249 // 15 1 3
250 // 16 1 18
251 // 17 1 19
252 // 18 2 2
253 // 19 2 3
254 //
255 // 20 0 4
256 // 21 0 5
257 // 22 0 20
258 // 23 0 21
259 // ...
260 // For the uneven regions (upper half fec), the sampa ordering
261 // is 3, 3, 3, 3, 4, 4, 4, 4, 2, 2
262 const int istreamm = ((ichannel % 10) / 2);
263 const int partitionStream = istreamm + regionIter * 5;
264 const int sampaOnFEC = sampaMapping[partitionStream];
265 const int channel = (ichannel % 2) + 2 * (ichannel / 10);
266 const int channelOnSAMPA = channel + channelOffset[partitionStream];
267
268 const auto padSecPos = mapper.padSecPos(cru, fecInPartition, sampaOnFEC, channelOnSAMPA);
269 const auto& padPos = padSecPos.getPadPos();
270 int timebin = (globalBCoffset + zsdata->cont.header.bunchCrossing) / 8; // To be calculated
271
272 // add digit
273 processAttributes->digitsAll[sector].emplace_back(cruID, adcValue, padPos.getRow(), padPos.getPad(), timebin);
274 ++processedChannels;
275 }
276
277 // go to next time bin
278 zsdata = zsdata->next();
279 }
280 }
281
282 } catch (const std::runtime_error& e) {
283 LOG(alarm) << "can not create raw parser form input data";
284 o2::header::hexDump("payload", input.payload, payloadSize, 64);
285 LOG(alarm) << e.what();
286 }
287 }
288 };
289
290 return processingFct;
291 };
292
293 std::stringstream id;
294 id << "TPCDigitizer" << channel;
295
296 std::vector<OutputSpec> outputs; // define channel by triple of (origin, type id of data to be sent on this channel, subspecification)
297 for (auto isector : tpcSectors) {
298 outputs.emplace_back("TPC", "DIGITS", static_cast<SubSpecificationType>(isector), Lifetime::Timeframe);
299 }
300
301 return DataProcessorSpec{
302 id.str().c_str(),
303 select(inputDef.data()),
304 outputs,
305 AlgorithmSpec{initFunction},
306 Options{
307 {"max-events", VariantType::Int, 100, {"maximum number of events to process"}},
308 {"pedestal-file", VariantType::String, "", {"file with pedestals and noise for zero suppression"}}}};
309}
310} // namespace tpc
311} // namespace o2
Definition of the TPC Digit.
o2::framework::DataAllocator::SubSpecificationType SubSpecificationType
std::ostringstream debug
Processor spec for running link based zero suppressed data to digit converter.
Generic parser for consecutive raw pages.
definitions to deal with the link based zero suppression format
o2::header::DataHeader::SubSpecificationType SubSpecificationType
static Mapper & instance(const std::string mappingDir="")
Definition Mapper.h:44
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLboolean * data
Definition glcorearb.h:298
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLuint id
Definition glcorearb.h:650
constexpr o2::header::DataDescription gDataDescriptionRawData
Definition DataHeader.h:598
#define quit(arg)
Defining PrimaryVertex explicitly as messageable.
std::vector< InputSpec > select(char const *matcher="")
void hexDump(const char *desc, const void *voidaddr, size_t len, size_t max=0)
helper function to print a hex/ASCII dump of some memory
o2::framework::DataProcessorSpec getLinkZSToDigitsSpec(int channel, const std::string_view inputDef, std::vector< int > const &tpcSectors)
Processor to convert link based zero suppressed data to simulation digits.
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< Digit > digits