54 mDebugLevel = ic.options().get<uint32_t>(
"debug-level");
57 mAggregateTFs = ic.options().get<uint32_t>(
"aggregate-tfs");
59 const auto nthreadsDecoding = ic.options().get<uint32_t>(
"nthreads-decoding");
62 const auto reAlignType = ic.options().get<uint32_t>(
"try-re-align");
72 const auto startTime = HighResClock::now();
76 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
78 const auto feeId = (
FEEIDType)dh->subSpecification;
79 const auto link = rdh_utils::getLink(feeId);
80 const uint32_t cruID = rdh_utils::getCRU(feeId);
81 const auto endPoint = rdh_utils::getEndPoint(feeId);
85 if (
link != rdh_utils::SACLinkID) {
89 LOGP(info,
"SAC Processing firstTForbit {:9}, tfCounter {:5}, run {:6}, feeId {:6} ({:3}/{}/{:2})", tinfo.firstTForbit, tinfo.tfCounter, tinfo.runNumber, feeId, cruID, endPoint,
link);
93 const gsl::span<const char>
raw = pc.inputs().get<gsl::span<char>>(
ref);
95 for (
auto it = parser.begin(),
end = parser.end(); it !=
end; ++it) {
96 const auto size = it.size();
100 if (!rdhPtr || rdhVersion < 6) {
101 throw std::runtime_error(fmt::format(
"could not get RDH from packet, or version {} < 6", rdhVersion).
data());
104 if ((mDecoder.
getReferenceTime() < 0) && (raw::RDHUtils::getPacketCounter(rdhPtr))) {
106 LOGP(info,
"setting time stamp reset reference to: {}, at tfCounter: {}, firstTForbit: {}", referenceTime, tinfo.tfCounter, tinfo.firstTForbit);
111 auto data = (
const char*)it.data();
116 if ((mProcessedTFs > 0) && !(mProcessedTFs % mAggregateTFs)) {
122 auto endTime = HighResClock::now();
123 auto elapsed_seconds = std::chrono::duration_cast<std::chrono::duration<double>>(endTime - startTime);
124 LOGP(info,
"Time spent for TF {}, firstTForbit {}: {} s", tinfo.tfCounter, tinfo.firstTForbit, elapsed_seconds.count());
139 LOGP(info,
"endOfStream");
150 size_t mProcessedTFs{0};
151 uint32_t mDebugLevel{0};
152 uint32_t mAggregateTFs{0};
153 sac::Decoder mDecoder;
154 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
161 std::vector<InputSpec> inputs;
163 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(
true,
170 std::vector<OutputSpec> outputs;
171 outputs.emplace_back(
"TPC",
"DECODEDSAC", 0, Lifetime::Sporadic);
172 outputs.emplace_back(
"TPC",
"REFTIMESAC", 0, Lifetime::Sporadic);
180 {
"debug-level", VariantType::UInt32, 0u, {
"amount of debug to show"}},
181 {
"nthreads-decoding", VariantType::UInt32, 1u, {
"Number of threads used for decoding"}},
182 {
"aggregate-tfs", VariantType::UInt32, 1u, {
"Number of TFs to aggregate before running decoding"}},
183 {
"try-re-align", VariantType::UInt32, 0u, {
"Try to re-align data stream in case of missing packets. 0 - no; 1 - yes; 2 - yes, and fill missing packets"}},
Simple interface to the CDB manager.
std::chrono::high_resolution_clock HighResClock
o2::raw::RawFileWriter * raw
Helper for geometry and GRP related CCDB requests.
Generic parser for consecutive raw pages.
Decoding of integrated analogue currents.
TPC Sampled Analogue Current processing.
auto getOrbitResetTimeMS() const
void checkUpdates(o2::framework::ProcessingContext &pc)
bool finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
rdh_utils::FEEIDType FEEIDType
SACProcessorDevice(std::shared_ptr< o2::base::GRPGeomRequest > req)
@ HBFInfo
Show data for each HBF.
void init(InitContext &ic) final
void sendData(DataAllocator &output)
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
void endOfStream(EndOfStreamContext &ec) final
void run(ProcessingContext &pc) final
static void setNThreads(const int nThreads)
@ TimingInfo
Print timing information.
void setDebugLevel(uint32_t level=(uint32_t) DebugFlags::PacketInfo)
set a debug level, see DebugFlags
double getReferenceTime() const
const DecodedData & getDecodedData() const
void setReAlignType(ReAlignType type=ReAlignType::AlignOnly)
bool process(const char *data, size_t size)
void setReferenceTime(double time)
@ MaxType
Largest type number.
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
constexpr o2::header::DataOrigin gDataOriginTPC
constexpr double LHCOrbitMUS
Defining ITS Vertex explicitly as messageable.
std::vector< ConfigParamSpec > Options
Global TPC definitions and constants.
o2::framework::DataProcessorSpec getSACProcessorSpec()
decode SAC raw data
static constexpr int getVersion()
get numeric version of the RDH