Project
Loading...
Searching...
No Matches
DataReaderTask.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15
18
28
29namespace o2::trd
30{
31
33{
34 mReader.setMaxErrWarnPrinted(ic.options().get<int>("log-max-errors"), ic.options().get<int>("log-max-warnings"));
35 int nTimeBins = ic.options().get<int>("number-of-TBs");
36 if (nTimeBins >= 0) {
37 LOGP(info, "Number of time bins set to {} externally", nTimeBins);
38 mReader.setNumberOfTimeBins(nTimeBins);
39 }
40 mReader.configure(mTrackletHCHeaderState, mHalfChamberWords, mHalfChamberMajor, mOptions);
41 mProcessEveryNthTF = ic.options().get<int>("every-nth-tf");
42}
43
45{
 // Print the totals accumulated in the member counters: number of digits and
 // tracklets, received input volume in MB and rejected volume in MB.
 // The rejected word count is multiplied by 4 before the conversion to MB
 // (presumably 4 bytes per word - TODO confirm).
46 LOGF(info, "At EoS we have read: %lu Digits, %lu Tracklets. Received %.3f MB input data and rejected %.3f MB",
47 mDigitsTotal, mTrackletsTotal, mDatasizeInTotal / (1024. * 1024.), (float)mWordsRejectedTotal * 4. / (1024. * 1024.));
49}
50
52{
 // Dispatch on the matcher of the updated object. Matchers other than the two
 // handled below fall through without any action.
53 if (matcher == ConcreteDataMatcher("CTP", "Trig_Offset", 0)) {
54 LOG(info) << " CTP/Config/TriggerOffsets updated.";
56 return;
57 } else if (matcher == ConcreteDataMatcher("TRD", "LinkToHcid", 0)) {
58 LOG(info) << "Updated Link ID to HCID mapping";
 // obj arrives untyped; it is cast to the link mapping type and handed to the reader.
59 mReader.setLinkMap((const o2::trd::LinkToHCIDMapping*)obj);
60 return;
61 }
62}
63
64void DataReaderTask::updateTimeDependentParams(framework::ProcessingContext& pc)
65{
66 if (!mInitOnceDone) {
67 pc.inputs().get<o2::ctp::TriggerOffsetsParam*>("trigoffset");
68 pc.inputs().get<o2::trd::LinkToHCIDMapping*>("linkToHcid");
69 mInitOnceDone = true;
70 }
71}
72
74{
 // Returns true as soon as one of the walked inputs has a payload size of 0,
 // and false if none of them has.
75 constexpr auto origin = header::gDataOriginTRD;
77 // if we see requested data type input with 0xDEADBEEF subspec and 0 payload.
78 // frame detected we have no data and send this instead
79 // send empty output so as to not block workflow
 // Function-local static: the counter keeps its value between calls.
80 static size_t contDeadBeef = 0; // number of times 0xDEADBEEF was seen continuously
81 for (const auto& ref : o2::framework::InputRecordWalker(pc.inputs(), {dummy})) {
82 const auto dh = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
84 if (payloadSize == 0) {
 // Report only the first maxWarn consecutive occurrences; the last reported
 // message additionally announces that reporting stops.
86 if (++contDeadBeef <= maxWarn) {
87 LOGP(alarm, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF{}",
88 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, payloadSize,
89 contDeadBeef == maxWarn ? fmt::format(". {} such inputs in row received, stopping reporting", contDeadBeef) : "");
90 }
 // The first input with an empty payload decides; remaining inputs are not inspected.
91 return true;
92 }
93 }
94 contDeadBeef = 0; // if good data, reset the counter
95 return false;
96}
97
99{
100 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
101 if (tinfo.globalRunNumberChanged) { // new run is starting
102 mInitOnceDone = false;
103 }
104 updateTimeDependentParams(pc);
105 auto dataReadStart = std::chrono::high_resolution_clock::now();
106
107 if ((mNTFsProcessed++ % mProcessEveryNthTF != 0) || isTimeFrameEmpty(pc)) {
108 mReader.buildDPLOutputs(pc);
109 mReader.reset();
110 return;
111 }
112
113 size_t datasizeInTF = 0;
114 std::vector<InputSpec> sel{InputSpec{"filter", ConcreteDataTypeMatcher{"TRD", "RAWDATA"}}};
115 uint64_t tfCount = 0;
116 for (auto& ref : InputRecordWalker(pc.inputs(), sel)) {
117 // loop over incoming HBFs from all half-CRUs (typically 128 * 72 iterations per TF)
118 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
119 tfCount = dh->tfCounter;
120 const char* payloadIn = ref.payload;
121 auto payloadInSize = DataRefUtils::getPayloadSize(ref);
122 if (mOptions[TRDVerboseBit]) {
123 LOGP(info, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : ",
124 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, payloadInSize);
125 }
126 mReader.setDataBuffer(payloadIn);
127 mReader.setDataBufferSize(payloadInSize);
128 mReader.run();
129 datasizeInTF += payloadInSize;
130 if (mOptions[TRDVerboseBit]) {
131 LOG(info) << "relevant vectors to read : " << mReader.getTrackletsFound() << " tracklets and " << mReader.getDigitsFound() << " compressed digits";
132 }
133 }
134
135 mReader.buildDPLOutputs(pc);
136 std::chrono::duration<double, std::milli> dataReadTime = std::chrono::high_resolution_clock::now() - dataReadStart;
137 LOGP(info, "Digits: {}, Tracklets: {}, DataRead in: {:.3f} MB, Rejected: {:.3f} kB for TF {} in {} ms",
138 mReader.getDigitsFound(), mReader.getTrackletsFound(), (float)datasizeInTF / (1024. * 1024.), (float)mReader.getWordsRejected() * 4. / 1024., tfCount,
139 std::chrono::duration_cast<std::chrono::milliseconds>(dataReadTime).count());
140 mDigitsTotal += mReader.getDigitsFound();
141 mTrackletsTotal += mReader.getTrackletsFound();
142 mDatasizeInTotal += datasizeInTF;
143 mWordsRejectedTotal += mReader.getWordsRejected();
144 mReader.reset();
145}
146
147} // namespace o2::trd
TRD raw data translator.
Global TRD definitions and constants.
TRD epn task to read incoming data.
A helper class to iterate over all parts of all input routes.
ConfigParamRegistry const & options()
Definition InitContext.h:33
A helper class to iterate over all parts of all input routes.
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
int getWordsRejected() const
void setDataBufferSize(long val)
void setLinkMap(const LinkToHCIDMapping *map)
int getTrackletsFound() const
int getDigitsFound() const
void printHalfChamberHeaderReport() const
void buildDPLOutputs(o2::framework::ProcessingContext &outputs)
void setMaxErrWarnPrinted(int nerr, int nwar)
void setDataBuffer(const char *val)
void configure(int tracklethcheader, int halfchamberwords, int halfchambermajor, std::bitset< 16 > options)
void setNumberOfTimeBins(int tb)
void init(InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) override
This is invoked whenever we have an EndOfStream event.
void finaliseCCDB(ConcreteDataMatcher &matcher, void *obj) final
void run(ProcessingContext &pc) final
bool isTimeFrameEmpty(ProcessingContext &pc)
constexpr o2::header::DataDescription gDataDescriptionRawData
Definition DataHeader.h:597
constexpr o2::header::DataOrigin gDataOriginTRD
Definition DataHeader.h:577
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"