Project
Loading...
Searching...
No Matches
TPCFLPCMVSpec.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15
16#ifndef O2_TPCFLPIDCSPEC_H
17#define O2_TPCFLPIDCSPEC_H
18
19#include <vector>
20#include <unordered_map>
21#include <fmt/format.h>
22#include "Framework/Task.h"
24#include "Framework/Logger.h"
28#include "Headers/DataHeader.h"
30#include "TPCBase/CRU.h"
31#include "TFile.h"
32
33using namespace o2::framework;
35using namespace o2::tpc;
36
37namespace o2::tpc
38{
39
41{
42 public:
// Constructor: stores the lane index, keeps its own copy of the CRU list and
// records after how many TFs the buffered CMVs are sent (see run()).
// lane       - lane index; also used in the dump file name written by run()
// crus       - CRUs for which output is sent
// nTFsBuffer - number of TFs to accumulate before sending
43 TPCFLPCMVDevice(const int lane, const std::vector<uint32_t>& crus, const int nTFsBuffer)
44 : mLane{lane}, mCRUs{crus}, mNTFsBuffer{nTFsBuffer} {}
45
// Body of init(InitContext& ic); the signature line is not part of this listing.
// Reads the boolean option "dump-cmvs-flp" once and caches it in mDumpCMVs,
// which run() checks on every TF.
47 {
48 mDumpCMVs = ic.options().get<bool>("dump-cmvs-flp");
49 }
50
// Body of run(ProcessingContext& pc); the signature line is not part of this listing.
// Per TF: (1) on the first TF of a buffer remember one orbit word per CRU,
// (2) append the incoming CMV vectors to the per-CRU buffers, (3) once
// mNTFsBuffer TFs have been collected send every CRU's buffer, (4) optionally
// dump this TF's inputs to a ROOT file.
52 {
53 LOGP(debug, "Processing CMVs for TF {} for CRUs {} to {}", processing_helpers::getCurrentTF(pc), mCRUs.front(), mCRUs.back());
54
// Number of TFs accumulated since the last send (reset to 0 when sending below).
55 ++mCountTFsForBuffer;
56
57 // Capture heartbeatOrbit / heartbeatBC from the first TF in the buffer
58 if (mCountTFsForBuffer == 1) {
59 for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) {
60 auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
// The CRU index is the sub-specification shifted right by 7; sendOutput() builds
// the sub-specification with the inverse shift (cru << 7).
61 const uint32_t cru = hdr->subSpecification >> 7;
// Only the first value seen for a CRU is kept; later parts for the same CRU are ignored.
62 if (mFirstOrbitBC.find(cru) == mFirstOrbitBC.end()) {
63 auto orbitVec = pc.inputs().get<std::vector<uint64_t>>(ref);
64 if (!orbitVec.empty()) {
65 mFirstOrbitBC[cru] = orbitVec[0]; // packed: orbit<<32 | bc
66 }
67 }
68 }
69 }
70
// Append this TF's CMVs to the buffer of the CRU they belong to.
// NOTE(review): cru is a signed int here but uint32_t in the loop above and an
// unsigned key in mCMVs - harmless for small indices, but inconsistent.
71 for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) {
72 auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
73 const int cru = tpcCRUHeader->subSpecification >> 7;
74 auto vecCMVs = pc.inputs().get<o2::pmr::vector<uint16_t>>(ref);
75 mCMVs[cru].insert(mCMVs[cru].end(), vecCMVs.begin(), vecCMVs.end());
76 }
77
// Buffer full: restart the TF counter, send one output per configured CRU and
// forget the stored orbit words so the next buffer captures fresh ones.
78 if (mCountTFsForBuffer >= mNTFsBuffer) {
79 mCountTFsForBuffer = 0;
80 for (const auto cru : mCRUs) {
81 LOGP(debug, "Sending CMVs of size {} for TF {}", mCMVs[cru].size(), processing_helpers::getCurrentTF(pc));
82 sendOutput(pc.outputs(), cru);
83 }
84 mFirstOrbitBC.clear();
85 }
86
// Optional debug dump: one file per lane and TF, one object "CRU_<n>" per input
// part. The data is re-read from pc.inputs(), not taken from mCMVs, so it is
// unaffected by the send above.
87 if (mDumpCMVs) {
88 TFile fOut(fmt::format("CMVs_{}_tf_{}.root", mLane, processing_helpers::getCurrentTF(pc)).data(), "RECREATE");
89 for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) {
90 auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
91 const int cru = tpcCRUHeader->subSpecification >> 7;
92 auto vec = pc.inputs().get<std::vector<uint16_t>>(ref);
93 fOut.WriteObject(&vec, fmt::format("CRU_{}", cru).data());
94 }
95 }
96 }
97
// Body of endOfStream(EndOfStreamContext& ec); the signature line is not part of this listing.
// If a buffer is only partially filled (fewer than mNTFsBuffer TFs since the
// last send), its content is sent for every configured CRU before the device
// reports that it is ready to quit.
// NOTE(review): mCountTFsForBuffer and mFirstOrbitBC are not reset here, unlike
// in run() - fine if nothing runs afterwards; confirm.
99 {
100 if (mCountTFsForBuffer > 0) {
101 LOGP(info, "Flushing remaining {} buffered TFs at end of stream", mCountTFsForBuffer);
102 for (const auto cru : mCRUs) {
103 sendOutput(ec.outputs(), cru);
104 }
105 }
106 ec.services().get<ControlService>().readyToQuit(QuitRequest::Me);
107 }
108
110
113
114 private:
// Lane index of this device; appears in the dump file name written by run().
115 const int mLane{};
// CRUs for which an output is sent when a buffer is complete or flushed.
116 const std::vector<uint32_t> mCRUs{};
// Number of TFs accumulated before the buffers are sent.
117 int mNTFsBuffer{1};
// When true, run() writes the CMV inputs of every TF to a ROOT file (option "dump-cmvs-flp").
118 bool mDumpCMVs{};
// TFs accumulated since the last send.
119 int mCountTFsForBuffer{0};
// Accumulated CMV values, keyed by CRU index.
120 std::unordered_map<unsigned int, o2::pmr::vector<uint16_t>> mCMVs{};
// First orbit word received per CRU for the current buffer (packed orbit<<32 | bc, per the comment in run()).
121 std::unordered_map<uint32_t, uint64_t> mFirstOrbitBC{};
122
// Input filter selecting all TPC "CMVVECTOR" parts regardless of sub-specification.
124 const std::vector<InputSpec> mFilter = {{"cmvs", ConcreteDataTypeMatcher{gDataOriginTPC, "CMVVECTOR"}, Lifetime::Timeframe}};
// Input filter selecting all TPC "CMVORBITS" parts regardless of sub-specification.
126 const std::vector<InputSpec> mOrbitFilter = {{"cmvorbits", ConcreteDataTypeMatcher{gDataOriginTPC, "CMVORBITS"}, Lifetime::Timeframe}};
127
// Sends the accumulated CMVs of one CRU.
// output - allocator of the context the data is sent with
// cru    - CRU index; the output sub-specification is cru << 7
// NOTE(review): the listing's numbering jumps from 136 to 138, so one source
// line is not shown here; in the visible lines orbitBC is computed but not
// consumed - presumably the missing line forwards it; verify against the real file.
128 void sendOutput(DataAllocator& output, const uint32_t cru)
129 {
130 const header::DataHeader::SubSpecificationType subSpec{cru << 7};
131
132 // Forward the first-TF orbit/BC for this CRU (0 if unavailable for any reason)
133 uint64_t orbitBC = 0;
134 if (auto it = mFirstOrbitBC.find(cru); it != mFirstOrbitBC.end()) {
135 orbitBC = it->second;
136 }
138
// The buffer is moved into the output call, so mCMVs[cru] is left in a moved-from
// state; operator[] also creates an empty entry if the CRU never received data.
139 output.adoptContainer(Output{gDataOriginTPC, getDataDescriptionCMVGroup(), subSpec}, std::move(mCMVs[cru]));
140 }
141};
142
143DataProcessorSpec getTPCFLPCMVSpec(const int ilane, const std::vector<uint32_t>& crus, const int nTFsBuffer = 1)
144{
145 std::vector<OutputSpec> outputSpecs;
146 std::vector<InputSpec> inputSpecs;
147 outputSpecs.reserve(crus.size());
148 inputSpecs.reserve(crus.size());
149
150 for (const auto& cru : crus) {
151 const header::DataHeader::SubSpecificationType subSpec{cru << 7};
152
153 // Inputs from CMVToVectorSpec
154 inputSpecs.emplace_back(InputSpec{"cmvs", gDataOriginTPC, "CMVVECTOR", subSpec, Lifetime::Timeframe});
155 inputSpecs.emplace_back(InputSpec{"cmvorbits", gDataOriginTPC, "CMVORBITS", subSpec, Lifetime::Timeframe});
156
157 // Outputs to TPCDistributeCMVSpec
158 outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup(), subSpec}, Lifetime::Sporadic);
159 outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo(), subSpec}, Lifetime::Sporadic);
160 }
161
162 const auto id = fmt::format("tpc-flp-cmv-{:02}", ilane);
163 return DataProcessorSpec{
164 id.data(),
165 inputSpecs,
166 outputSpecs,
167 AlgorithmSpec{adaptFromTask<TPCFLPCMVDevice>(ilane, crus, nTFsBuffer)},
168 Options{{"dump-cmvs-flp", VariantType::Bool, false, {"Dump CMVs to file"}}}};
169}
170
171} // namespace o2::tpc
172#endif
std::ostringstream debug
A helper class to iterate over all parts of all input routes.
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
A helper class to iterate over all parts of all input routes.
static constexpr header::DataDescription getDataDescriptionCMVOrbitInfo()
Data description for the packed (orbit<<32|bc) scalar forwarded alongside each CRU's CMVGROUP.
void run(o2::framework::ProcessingContext &pc) final
static constexpr header::DataDescription getDataDescriptionCMVGroup()
TPCFLPCMVDevice(const int lane, const std::vector< uint32_t > &crus, const int nTFsBuffer)
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void init(o2::framework::InitContext &ic) final
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLboolean * data
Definition glcorearb.h:298
GLint ref
Definition glcorearb.h:291
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
std::vector< ConfigParamSpec > Options
std::tuple< uint32_t, uint16_t > orbitBC(uint20_t bunchCrossing, uint32_t firstOrbit)
std::vector< T, fair::mq::pmr::polymorphic_allocator< T > > vector
uint32_t getCurrentTF(o2::framework::ProcessingContext &pc)
Global TPC definitions and constants.
Definition SimTraits.h:168
DataProcessorSpec getTPCFLPCMVSpec(const int ilane, const std::vector< uint32_t > &crus, const int nTFsBuffer=1)
uint32_t SubSpecificationType
Definition DataHeader.h:622
std::vector< o2::ctf::BufferType > vec