Project
Loading...
Searching...
No Matches
TPCDistributeIDCSpec.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
17#ifndef O2_TPCDISTRIBUTEIDCIDCSPEC_H
18#define O2_TPCDISTRIBUTEIDCIDCSPEC_H
19
20#include <vector>
21#include "Framework/Task.h"
23#include "Framework/Logger.h"
26#include "Headers/DataHeader.h"
29#include "TPCBase/CRU.h"
34
35using namespace o2::framework;
37using namespace o2::tpc;
38
39namespace o2::tpc
40{
41
43{
44 public:
45 TPCDistributeIDCSpec(const std::vector<uint32_t>& crus, const unsigned int timeframes, const int nTFsBuffer, const unsigned int outlanes, const int firstTF, std::shared_ptr<o2::base::GRPGeomRequest> req)
46 : mCRUs{crus}, mTimeFrames{timeframes}, mNTFsBuffer{nTFsBuffer}, mOutLanes{outlanes}, mProcessedCRU{{std::vector<unsigned int>(timeframes), std::vector<unsigned int>(timeframes)}}, mTFStart{{firstTF, firstTF + timeframes}}, mTFEnd{{firstTF + timeframes - 1, mTFStart[1] + timeframes - 1}}, mCCDBRequest(req), mSendCCDBOutputOrbitReset(outlanes), mSendCCDBOutputGRPECS(outlanes)
47 {
48 // pre calculate data description for output
49 mDataDescrOut.reserve(mOutLanes);
50 for (unsigned int i = 0; i < mOutLanes; ++i) {
51 mDataDescrOut.emplace_back(getDataDescriptionIDC(i));
52 }
53
54 // sort vector for binary_search
55 std::sort(mCRUs.begin(), mCRUs.end());
56
57 for (auto& processedCRUbuffer : mProcessedCRUs) {
58 processedCRUbuffer.resize(mTimeFrames);
59 for (auto& crusMap : processedCRUbuffer) {
60 crusMap.reserve(mCRUs.size());
61 for (const auto cruID : mCRUs) {
62 crusMap.emplace(cruID, false);
63 }
64 }
65 }
66
67 const auto sides = IDCFactorization::getSides(mCRUs);
68 for (auto side : sides) {
69 const std::string name = (side == Side::A) ? "idcsgroupa" : "idcsgroupc";
71 }
72 };
73
75 {
77 mNFactorTFs = ic.options().get<int>("nFactorTFs");
78 mNTFsDataDrop = ic.options().get<int>("drop-data-after-nTFs");
79 mCheckEveryNData = ic.options().get<int>("check-data-every-n");
80 if (mCheckEveryNData == 0) {
81 mCheckEveryNData = mTimeFrames / 2;
82 if (mCheckEveryNData == 0) {
83 mCheckEveryNData = 1;
84 }
85 mNTFsDataDrop = mCheckEveryNData;
86 }
87 }
88
89 void finaliseCCDB(ConcreteDataMatcher& matcher, void* obj) final
90 {
92 if (matcher == ConcreteDataMatcher("CTP", "ORBITRESET", 0)) {
93 LOGP(info, "Updating ORBITRESET");
94 std::fill(mSendCCDBOutputOrbitReset.begin(), mSendCCDBOutputOrbitReset.end(), true);
95 } else if (matcher == ConcreteDataMatcher("GLO", "GRPECS", 0)) {
96 // check if received object is valid
97 if (o2::base::GRPGeomHelper::instance().getGRPECS()->getRun() != 0) {
98 LOGP(info, "Updating GRPECS");
99 std::fill(mSendCCDBOutputGRPECS.begin(), mSendCCDBOutputGRPECS.end(), true);
100 } else {
101 LOGP(info, "Detected default GRPECS object");
102 }
103 }
104 }
105
107 {
108 // send orbit reset and orbits per TF only once
109 if (mCCDBRequest->askTime) {
110 const bool grpecsValid = pc.inputs().isValid("grpecs");
111 const bool orbitResetValid = pc.inputs().isValid("orbitReset");
112 if (grpecsValid) {
113 pc.inputs().get<o2::parameters::GRPECSObject*>("grpecs");
114 }
115 if (orbitResetValid) {
116 pc.inputs().get<std::vector<Long64_t>*>("orbitReset");
117 }
118 if (pc.inputs().countValidInputs() == (grpecsValid + orbitResetValid)) {
119 return;
120 }
121 }
122
123 const auto tf = processing_helpers::getCurrentTF(pc);
124
125 // automatically detect firstTF in case firstTF was not specified
126 if (mTFStart.front() <= -1) {
127 const auto firstTF = tf;
128 const long offsetTF = std::abs(mTFStart.front() + 1);
129 const auto nTotTFs = getNRealTFs();
130 mTFStart = {firstTF + offsetTF, firstTF + offsetTF + nTotTFs};
131 mTFEnd = {mTFStart[1] - 1, mTFStart[1] - 1 + nTotTFs};
132 LOGP(info, "Setting {} as first TF", mTFStart[0]);
133 LOGP(info, "Using offset of {} TFs for setting the first TF", offsetTF);
134 }
135
136 // check which buffer to use for current incoming data
137 const bool currentBuffer = (tf > mTFEnd[mBuffer]) ? !mBuffer : mBuffer;
138 if (mTFStart[currentBuffer] > tf) {
139 LOGP(info, "all CRUs for current TF {} already received. Skipping this TF", tf);
140 return;
141 }
142
143 const unsigned int currentOutLane = getOutLane(tf);
144 const unsigned int relTF = (tf - mTFStart[currentBuffer]) / mNTFsBuffer;
145 LOGP(debug, "current TF: {} relative TF: {} current buffer: {} current output lane: {} mTFStart: {}", tf, relTF, currentBuffer, currentOutLane, mTFStart[currentBuffer]);
146
147 if (relTF >= mProcessedCRU[currentBuffer].size()) {
148 LOGP(warning, "Skipping tf {}: relative tf {} is larger than size of buffer: {}", tf, relTF, mProcessedCRU[currentBuffer].size());
149
150 // check number of processed CRUs for previous TFs. If CRUs are missing for them, they are probably lost/not received
151 mProcessedTotalData = mCheckEveryNData;
152 checkIntervalsForMissingData(pc, currentBuffer, relTF, currentOutLane, tf);
153 return;
154 }
155
156 if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) {
157 return;
158 }
159
160 // send start info only once
161 if (mSendOutputStartInfo[currentBuffer]) {
162 mSendOutputStartInfo[currentBuffer] = false;
163 pc.outputs().snapshot(Output{gDataOriginTPC, getDataDescriptionIDCFirstTF(), header::DataHeader::SubSpecificationType{currentOutLane}}, mTFStart[currentBuffer]);
164 }
165
166 if (mSendCCDBOutputOrbitReset[currentOutLane] && mSendCCDBOutputGRPECS[currentOutLane]) {
167 mSendCCDBOutputOrbitReset[currentOutLane] = false;
168 mSendCCDBOutputGRPECS[currentOutLane] = false;
169 pc.outputs().snapshot(Output{gDataOriginTPC, getDataDescriptionIDCOrbitReset(), header::DataHeader::SubSpecificationType{currentOutLane}}, dataformats::Pair<long, int>{o2::base::GRPGeomHelper::instance().getOrbitResetTimeMS(), o2::base::GRPGeomHelper::instance().getNHBFPerTF()});
170 }
171
172 for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) {
173 auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
174 const unsigned int cru = tpcCRUHeader->subSpecification >> 7;
175
176 // check if cru is specified in input cru list
177 if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
178 LOGP(debug, "Received data from CRU: {} which was not specified as input. Skipping", cru);
179 continue;
180 }
181
182 if (mProcessedCRUs[currentBuffer][relTF][cru]) {
183 continue;
184 } else {
185 // count total number of processed CRUs for given TF
186 ++mProcessedCRU[currentBuffer][relTF];
187
188 // to keep track of processed CRUs
189 mProcessedCRUs[currentBuffer][relTF][cru] = true;
190 }
191
192 // sending IDCs
193 sendOutput(pc, currentOutLane, cru, pc.inputs().get<pmr::vector<float>>(ref));
194 }
195
196 LOGP(info, "number of received CRUs for current TF: {} Needed a total number of processed CRUs of: {} Current TF: {}", mProcessedCRU[currentBuffer][relTF], mCRUs.size(), tf);
197
198 // check for missing data if specified
199 if (mNTFsDataDrop > 0) {
200 checkIntervalsForMissingData(pc, currentBuffer, relTF, currentOutLane, tf);
201 }
202
203 if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) {
204 ++mProcessedTFs[currentBuffer];
205 }
206
207 if (mProcessedTFs[currentBuffer] == mTimeFrames) {
208 finishInterval(pc, currentOutLane, currentBuffer, tf);
209 }
210 }
211
212 void endOfStream(o2::framework::EndOfStreamContext& ec) final { ec.services().get<ControlService>().readyToQuit(QuitRequest::Me); }
213
215 static header::DataDescription getDataDescriptionIDC(const unsigned int lane)
216 {
217 const std::string name = fmt::format("IDCAGG{}", lane).data();
218 header::DataDescription description;
219 description.runtimeInit(name.substr(0, 16).c_str());
220 return description;
221 }
222
225
226 private:
227 std::vector<uint32_t> mCRUs{};
228 const unsigned int mTimeFrames{};
229 const int mNTFsBuffer{1};
230 const unsigned int mOutLanes{};
231 std::array<unsigned int, 2> mProcessedTFs{{0, 0}};
232 std::array<std::vector<unsigned int>, 2> mProcessedCRU{};
233 std::array<std::vector<std::unordered_map<unsigned int, bool>>, 2> mProcessedCRUs{};
234 std::array<long, 2> mTFStart{};
235 std::array<long, 2> mTFEnd{};
236 std::array<bool, 2> mSendOutputStartInfo{true, true};
237 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
238 std::vector<bool> mSendCCDBOutputOrbitReset{};
239 std::vector<bool> mSendCCDBOutputGRPECS{};
240 unsigned int mCurrentOutLane{0};
241 bool mBuffer{false};
242 int mNFactorTFs{0};
243 int mNTFsDataDrop{0};
244 std::array<int, 2> mStartNTFsDataDrop{0};
245 long mProcessedTotalData{0};
246 int mCheckEveryNData{1};
247 std::vector<InputSpec> mFilter{};
248 std::vector<header::DataDescription> mDataDescrOut{};
249
250 void sendOutput(o2::framework::ProcessingContext& pc, const unsigned int currentOutLane, const unsigned int cru, o2::pmr::vector<float> idcs)
251 {
252 pc.outputs().adoptContainer(Output{gDataOriginTPC, mDataDescrOut[currentOutLane], header::DataHeader::SubSpecificationType{cru}}, std::move(idcs));
253 }
254
256 unsigned int getOutLane(const uint32_t tf) const { return (tf > mTFEnd[mBuffer]) ? (mCurrentOutLane + 1) % mOutLanes : mCurrentOutLane; }
257
259 unsigned int getNRealTFs() const { return mNTFsBuffer * mTimeFrames; }
260
261 void clearBuffer(const bool currentBuffer)
262 {
263 // resetting received CRUs
264 for (auto& crusMap : mProcessedCRUs[currentBuffer]) {
265 for (auto& it : crusMap) {
266 it.second = false;
267 }
268 }
269
270 mProcessedTFs[currentBuffer] = 0; // reset processed TFs for next aggregation interval
271 std::fill(mProcessedCRU[currentBuffer].begin(), mProcessedCRU[currentBuffer].end(), 0);
272
273 // set integration range for next integration interval
274 mTFStart[mBuffer] = mTFEnd[!mBuffer] + 1;
275 mTFEnd[mBuffer] = mTFStart[mBuffer] + getNRealTFs() - 1;
276
277 // switch buffer
278 mBuffer = !mBuffer;
279
280 // set output lane
281 mCurrentOutLane = ++mCurrentOutLane % mOutLanes;
282 }
283
284 void checkIntervalsForMissingData(o2::framework::ProcessingContext& pc, const bool currentBuffer, const long relTF, const unsigned int currentOutLane, const uint32_t tf)
285 {
286 if (!(mProcessedTotalData++ % mCheckEveryNData)) {
287 LOGP(info, "Checking for dropped packages...");
288
289 // if last buffer has smaller time range check the whole last buffer
290 if ((mTFStart[currentBuffer] > mTFStart[!currentBuffer]) && (relTF > mNTFsDataDrop)) {
291 LOGP(warning, "checking last buffer from {} to {}", mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].size());
292 const unsigned int lastLane = (currentOutLane == 0) ? (mOutLanes - 1) : (currentOutLane - 1);
293 checkMissingData(pc, !currentBuffer, mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].size(), lastLane);
294 LOGP(info, "All empty TFs for TF {} for current buffer filled with dummy and sent. Clearing buffer", tf);
295 finishInterval(pc, lastLane, !currentBuffer, tf);
296 }
297
298 const int tfEndCheck = std::clamp(static_cast<int>(relTF) - mNTFsDataDrop, 0, static_cast<int>(mProcessedCRU[currentBuffer].size()));
299 LOGP(info, "checking current buffer from {} to {}", mStartNTFsDataDrop[currentBuffer], tfEndCheck);
300 checkMissingData(pc, currentBuffer, mStartNTFsDataDrop[currentBuffer], tfEndCheck, currentOutLane);
301 mStartNTFsDataDrop[currentBuffer] = tfEndCheck;
302 }
303 }
304
305 void checkMissingData(o2::framework::ProcessingContext& pc, const bool currentBuffer, const int startTF, const int endTF, const unsigned int outLane)
306 {
307 for (int iTF = startTF; iTF < endTF; ++iTF) {
308 if (mProcessedCRU[currentBuffer][iTF] != mCRUs.size()) {
309 LOGP(warning, "CRUs for lane {} rel. TF: {} curr TF {} are missing! Processed {} CRUs out of {}", outLane, iTF, mTFStart[currentBuffer] + iTF, mProcessedCRU[currentBuffer][iTF], mCRUs.size());
310 ++mProcessedTFs[currentBuffer];
311 mProcessedCRU[currentBuffer][iTF] = mCRUs.size();
312
313 // find missing CRUs
314 for (auto& it : mProcessedCRUs[currentBuffer][iTF]) {
315 if (!it.second) {
316 it.second = true;
317 sendOutput(pc, outLane, it.first, pmr::vector<float>());
318 }
319 }
320 }
321 }
322 }
323
324 void finishInterval(o2::framework::ProcessingContext& pc, const unsigned int currentOutLane, const bool buffer, const uint32_t tf)
325 {
326 if (mNFactorTFs > 0) {
327 mNFactorTFs = 0;
328 // ToDo: Find better fix
329 for (unsigned int ilane = 0; ilane < mOutLanes; ++ilane) {
330 auto& deviceProxy = pc.services().get<FairMQDeviceProxy>();
331 auto& state = deviceProxy.getOutputChannelState({static_cast<int>(ilane)});
332 size_t oldest = std::numeric_limits<size_t>::max() - 1; // just set to really large value
333 state.oldestForChannel = {oldest};
334 }
335 }
336
337 LOGP(info, "All TFs {} for current buffer received. Clearing buffer", tf);
338 clearBuffer(buffer);
339 mStartNTFsDataDrop[buffer] = 0;
340 mSendOutputStartInfo[buffer] = true;
341 }
342};
343
344DataProcessorSpec getTPCDistributeIDCSpec(const int ilane, const std::vector<uint32_t>& crus, const unsigned int timeframes, const unsigned int outlanes, const int firstTF, const bool sendPrecisetimeStamp = false, const int nTFsBuffer = 1)
345{
346 std::vector<InputSpec> inputSpecs;
347 const auto sides = IDCFactorization::getSides(crus);
348 for (auto side : sides) {
349 const std::string name = (side == Side::A) ? "idcsgroupa" : "idcsgroupc";
350 inputSpecs.emplace_back(InputSpec{name.data(), ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPIDCDevice::getDataDescriptionIDCGroup(side)}, Lifetime::Sporadic});
351 }
352
353 std::vector<OutputSpec> outputSpecs;
354 outputSpecs.reserve(outlanes);
355 for (unsigned int lane = 0; lane < outlanes; ++lane) {
356 outputSpecs.emplace_back(ConcreteDataTypeMatcher{gDataOriginTPC, TPCDistributeIDCSpec::getDataDescriptionIDC(lane)}, Lifetime::Sporadic);
357 outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCDistributeIDCSpec::getDataDescriptionIDCFirstTF(), header::DataHeader::SubSpecificationType{lane}}, Lifetime::Sporadic);
358 }
359
360 bool fetchCCDB = false;
361 if (sendPrecisetimeStamp && (ilane == 0)) {
362 fetchCCDB = true;
363 for (unsigned int lane = 0; lane < outlanes; ++lane) {
364 outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCDistributeIDCSpec::getDataDescriptionIDCOrbitReset(), header::DataHeader::SubSpecificationType{lane}}, Lifetime::Sporadic);
365 }
366 }
367
368 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(fetchCCDB, // orbitResetTime
369 fetchCCDB, // GRPECS=true
370 false, // GRPLHCIF
371 false, // GRPMagField
372 false, // askMatLUT
374 inputSpecs);
375
376 const std::string type = "idc";
377 const auto id = fmt::format("tpc-distribute-{}-{:02}", type, ilane);
379 id.data(),
380 inputSpecs,
381 outputSpecs,
382 AlgorithmSpec{adaptFromTask<TPCDistributeIDCSpec>(crus, timeframes, nTFsBuffer, outlanes, firstTF, ccdbRequest)},
383 Options{{"drop-data-after-nTFs", VariantType::Int, 0, {"Number of TFs after which to drop the data."}},
384 {"check-data-every-n", VariantType::Int, 0, {"Number of run function called after which to check for missing data (-1 for no checking, 0 for default checking)."}},
385 {"nFactorTFs", VariantType::Int, 1000, {"Number of TFs to skip for sending oldest TF."}}}}; // end DataProcessorSpec
386 spec.rank = ilane;
387 return spec;
388}
389
390} // namespace o2::tpc
391
392#endif
benchmark::State & state
int32_t i
Helper for geometry and GRP related CCDB requests.
A helper class to iterate over all parts of all input routes.
uint32_t side
Definition RawData.h:0
TPC device for processing on FLPs.
std::ostringstream debug
auto getOrbitResetTimeMS() const
bool finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
CacheId adoptContainer(const Output &, ContainerT &, CacheStrategy, o2::header::SerializationMethod)
A helper class to iterate over all parts of all input routes.
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
const std::vector< Side > & getSides() const
static constexpr header::DataDescription getDataDescriptionIDCFirstTF()
void run(o2::framework::ProcessingContext &pc) final
void init(o2::framework::InitContext &ic) final
void finaliseCCDB(ConcreteDataMatcher &matcher, void *obj) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
static constexpr header::DataDescription getDataDescriptionIDCOrbitReset()
TPCDistributeIDCSpec(const std::vector< uint32_t > &crus, const unsigned int timeframes, const int nTFsBuffer, const unsigned int outlanes, const int firstTF, std::shared_ptr< o2::base::GRPGeomRequest > req)
static header::DataDescription getDataDescriptionIDC(const unsigned int lane)
return data description for aggregated IDCs for given lane
static constexpr header::DataDescription getDataDescriptionIDCGroup(const Side side)
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
std::vector< T, o2::pmr::polymorphic_allocator< T > > vector
uint32_t getCurrentTF(o2::framework::ProcessingContext &pc)
Global TPC definitions and constants.
Definition SimTraits.h:167
Enum< T >::Iterator begin(Enum< T >)
Definition Defs.h:173
@ A
Definition Defs.h:35
DataProcessorSpec getTPCDistributeIDCSpec(const int ilane, const std::vector< uint32_t > &crus, const unsigned int timeframes, const unsigned int outlanes, const int firstTF, const bool sendPrecisetimeStamp=false, const int nTFsBuffer=1)
std::unique_ptr< GPUReconstructionTimeframe > tf
uint32_t SubSpecificationType
Definition DataHeader.h:620
void runtimeInit(const char *string, short length=-1)
Definition DataHeader.h:261