46 LOG(
debug) <<
"CompressedDecoding init";
48 if (mNorbitsPerTF == -1) {
52 mMaskNoise = ic.
options().
get<
bool>(
"mask-noise");
53 mNoiseRate = ic.
options().
get<
int>(
"noise-counts");
54 mRowFilter = ic.
options().
get<
bool>(
"row-filter");
62 auto finishFunction = [
this]() {
63 LOG(
debug) <<
"CompressedDecoding finish";
72 mHasToBePosted =
false;
88 lastval = last->first() + last->size();
89 lastIR = last->mFirstIR;
105 int n_tof_window =
row->size();
106 int n_orbits = n_tof_window / 3;
107 int digit_size = alldigits->size();
115 std::vector<uint8_t>& patterns = mDecoder.
getPatterns();
119 std::vector<uint64_t>& errors = mDecoder.
getErrors();
130 diagnosticFrequency.setTFIDInfo(tfinfo);
154 static bool isFirstCall =
true;
157 LOG(info) <<
"N orbits per TF set to " <<
norbits;
162 mCreationTime = std::chrono::high_resolution_clock::now().time_since_epoch().count() / 1000000;
173 mHasToBePosted =
true;
174 }
else if (mNCrateOpenTF == mNCrateCloseTF) {
175 mHasToBePosted =
true;
178 if (mHasToBePosted) {
186 LOGF(
debug,
"TOF CompressedDecoding total timing: Cpu: %.3e Real: %.3e s in %d slots",
187 mTimer.CpuTime(), mTimer.RealTime(), mTimer.Counter() - 1);
192 auto& inputs = pc.
inputs();
198 static size_t contDeadBeef = 0;
201 const auto dh = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
203 if (payloadSize == 0) {
205 if (++contDeadBeef <= maxWarn) {
206 LOGP(alarm,
"Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF{}",
207 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, tinfo.tfCounter, tinfo.firstTForbit, payloadSize,
208 contDeadBeef == maxWarn ? fmt::format(
". {} such inputs in row received, stopping reporting", contDeadBeef) :
"");
227 const auto* headerIn = DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
228 auto payloadIn =
ref.payload;
231 if (payloadInSize < 1) {
247 if (mNCrateOpenTF == 0) {
248 mInitOrbit = crateOrbit->
orbitID;
256void CompressedDecodingTask::trailerHandler(
const CrateHeader_t* crateHeader,
const CrateOrbit_t* crateOrbit,
257 const CrateTrailer_t* crateTrailer,
const Diagnostic_t* diagnostics,
258 const Error_t* errors)
261 LOG(
debug) <<
"Crate closed " << crateHeader->drmID;
265 if (mCurrentOrbit > 0) {
266 mDecoder.
addCrateHeaderData(mCurrentOrbit, crateHeader->drmID, crateHeader->bunchID, crateTrailer->eventCounter);
268 mDecoder.
addCrateHeaderData(crateOrbit->orbitID, crateHeader->drmID, crateHeader->bunchID, crateTrailer->eventCounter);
272 auto numberOfDiagnostics = crateTrailer->numberOfDiagnostics;
273 auto numberOfErrors = crateTrailer->numberOfErrors;
274 for (
int i = 0;
i < numberOfDiagnostics;
i++) {
275 const uint32_t*
val =
reinterpret_cast<const uint32_t*
>(&(diagnostics[
i]));
276 if (mCurrentOrbit > 0) {
277 mDecoder.
addPattern(*
val, crateHeader->drmID, mCurrentOrbit, crateHeader->bunchID);
279 mDecoder.
addPattern(*
val, crateHeader->drmID, crateOrbit->orbitID, crateHeader->bunchID);
404 for (
int i = 0;
i < numberOfErrors;
i++) {
405 const uint32_t*
val =
reinterpret_cast<const uint32_t*
>(&(errors[
i]));
412 const auto& rdhr = *rdh;
414 mCurrentOrbit = RDHUtils::getHeartBeatOrbit(rdhr);
423 if ((RDHUtils::getPageCounter(rdhr) == 0) && (RDHUtils::getTriggerType(rdhr) &
o2::trigger::TF)) {
428void CompressedDecodingTask::frameHandler(
const CrateHeader_t* crateHeader,
const CrateOrbit_t* crateOrbit,
429 const FrameHeader_t* frameHeader,
const PackedHit_t* packedHits)
431 for (
int i = 0;
i < frameHeader->numberOfHits; ++
i) {
432 auto packedHit = packedHits +
i;
433 if (mCurrentOrbit > 0) {
434 mDecoder.
InsertDigit(crateHeader->drmID, frameHeader->trmID, packedHit->tdcID, packedHit->chain, packedHit->channel, mCurrentOrbit, crateHeader->bunchID, frameHeader->frameID << 13, packedHit->time, packedHit->tot);
436 mDecoder.
InsertDigit(crateHeader->drmID, frameHeader->trmID, packedHit->tdcID, packedHit->chain, packedHit->channel, crateOrbit->orbitID, crateHeader->bunchID, frameHeader->frameID << 13, packedHit->time, packedHit->tot);
443 std::vector<InputSpec> inputs;
445 inputs.emplace_back(
"stdDist",
"FLP",
"DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
448 std::shared_ptr<o2::base::GRPGeomRequest> ccdbRequest;
449 if (norbitPerTF == -1) {
450 ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(
false,
463 std::vector<OutputSpec> outputs;
472 "tof-compressed-decoder",
476 AlgorithmSpec{adaptFromTask<CompressedDecodingTask>(conet, dataDesc, ccdbRequest, norbitPerTF, localCmp)},
478 {
"row-filter", VariantType::Bool,
false, {
"Filter empty row"}},
479 {
"mask-noise", VariantType::Bool,
false, {
"Flag to mask noisy digits"}},
480 {
"noise-counts", VariantType::Int, 11, {
"Counts in a single (TF) payload"}}}};
TOF compressed data decoding task.
Header of the General Run Parameters object.
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static const VerbosityConfig & Instance()
T get(const char *key) const
void snapshot(const Output &spec, T const &object)
ServiceRegistryRef services()
ConfigParamRegistry const & options()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void init(InitContext &ic) final
void decodeTF(ProcessingContext &pc)
void postData(ProcessingContext &pc)
void endOfStream(EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void setTimeStamp(int val)
static uint32_t getNOrbitInTF()
void addCrateHeaderData(unsigned long orbit, int crate, int32_t bc, uint32_t eventCounter)
std::vector< uint8_t > & getPatterns()
Diagnostic & getDiagnosticFrequency()
void addPattern(const uint32_t val, int icrate, int orbit, int bc)
std::vector< Digit > * getDigitPerTimeFrame()
DigitHeader & getDigitHeader()
void setNOrbitInTF(uint32_t norb)
void maskNoiseRate(int val)
void fillDiagnosticFrequency()
void setFirstIR(const o2::InteractionRecord &ir)
std::vector< ReadoutWindowData > * getReadoutWindowDataFiltered()
std::vector< ReadoutWindowData > * getReadoutWindowData()
void setDecoderBuffer(const char *val)
void setDecoderBufferSize(long val)
std::vector< uint64_t > & getErrors()
void InsertDigit(int icrate, int itrm, int itdc, int ichain, int channel, uint32_t orbit, uint16_t bunchid, int time_ext, int tdc, int tot)
void addError(const uint32_t val, int icrate)
constexpr o2::header::DataOrigin gDataOriginTOF
Defining PrimaryVertex explicitly as messageable.
std::vector< ConfigParamSpec > Options
framework::DataProcessorSpec getCompressedDecodingSpec(const std::string &inputDesc, bool conet=false, bool askDISTSTF=true, int norbitPerTF=-1, bool localCmp=false)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static void fillTFIDInfo(o2::framework::ProcessingContext &pc, o2::dataformats::TFIDInfo &ti)
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"