Project
Loading...
Searching...
No Matches
SACProcessorSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15
16#include <vector>
17#include <chrono>
18
19#include "Framework/Task.h"
24#include "DPLUtils/RawParser.h"
29
31#include "TPCBase/RDHUtils.h"
34
35using HighResClock = std::chrono::steady_clock;
36using namespace o2::framework;
37
38namespace o2::tpc
39{
40
42{
43 public:
44 enum class DebugFlags {
45 HBFInfo = 0x10,
46 };
48
49 SACProcessorDevice(std::shared_ptr<o2::base::GRPGeomRequest> req) : mCCDBRequest(req) {}
50
51 void init(InitContext& ic) final
52 {
54 mDebugLevel = ic.options().get<uint32_t>("debug-level");
55 mDecoder.setDebugLevel(mDebugLevel);
56
57 mAggregateTFs = ic.options().get<uint32_t>("aggregate-tfs");
58
59 const auto nthreadsDecoding = ic.options().get<uint32_t>("nthreads-decoding");
60 sac::Decoder::setNThreads(nthreadsDecoding);
61
62 const auto reAlignType = ic.options().get<uint32_t>("try-re-align");
63 if (reAlignType <= uint32_t(sac::Decoder::ReAlignType::MaxType)) {
64 mDecoder.setReAlignType(sac::Decoder::ReAlignType(reAlignType));
65 }
66 }
67
68 void run(ProcessingContext& pc) final
69 {
71 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
72 const auto startTime = HighResClock::now();
73
74 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}}; // TODO: Change to SAC when changed in DD
75 for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
76 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
77 // ---| extract hardware information to do the processing |---
78 const auto feeId = (FEEIDType)dh->subSpecification;
79 const auto link = rdh_utils::getLink(feeId);
80 const uint32_t cruID = rdh_utils::getCRU(feeId);
81 const auto endPoint = rdh_utils::getEndPoint(feeId);
82
83 // only select SACs
84 // ToDo: cleanup once SACs will be propagated not as RAWDATA, but SAC.
85 if (link != rdh_utils::SACLinkID) {
86 continue;
87 }
88 if (mDebugLevel & (uint32_t)DebugFlags::HBFInfo) {
89 LOGP(info, "SAC Processing firstTForbit {:9}, tfCounter {:5}, run {:6}, feeId {:6} ({:3}/{}/{:2})", tinfo.firstTForbit, tinfo.tfCounter, tinfo.runNumber, feeId, cruID, endPoint, link);
90 }
91
92 // ---| data loop |---
93 const gsl::span<const char> raw = pc.inputs().get<gsl::span<char>>(ref);
94 RawParser parser(raw.data(), raw.size());
95 for (auto it = parser.begin(), end = parser.end(); it != end; ++it) {
96 const auto size = it.size();
97 if (size == 0) {
98 auto rdhPtr = reinterpret_cast<const o2::header::RDHAny*>(it.raw());
99 const auto rdhVersion = raw::RDHUtils::getVersion(rdhPtr);
100 if (!rdhPtr || rdhVersion < 6) {
101 throw std::runtime_error(fmt::format("could not get RDH from packet, or version {} < 6", rdhVersion).data());
102 }
103 // TODO: should only be done once for the first packet
104 if ((mDecoder.getReferenceTime() < 0) && (raw::RDHUtils::getPacketCounter(rdhPtr))) {
105 const double referenceTime = o2::base::GRPGeomHelper::instance().getOrbitResetTimeMS() + tinfo.firstTForbit * o2::constants::lhc::LHCOrbitMUS * 0.001;
106 LOGP(info, "setting time stamp reset reference to: {}, at tfCounter: {}, firstTForbit: {}", referenceTime, tinfo.tfCounter, tinfo.firstTForbit);
107 mDecoder.setReferenceTime(referenceTime); // TODO set proper time
108 }
109 continue;
110 }
111 auto data = (const char*)it.data();
112 mDecoder.process(data, size);
113 }
114 }
115
116 if ((mProcessedTFs > 0) && !(mProcessedTFs % mAggregateTFs)) {
117 mDecoder.runDecoding();
118 sendData(pc.outputs());
119 }
120
121 if (mDebugLevel & (uint32_t)sac::Decoder::DebugFlags::TimingInfo) {
122 auto endTime = HighResClock::now();
123 auto elapsed_seconds = std::chrono::duration_cast<std::chrono::duration<double>>(endTime - startTime);
124 LOGP(info, "Time spent for TF {}, firstTForbit {}: {} s", tinfo.tfCounter, tinfo.firstTForbit, elapsed_seconds.count());
125 }
126
127 ++mProcessedTFs;
128 }
129
131 {
132 output.snapshot(Output{"TPC", "REFTIMESAC", 0}, mDecoder.getDecodedData().referenceTime);
133 output.snapshot(Output{"TPC", "DECODEDSAC", 0}, mDecoder.getDecodedData().getGoodData());
134 mDecoder.clearDecodedData();
135 }
136
138 {
139 LOGP(info, "endOfStream");
140 mDecoder.finalize();
141 sendData(ec.outputs());
142 }
143
144 void finaliseCCDB(o2::framework::ConcreteDataMatcher& matcher, void* obj) final
145 {
147 }
148
149 private:
150 size_t mProcessedTFs{0};
151 uint32_t mDebugLevel{0};
152 uint32_t mAggregateTFs{0};
153 sac::Decoder mDecoder;
154 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
155};
156
158{
159 using device = o2::tpc::SACProcessorDevice;
160
161 std::vector<InputSpec> inputs;
162 inputs.emplace_back(InputSpec{"tpcraw", ConcreteDataTypeMatcher{"TPC", "RAWDATA"}, Lifetime::Timeframe});
163 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(true, // orbitResetTime
164 true, // GRPECS=true
165 false, // GRPLHCIF
166 false, // GRPMagField
167 false, // askMatLUT
169 inputs);
170 std::vector<OutputSpec> outputs;
171 outputs.emplace_back("TPC", "DECODEDSAC", 0, Lifetime::Sporadic);
172 outputs.emplace_back("TPC", "REFTIMESAC", 0, Lifetime::Sporadic);
173
174 return DataProcessorSpec{
175 "tpc-sac-processor",
176 inputs,
177 outputs,
178 AlgorithmSpec{adaptFromTask<device>(ccdbRequest)},
179 Options{
180 {"debug-level", VariantType::UInt32, 0u, {"amount of debug to show"}},
181 {"nthreads-decoding", VariantType::UInt32, 1u, {"Number of threads used for decoding"}},
182 {"aggregate-tfs", VariantType::UInt32, 1u, {"Number of TFs to aggregate before running decoding"}},
183 {"try-re-align", VariantType::UInt32, 0u, {"Try to re-align data stream in case of missing packets. 0 - no; 1 - yes; 2 - yes, and fill missing packets"}},
184 } // end Options
185 }; // end DataProcessorSpec
186}
187} // namespace o2::tpc
Simple interface to the CDB manager.
std::chrono::high_resolution_clock HighResClock
Helper for geometry and GRP related CCDB requests.
A helper class to iteratate over all parts of all input routes.
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
Generic parser for consecutive raw pages.
Decoding of integrated analogue currents.
TPC Sampled Analogue Current processing.
auto getOrbitResetTimeMS() const
void checkUpdates(o2::framework::ProcessingContext &pc)
bool finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
A helper class to iteratate over all parts of all input routes.
rdh_utils::FEEIDType FEEIDType
SACProcessorDevice(std::shared_ptr< o2::base::GRPGeomRequest > req)
@ HBFInfo
Show data for each HBF.
void init(InitContext &ic) final
void sendData(DataAllocator &output)
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
void endOfStream(EndOfStreamContext &ec) final
void run(ProcessingContext &pc) final
static void setNThreads(const int nThreads)
Definition SACDecoder.h:216
@ TimingInfo
Print timing information.
void setDebugLevel(uint32_t level=(uint32_t) DebugFlags::PacketInfo)
set a debug level, see DebugFlags
Definition SACDecoder.h:198
double getReferenceTime() const
Definition SACDecoder.h:166
const DecodedData & getDecodedData() const
Definition SACDecoder.h:209
void setReAlignType(ReAlignType type=ReAlignType::AlignOnly)
Definition SACDecoder.h:211
bool process(const char *data, size_t size)
void setReferenceTime(double time)
Definition SACDecoder.h:161
@ MaxType
Largest type number.
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLboolean * data
Definition glcorearb.h:298
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
GLint ref
Definition glcorearb.h:291
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
constexpr double LHCOrbitMUS
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
uint16_t FEEIDType
Definition RDHUtils.h:26
Global TPC definitions and constants.
Definition SimTraits.h:167
o2::framework::DataProcessorSpec getSACProcessorSpec()
decode SAC raw data
static constexpr int getVersion()
get numeric version of the RDH
Definition RDHUtils.h:58