Project
Loading...
Searching...
No Matches
LinkZSToDigitsSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <fmt/format.h>
18#include "Framework/Lifetime.h"
19#include "Framework/Logger.h"
20#include "DPLUtils/RawParser.h"
21#include "Headers/DataHeader.h"
26#include "TPCBase/CRU.h"
27#include "TPCBase/PadSecPos.h"
28#include "TPCBase/Mapper.h"
31#include <vector>
32#include <string>
34
35using namespace o2::framework;
37
38namespace o2
39{
40namespace tpc
41{
42
43o2::framework::DataProcessorSpec getLinkZSToDigitsSpec(int channel, const std::string_view inputDef, std::vector<int> const& tpcSectors)
44{
45
46 static constexpr int MaxNumberOfBunches = 3564;
47
48 struct ProcessAttributes {
49 uint32_t lastOrbit{0};
50 uint32_t firstOrbit{0};
51 uint32_t firstBC{0};
52 uint32_t maxEvents{100};
53 uint32_t processedEvents{0};
54 uint64_t activeSectors{0};
55 bool isContinuous{false};
56 bool quit{false};
57 std::vector<int> tpcSectors{};
58 std::array<std::vector<Digit>, Sector::MAXSECTOR> digitsAll{};
59
61 void clearDigits()
62 {
63 for (auto& digits : digitsAll) {
64 digits.clear();
65 }
66 }
67
69 void sortDigits()
70 {
71 // sort digits
72 for (auto& digits : digitsAll) {
73 std::sort(digits.begin(), digits.end(), [](const auto& a, const auto& b) {
74 if (a.getTimeStamp() < b.getTimeStamp()) {
75 return true;
76 }
77 if ((a.getTimeStamp() == b.getTimeStamp()) && (a.getRow() < b.getRow())) {
78 return true;
79 }
80 return false;
81 });
82 }
83 }
84 };
85
86 // ===| stateful initialization |=============================================
87 //
88 auto initFunction = [channel, tpcSectors](InitContext& ic) {
89 auto processAttributes = std::make_shared<ProcessAttributes>();
90 // ===| create and set up processing attributes |===
91 {
92 processAttributes->maxEvents = static_cast<uint32_t>(ic.options().get<int>("max-events"));
93 processAttributes->tpcSectors = tpcSectors;
94 }
95
96 // ===| data processor |====================================================
97 //
98 auto processingFct = [processAttributes, channel](ProcessingContext& pc) {
99 if (processAttributes->quit) {
100 return;
101 }
102
103 // ===| digit snapshot |===
104 //
105 // lambda that snapshots digits to be sent out;
106 // prepares and attaches header with sector information
107 //
108 auto snapshotDigits = [&pc, processAttributes, channel](std::vector<o2::tpc::Digit> const& digits, int sector) {
109 o2::tpc::TPCSectorHeader header{sector};
110 header.activeSectors = processAttributes->activeSectors;
111 // digit for now are transported per sector, not per lane
112 // pc.outputs().snapshot(Output{"TPC", "DIGITS", static_cast<SubSpecificationType>(channel), header},
113 pc.outputs().snapshot(Output{"TPC", "DIGITS", static_cast<SubSpecificationType>(sector), header},
114 const_cast<std::vector<o2::tpc::Digit>&>(digits));
115 };
116
117 auto& mapper = Mapper::instance();
118
119 // loop over all inputs
120 for (auto& input : pc.inputs()) {
121 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(input);
122 auto payloadSize = DataRefUtils::getPayloadSize(input);
123
124 // select only RAW data
125 if (dh->dataDescription != o2::header::gDataDescriptionRawData) {
126 continue;
127 }
128
129 // ===| extract electronics mapping information |===
130 const auto subSpecification = dh->subSpecification;
131 const auto cruID = subSpecification >> 16;
132 const auto linkID = ((subSpecification + (subSpecification >> 8)) & 0xFF) - 1;
133 const auto dataWrapperID = ((subSpecification >> 8) & 0xFF) > 0;
134 const auto globalLinkID = linkID + dataWrapperID * 12;
135 const auto sector = cruID / 10;
136 const CRU cru(cruID);
137 const int fecLinkOffsetCRU = (mapper.getPartitionInfo(cru.partition()).getNumberOfFECs() + 1) / 2;
138 const int fecInPartition = (globalLinkID % 12) + (globalLinkID > 11) * fecLinkOffsetCRU;
139 const int regionIter = cruID % 2;
140
141 const int sampaMapping[10] = {0, 0, 1, 1, 2, 3, 3, 4, 4, 2};
142 const int channelOffset[10] = {0, 16, 0, 16, 0, 0, 16, 0, 16, 16};
143
144 processAttributes->activeSectors |= (0x1 << sector);
145
146 LOGP(debug, "Specifier: {}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
147 LOGP(debug, "Payload size: {}", payloadSize);
148 LOGP(debug, "CRU: {}; linkID: {}; dataWrapperID: {}; globalLinkID: {}", cruID, linkID, dataWrapperID, globalLinkID);
149
150 try {
151 o2::framework::RawParser parser(input.payload, payloadSize);
152
153 for (auto it = parser.begin(), end = parser.end(); it != end; ++it) {
154 auto* rdhPtr = reinterpret_cast<const o2::header::RDHAny*>(it.raw());
155 if (!rdhPtr) {
156 break;
157 }
158
159 const auto& rdh = *rdhPtr;
160
161 // ===| only accept physics triggers |===
162 if (o2::raw::RDHUtils::getTriggerType(rdhPtr) != 0x10) {
163 continue;
164 }
165
166 // ===| event handling |===
167 //
168 // really ugly, better treatment required extension in DPL
169 // events are are detected by close by orbit numbers
170 //
171 const auto hbOrbit = o2::raw::RDHUtils::getHeartBeatOrbit(rdhPtr);
172 const auto lastOrbit = processAttributes->lastOrbit;
173
174 if (!processAttributes->firstOrbit) {
175 processAttributes->firstOrbit = hbOrbit;
176 if (!processAttributes->isContinuous) {
177 processAttributes->firstBC = o2::raw::RDHUtils::getHeartBeatBC(rdhPtr);
178 }
179 }
180
181 const auto globalBCoffset = ((hbOrbit - processAttributes->firstOrbit) * MaxNumberOfBunches - processAttributes->firstBC); // To be calculated
182
183 if ((lastOrbit > 0) && (hbOrbit > (lastOrbit + 3))) {
184 ++processAttributes->processedEvents;
185 LOG(info) << fmt::format("Number of processed events: {} ({})", processAttributes->processedEvents, processAttributes->maxEvents);
186 processAttributes->sortDigits();
187
188 // publish digits of all configured sectors
189 for (auto isector : processAttributes->tpcSectors) {
190 snapshotDigits(processAttributes->digitsAll[isector], isector);
191 }
192 processAttributes->clearDigits();
193
194 processAttributes->activeSectors = 0;
195 if (processAttributes->processedEvents >= processAttributes->maxEvents) {
196 LOGP(info, "Maximum number of events reached ({}), no more processing will be done", processAttributes->maxEvents);
197 processAttributes->quit = true;
198 pc.services().get<ControlService>().endOfStream();
199 //pc.services().get<ControlService>().readyToQuit(QuitRequest::All);
200 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
201 break;
202 }
203 }
204
205 processAttributes->lastOrbit = hbOrbit;
206 const auto size = it.size();
207 auto data = it.data();
208 LOGP(debug, "Raw data block payload size: {}", size);
209
210 // cast raw data pointer to link based zero suppression definition
213
214 while (zsdata < zsdataEnd) {
215 const auto channelBits = zsdata->cont.header.getChannelBits();
216 const uint32_t numberOfWords = zsdata->cont.header.numWordsPayload;
217 assert((channelBits.count() - 1) / 10 == numberOfWords - 1);
218
219 std::size_t processedChannels = 0;
220 for (std::size_t ichannel = 0; ichannel < channelBits.size(); ++ichannel) {
221 if (!channelBits[ichannel]) {
222 continue;
223 }
224
225 // adc value
226 const auto adcValue = zsdata->getADCValueFloat(processedChannels);
227
228 // pad mapping
229 // TODO: verify the assumptions of the channel mapping!
230 // assumes the following sorting (s_chn is the channel on the sampa),
231 // in this case for even regiona (lower half fec)
232 // chn# SAMPA s_chn
233 // 0 0 0
234 // 1 0 1
235 // 2 0 16
236 // 3 0 17
237 // 4 1 0
238 // 5 1 1
239 // 6 1 16
240 // 7 1 17
241 // 8 2 0
242 // 9 2 1
243 //
244 // 10 0 2
245 // 11 0 3
246 // 12 0 18
247 // 13 0 19
248 // 14 1 2
249 // 15 1 3
250 // 16 1 18
251 // 17 1 19
252 // 18 2 2
253 // 19 2 3
254 //
255 // 20 0 4
256 // 21 0 5
257 // 22 0 20
258 // 23 0 21
259 // ...
260 // For the uneven regions (upper half fec), the sampa ordering
261 // is 3, 3, 3, 3, 4, 4, 4, 4, 2, 2
262 const int istreamm = ((ichannel % 10) / 2);
263 const int partitionStream = istreamm + regionIter * 5;
264 const int sampaOnFEC = sampaMapping[partitionStream];
265 const int channel = (ichannel % 2) + 2 * (ichannel / 10);
266 const int channelOnSAMPA = channel + channelOffset[partitionStream];
267
268 const auto padSecPos = mapper.padSecPos(cru, fecInPartition, sampaOnFEC, channelOnSAMPA);
269 const auto& padPos = padSecPos.getPadPos();
270 int timebin = (globalBCoffset + zsdata->cont.header.bunchCrossing) / 8; // To be calculated
271
272 // add digit
273 processAttributes->digitsAll[sector].emplace_back(cruID, adcValue, padPos.getRow(), padPos.getPad(), timebin);
274 ++processedChannels;
275 }
276
277 // go to next time bin
278 zsdata = zsdata->next();
279 }
280 }
281
282 } catch (const std::runtime_error& e) {
283 LOG(alarm) << "can not create raw parser form input data";
284 o2::header::hexDump("payload", input.payload, payloadSize, 64);
285 LOG(alarm) << e.what();
286 }
287 }
288 };
289
290 return processingFct;
291 };
292
293 std::stringstream id;
294 id << "TPCDigitizer" << channel;
295
296 std::vector<OutputSpec> outputs; // define channel by triple of (origin, type id of data to be sent on this channel, subspecification)
297 for (auto isector : tpcSectors) {
298 outputs.emplace_back("TPC", "DIGITS", static_cast<SubSpecificationType>(isector), Lifetime::Timeframe);
299 }
300
301 return DataProcessorSpec{
302 id.str().c_str(),
303 select(inputDef.data()),
304 outputs,
305 AlgorithmSpec{initFunction},
306 Options{
307 {"max-events", VariantType::Int, 100, {"maximum number of events to process"}},
308 {"pedestal-file", VariantType::String, "", {"file with pedestals and noise for zero suppression"}}}};
309}
310} // namespace tpc
311} // namespace o2
Definition of the TPC Digit.
o2::framework::DataAllocator::SubSpecificationType SubSpecificationType
Processor spec for running link based zero suppressed data to digit converter.
Generic parser for consecutive raw pages.
std::ostringstream debug
definitions to deal with the link based zero suppression format
o2::header::DataHeader::SubSpecificationType SubSpecificationType
static Mapper & instance(const std::string mappingDir="")
Definition Mapper.h:44
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLboolean * data
Definition glcorearb.h:298
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLuint id
Definition glcorearb.h:650
constexpr o2::header::DataDescription gDataDescriptionRawData
Definition DataHeader.h:597
#define quit(arg)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< InputSpec > select(char const *matcher="")
void hexDump(const char *desc, const void *voidaddr, size_t len, size_t max=0)
helper function to print a hex/ASCII dump of some memory
o2::framework::DataProcessorSpec getLinkZSToDigitsSpec(int channel, const std::string_view inputDef, std::vector< int > const &tpcSectors)
Processor to convert link based zero suppressed data to simulation digits.
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< Digit > digits