54 mDebugLevel = ic.options().get<uint32_t>(
"debug-level");
57 mAggregateTFs = ic.options().get<uint32_t>(
"aggregate-tfs");
59 const auto nthreadsDecoding = ic.options().get<uint32_t>(
"nthreads-decoding");
62 const auto reAlignType = ic.options().get<uint32_t>(
"try-re-align");
72 const auto startTime = HighResClock::now();
76 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
78 const auto feeId = (
FEEIDType)dh->subSpecification;
79 const auto link = rdh_utils::getLink(feeId);
80 const uint32_t cruID = rdh_utils::getCRU(feeId);
81 const auto endPoint = rdh_utils::getEndPoint(feeId);
85 if (link != rdh_utils::SACLinkID) {
89 LOGP(info,
"SAC Processing firstTForbit {:9}, tfCounter {:5}, run {:6}, feeId {:6} ({:3}/{}/{:2})", tinfo.firstTForbit, tinfo.tfCounter, tinfo.runNumber, feeId, cruID, endPoint, link);
93 const gsl::span<const char> raw = pc.inputs().get<gsl::span<char>>(
ref);
95 for (
auto it = parser.begin(),
end = parser.end(); it !=
end; ++it) {
96 const auto size = it.size();
100 if (!rdhPtr || rdhVersion < 6) {
101 throw std::runtime_error(fmt::format(
"could not get RDH from packet, or version {} < 6", rdhVersion).
data());
104 if ((mDecoder.
getReferenceTime() < 0) && (raw::RDHUtils::getPacketCounter(rdhPtr))) {
106 LOGP(info,
"setting time stamp reset reference to: {}, at tfCounter: {}, firstTForbit: {}", referenceTime, tinfo.tfCounter, tinfo.firstTForbit);
111 auto data = (
const char*)it.data();
116 if ((mProcessedTFs > 0) && !(mProcessedTFs % mAggregateTFs)) {
122 auto endTime = HighResClock::now();
123 auto elapsed_seconds = std::chrono::duration_cast<std::chrono::duration<double>>(endTime - startTime);
124 LOGP(info,
"Time spent for TF {}, firstTForbit {}: {} s", tinfo.tfCounter, tinfo.firstTForbit, elapsed_seconds.count());
139 LOGP(info,
"endOfStream");
150 size_t mProcessedTFs{0};
151 uint32_t mDebugLevel{0};
152 uint32_t mAggregateTFs{0};
153 sac::Decoder mDecoder;
154 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
161 std::vector<InputSpec> inputs;
163 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(
true,
170 std::vector<OutputSpec> outputs;
171 outputs.emplace_back(
"TPC",
"DECODEDSAC", 0, Lifetime::Sporadic);
172 outputs.emplace_back(
"TPC",
"REFTIMESAC", 0, Lifetime::Sporadic);
180 {
"debug-level", VariantType::UInt32, 0u, {
"amount of debug to show"}},
181 {
"nthreads-decoding", VariantType::UInt32, 1u, {
"Number of threads used for decoding"}},
182 {
"aggregate-tfs", VariantType::UInt32, 1u, {
"Number of TFs to aggregate before running decoding"}},
183 {
"try-re-align", VariantType::UInt32, 0u, {
"Try to re-align data stream in case of missing packets. 0 - no; 1 - yes; 2 - yes, and fill missing packets"}},
Simple interface to the CDB manager.
std::chrono::high_resolution_clock HighResClock
Helper for geometry and GRP related CCDB requests.
Generic parser for consecutive raw pages.
Decoding of integrated analogue currents.
TPC Sampled Analogue Current processing.
auto getOrbitResetTimeMS() const
void checkUpdates(o2::framework::ProcessingContext &pc)
bool finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
rdh_utils::FEEIDType FEEIDType
SACProcessorDevice(std::shared_ptr< o2::base::GRPGeomRequest > req)
@ HBFInfo
Show data for each HBF.
void init(InitContext &ic) final
void sendData(DataAllocator &output)
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
void endOfStream(EndOfStreamContext &ec) final
void run(ProcessingContext &pc) final
static void setNThreads(const int nThreads)
@ TimingInfo
Print timing information.
void setDebugLevel(uint32_t level=(uint32_t) DebugFlags::PacketInfo)
set a debug level, see DebugFlags
double getReferenceTime() const
const DecodedData & getDecodedData() const
void setReAlignType(ReAlignType type=ReAlignType::AlignOnly)
bool process(const char *data, size_t size)
void setReferenceTime(double time)
@ MaxType
Largest type number.
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
constexpr o2::header::DataOrigin gDataOriginTPC
constexpr double LHCOrbitMUS
Defining PrimaryVertex explicitly as messageable.
std::vector< ConfigParamSpec > Options
Global TPC definitions and constants.
o2::framework::DataProcessorSpec getSACProcessorSpec()
decode SAC raw data
static constexpr int getVersion()
get numeric version of the RDH