Project
Loading...
Searching...
No Matches
TPCFLPCMVSpec.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15
16#ifndef O2_TPCFLPCMVSPEC_H
17#define O2_TPCFLPCMVSPEC_H
18
19#include <vector>
20#include <unordered_map>
21#include <fmt/format.h>
22#include "Framework/Task.h"
24#include "Framework/Logger.h"
28#include "Headers/DataHeader.h"
30#include "TPCBase/CRU.h"
31#include "DataFormatsTPC/CMV.h"
32#include "TFile.h"
33
34namespace o2::tpc
35{
36
38{
39 public:
40 TPCFLPCMVDevice(const int lane, const std::vector<uint32_t>& crus, const bool triggerPerFlp, const int nTFsBuffer)
41 : mLane{lane}, mCRUs{crus}, mTriggerPerFLP{triggerPerFlp}, mNTFsBuffer{nTFsBuffer} {}
42
44 {
45 mDumpCMVs = ic.options().get<bool>("dump-cmvs-flp");
46 mEnableTrigger = ic.options().get<bool>("trigger");
47 mTriggerThresholdCMV = ic.options().get<float>("trigger-threshold-cmv");
48 mTriggerThresholdMeanMax = ic.options().get<float>("trigger-threshold-cmvMeanMax");
49 mTriggerThresholdMeanMin = ic.options().get<float>("trigger-threshold-cmvMeanMin");
50 mTriggerTimebinMin = ic.options().get<int>("trigger-threshold-timebinMin");
51 mTriggerTimebinMax = ic.options().get<int>("trigger-threshold-timebinMax");
52 }
53
55 {
56 LOGP(debug, "Processing CMVs for TF {} for CRUs {} to {}", processing_helpers::getCurrentTF(pc), mCRUs.front(), mCRUs.back());
57
58 ++mCountTFsForBuffer;
59
60 // Capture heartbeatOrbit / heartbeatBC from the first TF in the buffer
61 if (mCountTFsForBuffer == 1) {
62 for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mOrbitFilter)) {
63 auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
64 const uint32_t cru = hdr->subSpecification >> 7;
65 if (mFirstOrbitBC.find(cru) == mFirstOrbitBC.end()) {
66 auto orbitVec = pc.inputs().get<std::vector<uint64_t>>(ref);
67 if (!orbitVec.empty()) {
68 mFirstOrbitBC[cru] = orbitVec[0]; // packed: orbit<<32 | bc
69 }
70 }
71 }
72 }
73
74 bool triggered = false;
75 for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFilter)) {
76 auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
77 const uint32_t cru = tpcCRUHeader->subSpecification >> 7;
78 auto vecCMVs = pc.inputs().get<o2::pmr::vector<uint16_t>>(ref);
79 mCMVs[cru].insert(mCMVs[cru].end(), vecCMVs.begin(), vecCMVs.end());
80
81 const bool cruTriggered = mEnableTrigger && evaluateTrigger(vecCMVs);
82 if (!mTriggerPerFLP) {
83 pc.outputs().snapshot(o2::framework::Output{o2::header::gDataOriginTPC, getDataDescriptionCMVTrigger(), tpcCRUHeader->subSpecification}, cruTriggered);
84 } else {
85 triggered |= cruTriggered;
86 }
87 }
88 if (mTriggerPerFLP) {
89 const header::DataHeader::SubSpecificationType trigSubSpec{mCRUs.front() << 7};
90 pc.outputs().snapshot(o2::framework::Output{o2::header::gDataOriginTPC, getDataDescriptionCMVTrigger(), trigSubSpec}, triggered);
91 }
92
93 if (mCountTFsForBuffer >= mNTFsBuffer) {
94 mCountTFsForBuffer = 0;
95 for (const auto cru : mCRUs) {
96 LOGP(debug, "Sending CMVs of size {} for TF {}", mCMVs[cru].size(), processing_helpers::getCurrentTF(pc));
97 sendOutput(pc.outputs(), cru);
98 }
99 mFirstOrbitBC.clear();
100 }
101
102 if (mDumpCMVs) {
103 TFile fOut(fmt::format("CMVs_{}_tf_{}.root", mLane, processing_helpers::getCurrentTF(pc)).data(), "RECREATE");
104 for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFilter)) {
105 auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
106 const int cru = tpcCRUHeader->subSpecification >> 7;
107 auto vec = pc.inputs().get<std::vector<uint16_t>>(ref);
108 fOut.WriteObject(&vec, fmt::format("CRU_{}", cru).data());
109 }
110 }
111 }
112
114 {
115 if (mCountTFsForBuffer > 0) {
116 LOGP(info, "Flushing remaining {} buffered TFs at end of stream", mCountTFsForBuffer);
117 for (const auto cru : mCRUs) {
118 sendOutput(ec.outputs(), cru);
119 }
120 }
121 ec.services().get<o2::framework::ControlService>().readyToQuit(o2::framework::QuitRequest::Me);
122 }
123
125
128
131
132 private:
133 const int mLane{};
134 const std::vector<uint32_t> mCRUs{};
135 int mNTFsBuffer{1};
136 bool mDumpCMVs{};
137 bool mTriggerPerFLP{false};
138 int mCountTFsForBuffer{0};
139 std::unordered_map<unsigned int, o2::pmr::vector<uint16_t>> mCMVs{};
140 std::unordered_map<uint32_t, uint64_t> mFirstOrbitBC{};
141 bool mEnableTrigger{false};
142 float mTriggerThresholdCMV{-10.f};
143 float mTriggerThresholdMeanMax{-40.f};
144 float mTriggerThresholdMeanMin{-80.f};
145 int mTriggerTimebinMin{4};
146 int mTriggerTimebinMax{-1};
147
149 const std::vector<o2::framework::InputSpec> mFilter = {{"cmvs", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "CMVVECTOR"}, o2::framework::Lifetime::Timeframe}};
151 const std::vector<o2::framework::InputSpec> mOrbitFilter = {{"cmvorbits", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "CMVORBITS"}, o2::framework::Lifetime::Timeframe}};
152
153 // Scan a CRU's CMV vector for contiguous below-threshold sequences.
154 // Returns true as soon as one sequence satisfies both the length and mean criteria.
155 bool evaluateTrigger(const o2::pmr::vector<uint16_t>& cmvs) const
156 {
157 float seqSum = 0.f;
158 int seqLen = 0;
159
160 auto checkSequence = [&]() -> bool {
161 if (seqLen == 0) {
162 return false;
163 }
164 const float mean = seqSum / seqLen;
165 return (seqLen >= mTriggerTimebinMin) &&
166 (mTriggerTimebinMax < 0 || seqLen <= mTriggerTimebinMax) &&
167 (mean >= mTriggerThresholdMeanMin) &&
168 (mean <= mTriggerThresholdMeanMax);
169 };
170
171 for (const auto raw : cmvs) {
172 const float val = cmv::Data{raw}.getCMVFloat();
173 if (val < mTriggerThresholdCMV) {
174 seqSum += val;
175 ++seqLen;
176 } else {
177 if (checkSequence()) {
178 return true;
179 }
180 seqLen = 0;
181 seqSum = 0.f;
182 }
183 }
184 return checkSequence(); // trailing sequence that reached end of buffer
185 }
186
187 void sendOutput(o2::framework::DataAllocator& output, const uint32_t cru)
188 {
189 const header::DataHeader::SubSpecificationType subSpec{cru << 7};
190
191 // Forward the first-TF orbit/BC for this CRU (0 if unavailable for any reason)
192 uint64_t orbitBC = 0;
193 if (auto it = mFirstOrbitBC.find(cru); it != mFirstOrbitBC.end()) {
194 orbitBC = it->second;
195 }
197
198 output.adoptContainer(o2::framework::Output{o2::header::gDataOriginTPC, getDataDescriptionCMVGroup(), subSpec}, std::move(mCMVs[cru]));
199 }
200};
201
202o2::framework::DataProcessorSpec getTPCFLPCMVSpec(const int ilane, const std::vector<uint32_t>& crus, const bool triggerPerFlp, const int nTFsBuffer = 1)
203{
204 std::vector<o2::framework::OutputSpec> outputSpecs;
205 std::vector<o2::framework::InputSpec> inputSpecs;
206 outputSpecs.reserve(crus.size() * 2 + 1);
207 inputSpecs.reserve(crus.size() * 2);
208
209 for (const auto& cru : crus) {
210 const header::DataHeader::SubSpecificationType subSpec{cru << 7};
211
212 // Inputs from CMVToVectorSpec
213 inputSpecs.emplace_back(o2::framework::InputSpec{"cmvs", o2::header::gDataOriginTPC, "CMVVECTOR", subSpec, o2::framework::Lifetime::Timeframe});
214 inputSpecs.emplace_back(o2::framework::InputSpec{"cmvorbits", o2::header::gDataOriginTPC, "CMVORBITS", subSpec, o2::framework::Lifetime::Timeframe});
215
216 // Outputs to TPCDistributeCMVSpec
217 outputSpecs.emplace_back(o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup(), subSpec}, o2::framework::Lifetime::Sporadic);
218 outputSpecs.emplace_back(o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo(), subSpec}, o2::framework::Lifetime::Sporadic);
219
220 if (!triggerPerFlp) {
221 outputSpecs.emplace_back(o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVTrigger(), subSpec}, o2::framework::Lifetime::Timeframe);
222 }
223 }
224 if (triggerPerFlp) { // Single per-FLP trigger output, subspec keyed on the first CRU
225 const header::DataHeader::SubSpecificationType trigSubSpec{crus.front() << 7};
226 outputSpecs.emplace_back(o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVTrigger(), trigSubSpec}, o2::framework::Lifetime::Timeframe);
227 }
228
229 const auto id = fmt::format("tpc-flp-cmv-{:02}", ilane);
231 id.data(),
232 inputSpecs,
233 outputSpecs,
234 o2::framework::AlgorithmSpec{o2::framework::adaptFromTask<TPCFLPCMVDevice>(ilane, crus, triggerPerFlp, nTFsBuffer)},
236 {"dump-cmvs-flp", o2::framework::VariantType::Bool, false, {"Dump CMVs to file"}},
237 {"trigger", o2::framework::VariantType::Bool, false, {"Enable CMV trigger evaluation"}},
238 {"trigger-threshold-cmv", o2::framework::VariantType::Float, -10.f, {"CMV threshold: sequence starts when value drops below this (ADC units)"}},
239 {"trigger-threshold-cmvMeanMax", o2::framework::VariantType::Float, -40.f, {"Upper bound on trigger-sequence mean CMV value"}},
240 {"trigger-threshold-cmvMeanMin", o2::framework::VariantType::Float, -80.f, {"Lower bound on trigger-sequence mean CMV value"}},
241 {"trigger-threshold-timebinMin", o2::framework::VariantType::Int, 4, {"Minimum trigger-sequence length in timebins"}},
242 {"trigger-threshold-timebinMax", o2::framework::VariantType::Int, -1, {"Maximum trigger-sequence length in timebins (-1 disables upper bound)"}}}};
243}
244
245} // namespace o2::tpc
246#endif
Common mode values data format definition.
std::ostringstream debug
TPCZSHDR * hdr
o2::raw::RawFileWriter * raw
A helper class to iteratate over all parts of all input routes.
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
A helper class to iteratate over all parts of all input routes.
static constexpr header::DataDescription getDataDescriptionCMVOrbitInfo()
Data description for the packed (orbit<<32|bc) scalar forwarded alongside each CRU's CMVGROUP.
static constexpr header::DataDescription getDataDescriptionCMVTrigger()
Data description for the per-CRU per-TF trigger flag (empty span = not triggered or disabled; {1} = t...
void run(o2::framework::ProcessingContext &pc) final
static constexpr header::DataDescription getDataDescriptionCMVGroup()
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
TPCFLPCMVDevice(const int lane, const std::vector< uint32_t > &crus, const bool triggerPerFlp, const int nTFsBuffer)
void init(o2::framework::InitContext &ic) final
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLboolean * data
Definition glcorearb.h:298
GLuint GLfloat * val
Definition glcorearb.h:1582
GLint ref
Definition glcorearb.h:291
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
double mean(std::vector< double >::const_iterator first, std::vector< double >::const_iterator last)
@ Me
Only quit this data processor.
std::vector< ConfigParamSpec > Options
std::tuple< uint32_t, uint16_t > orbitBC(uint20_t bunchCrossing, uint32_t firstOrbit)
std::vector< T, fair::mq::pmr::polymorphic_allocator< T > > vector
uint32_t getCurrentTF(o2::framework::ProcessingContext &pc)
Global TPC definitions and constants.
Definition SimTraits.h:168
o2::framework::DataProcessorSpec getTPCFLPCMVSpec(const int ilane, const std::vector< uint32_t > &crus, const bool triggerPerFlp, const int nTFsBuffer=1)
uint32_t SubSpecificationType
Definition DataHeader.h:622
std::vector< o2::ctf::BufferType > vec