Project
Loading...
Searching...
No Matches
TPCDistributeCMVSpec.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15
16#ifndef O2_TPCDISTRIBUTECMVSPEC_H
17#define O2_TPCDISTRIBUTECMVSPEC_H
18
19#include <algorithm>
20#include <array>
21#include <limits>
22#include <unordered_map>
23#include <vector>
24#include <fmt/format.h>
25#include "Framework/Task.h"
27#include "Framework/Logger.h"
31#include "Headers/DataHeader.h"
38
39using namespace o2::framework;
41using namespace o2::tpc;
42
43namespace o2::tpc
44{
45
47{
48 public:
49 TPCDistributeCMVSpec(const std::vector<uint32_t>& crus, const unsigned int timeframes, const int nTFsBuffer, const unsigned int outlanes, const int firstTF, std::shared_ptr<o2::base::GRPGeomRequest> req)
50 : mCRUs{crus},
51 mTimeFrames{timeframes},
52 mNTFsBuffer{nTFsBuffer},
53 mOutLanes{outlanes},
54 mProcessedCRU{{std::vector<unsigned int>(timeframes), std::vector<unsigned int>(timeframes)}},
55 mTFStart{{firstTF, firstTF + static_cast<long>(timeframes) * nTFsBuffer}},
56 mTFEnd{{firstTF + static_cast<long>(timeframes) * nTFsBuffer - 1, firstTF + 2LL * timeframes * nTFsBuffer - 1}},
57 mCCDBRequest(req),
58 mSendCCDBOutputOrbitReset(outlanes),
59 mSendCCDBOutputGRPECS(outlanes),
60 mOrbitInfoForwarded{{std::vector<bool>(timeframes, false), std::vector<bool>(timeframes, false)}}
61 {
62 mDataDescrOut.reserve(mOutLanes);
63 mOrbitDescrOut.reserve(mOutLanes);
64 for (unsigned int i = 0; i < mOutLanes; ++i) {
65 mDataDescrOut.emplace_back(getDataDescriptionCMV(i));
66 mOrbitDescrOut.emplace_back(getDataDescriptionCMVOrbitInfo(i));
67 }
68 // sort vector for binary_search
69 std::sort(mCRUs.begin(), mCRUs.end());
70
71 for (auto& processedCRUbuffer : mProcessedCRUs) {
72 processedCRUbuffer.resize(mTimeFrames);
73 for (auto& crusMap : processedCRUbuffer) {
74 crusMap.reserve(mCRUs.size());
75 for (const auto cruID : mCRUs) {
76 crusMap.emplace(cruID, false);
77 }
78 }
79 }
80
81 mFilter.emplace_back(InputSpec{"cmvsgroup", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup()}, Lifetime::Sporadic});
82 mOrbitFilter.emplace_back(InputSpec{"cmvorbit", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo()}, Lifetime::Sporadic});
83 }
84
86 {
88 mNFactorTFs = ic.options().get<int>("nFactorTFs");
89 mNTFsDataDrop = ic.options().get<int>("drop-data-after-nTFs");
90 mCheckEveryNData = ic.options().get<int>("check-data-every-n");
91 if (mCheckEveryNData == 0) {
92 mCheckEveryNData = mTimeFrames / 2;
93 if (mCheckEveryNData == 0) {
94 mCheckEveryNData = 1;
95 }
96 mNTFsDataDrop = mCheckEveryNData;
97 }
98 }
99
100 void finaliseCCDB(ConcreteDataMatcher& matcher, void* obj) final
101 {
103 if (matcher == ConcreteDataMatcher("CTP", "ORBITRESET", 0)) {
104 LOGP(debug, "Updating ORBITRESET");
105 std::fill(mSendCCDBOutputOrbitReset.begin(), mSendCCDBOutputOrbitReset.end(), true);
106 } else if (matcher == ConcreteDataMatcher("GLO", "GRPECS", 0)) {
107 // check if received object is valid
108 if (o2::base::GRPGeomHelper::instance().getGRPECS()->getRun() != 0) {
109 LOGP(debug, "Updating GRPECS");
110 std::fill(mSendCCDBOutputGRPECS.begin(), mSendCCDBOutputGRPECS.end(), true);
111 } else {
112 LOGP(debug, "Detected default GRPECS object");
113 }
114 }
115 }
116
118 {
119 // capture orbit-reset info once for precise CCDB timestamp calculation
120 if (mCCDBRequest->askTime) {
121 const bool grpecsValid = pc.inputs().isValid("grpecs");
122 const bool orbitResetValid = pc.inputs().isValid("orbitReset");
123 if (grpecsValid) {
124 pc.inputs().get<o2::parameters::GRPECSObject*>("grpecs");
125 }
126 if (orbitResetValid) {
127 pc.inputs().get<std::vector<Long64_t>*>("orbitReset");
128 }
129 if (pc.inputs().countValidInputs() == (grpecsValid + orbitResetValid)) {
130 return;
131 }
132 }
133
134 const auto tf = processing_helpers::getCurrentTF(pc);
135 if (tf == std::numeric_limits<uint32_t>::max()) {
136 forwardEOSData(pc);
137 return;
138 }
139
140 // automatically detect firstTF in case firstTF was not specified
141 if (mTFStart.front() <= -1) {
142 const auto firstTFDetected = tf;
143 const long offsetTF = std::abs(mTFStart.front() + 1);
144 const auto nTotTFs = getNRealTFs();
145 // tf is the batch TF counter (= last real TF in the first batch), subtract (mNTFsBuffer - 1) to recover the actual first real TF of the interval
146 const long firstRealTF = static_cast<long>(firstTFDetected) - (mNTFsBuffer - 1) + offsetTF;
147 mTFStart = {firstRealTF, firstRealTF + nTotTFs};
148 mTFEnd = {mTFStart[1] - 1, mTFStart[1] - 1 + nTotTFs};
149 LOGP(detail, "Setting {} as first TF", mTFStart[0]);
150 LOGP(detail, "Using offset of {} TFs for setting the first TF", offsetTF);
151 }
152
153 // check which buffer to use for current incoming data
154 const bool currentBuffer = (tf > mTFEnd[mBuffer]) ? !mBuffer : mBuffer;
155 if (mTFStart[currentBuffer] > tf) {
156 LOGP(detail, "All CRUs for current TF {} already received. Skipping this TF", tf);
157 return;
158 }
159
160 const unsigned int currentOutLane = getOutLane(tf);
161 const unsigned int relTF = (tf - mTFStart[currentBuffer]) / mNTFsBuffer;
162 LOGP(debug, "Current TF: {}, relative TF: {}, current buffer: {}, current output lane: {}, mTFStart: {}", tf, relTF, currentBuffer, currentOutLane, mTFStart[currentBuffer]);
163
164 if (relTF >= mProcessedCRU[currentBuffer].size()) {
165 LOGP(warning, "Skipping tf {}: relative tf {} is larger than size of buffer: {}", tf, relTF, mProcessedCRU[currentBuffer].size());
166 // check number of processed CRUs for previous TFs. If CRUs are missing for them, they are probably lost/not received
167 mProcessedTotalData = mCheckEveryNData;
168 checkIntervalsForMissingData(pc, currentBuffer, relTF, currentOutLane, tf);
169 return;
170 }
171
172 if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) {
173 return;
174 }
175
176 if (mSendOutputStartInfo[currentBuffer]) {
177 mSendOutputStartInfo[currentBuffer] = false;
178 pc.outputs().snapshot(Output{gDataOriginTPC, getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{currentOutLane}}, mTFStart[currentBuffer]);
179 }
180
181 if (mSendCCDBOutputOrbitReset[currentOutLane] && mSendCCDBOutputGRPECS[currentOutLane]) {
182 mSendCCDBOutputOrbitReset[currentOutLane] = false;
183 mSendCCDBOutputGRPECS[currentOutLane] = false;
184 pc.outputs().snapshot(Output{gDataOriginTPC, getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{currentOutLane}}, dataformats::Pair<long, int>{o2::base::GRPGeomHelper::instance().getOrbitResetTimeMS(), o2::base::GRPGeomHelper::instance().getNHBFPerTF()});
185 }
186
187 forwardOrbitInfo(pc, currentBuffer, relTF, currentOutLane);
188
189 for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) {
190 auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
191 const unsigned int cru = tpcCRUHeader->subSpecification >> 7;
192
193 // check if cru is specified in input cru list
194 if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
195 LOGP(debug, "Received data from CRU: {} which was not specified as input. Skipping", cru);
196 continue;
197 }
198
199 if (mProcessedCRUs[currentBuffer][relTF][cru]) {
200 continue;
201 }
202 // count total number of processed CRUs for given TF
203 ++mProcessedCRU[currentBuffer][relTF];
204 // to keep track of processed CRUs
205 mProcessedCRUs[currentBuffer][relTF][cru] = true;
206
207 sendOutput(pc, currentOutLane, cru, pc.inputs().get<pmr::vector<uint16_t>>(ref));
208 }
209
210 LOGP(detail, "Number of received CRUs for current TF: {} Needed a total number of processed CRUs of: {} Current TF: {}", mProcessedCRU[currentBuffer][relTF], mCRUs.size(), tf);
211
212 // check for missing data if specified
213 if (mNTFsDataDrop > 0) {
214 checkIntervalsForMissingData(pc, currentBuffer, relTF, currentOutLane, tf);
215 }
216
217 if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) {
218 ++mProcessedTFs[currentBuffer];
219 }
220
221 if (mProcessedTFs[currentBuffer] == mTimeFrames) {
222 finishInterval(pc, currentOutLane, currentBuffer, tf);
223 }
224 }
225
226 void endOfStream(o2::framework::EndOfStreamContext& ec) final { ec.services().get<ControlService>().readyToQuit(QuitRequest::Me); }
227
229 static header::DataDescription getDataDescriptionCMV(const unsigned int lane)
230 {
231 const std::string name = fmt::format("CMVAGG{}", lane);
233 description.runtimeInit(name.substr(0, 16).c_str());
234 return description;
235 }
236
239 {
240 const std::string name = fmt::format("CMVORB{}", lane);
242 description.runtimeInit(name.substr(0, 16).c_str());
243 return description;
244 }
245
248
249 private:
250 std::vector<uint32_t> mCRUs{};
251 const unsigned int mTimeFrames{};
252 const int mNTFsBuffer{1};
253 const unsigned int mOutLanes{};
254 std::array<unsigned int, 2> mProcessedTFs{{0, 0}};
255 std::array<std::vector<unsigned int>, 2> mProcessedCRU{};
256 std::array<std::vector<std::unordered_map<unsigned int, bool>>, 2> mProcessedCRUs{};
257 std::array<long, 2> mTFStart{};
258 std::array<long, 2> mTFEnd{};
259 std::array<bool, 2> mSendOutputStartInfo{true, true};
260 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
261 std::vector<bool> mSendCCDBOutputOrbitReset{};
262 std::vector<bool> mSendCCDBOutputGRPECS{};
263 unsigned int mCurrentOutLane{0};
264 bool mBuffer{false};
265 int mNFactorTFs{0};
266 int mNTFsDataDrop{0};
267 std::array<int, 2> mStartNTFsDataDrop{0};
268 long mProcessedTotalData{0};
269 int mCheckEveryNData{1};
270 std::vector<InputSpec> mFilter{};
271 std::vector<InputSpec> mOrbitFilter{};
272 std::vector<header::DataDescription> mDataDescrOut{};
273 std::vector<header::DataDescription> mOrbitDescrOut{};
274 std::array<std::vector<bool>, 2> mOrbitInfoForwarded{};
275
277 unsigned int getOutLane(const uint32_t tf) const { return (tf > mTFEnd[mBuffer]) ? (mCurrentOutLane + 1) % mOutLanes : mCurrentOutLane; }
279 unsigned int getNRealTFs() const { return mNTFsBuffer * mTimeFrames; }
280
281 void sendOutput(o2::framework::ProcessingContext& pc, const unsigned int currentOutLane, const unsigned int cru, o2::pmr::vector<uint16_t> cmvs)
282 {
283 pc.outputs().adoptContainer(Output{gDataOriginTPC, mDataDescrOut[currentOutLane], header::DataHeader::SubSpecificationType{cru}}, std::move(cmvs));
284 }
285
286 void sendOrbitInfo(o2::framework::ProcessingContext& pc, const unsigned int outLane, const uint64_t orbitInfo)
287 {
288 pc.outputs().snapshot(Output{gDataOriginTPC, mOrbitDescrOut[outLane], header::DataHeader::SubSpecificationType{outLane}}, orbitInfo);
289 }
290
291 void forwardOrbitInfo(o2::framework::ProcessingContext& pc, const bool currentBuffer, const unsigned int relTF, const unsigned int currentOutLane)
292 {
293 if (mOrbitInfoForwarded[currentBuffer][relTF]) {
294 return;
295 }
296
297 for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) {
298 auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
299 const unsigned int cru = hdr->subSpecification >> 7;
300 if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) {
301 continue;
302 }
303
304 sendOrbitInfo(pc, currentOutLane, pc.inputs().get<uint64_t>(ref));
305 mOrbitInfoForwarded[currentBuffer][relTF] = true;
306 break;
307 }
308 }
309
310 void forwardEOSData(o2::framework::ProcessingContext& pc)
311 {
312 const unsigned int currentOutLane = mCurrentOutLane;
313
314 if (mSendOutputStartInfo[mBuffer] && (mTFStart[mBuffer] >= 0)) {
315 mSendOutputStartInfo[mBuffer] = false;
316 pc.outputs().snapshot(Output{gDataOriginTPC, getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{currentOutLane}}, mTFStart[mBuffer]);
317 }
318
319 if (mSendCCDBOutputOrbitReset[currentOutLane] && mSendCCDBOutputGRPECS[currentOutLane]) {
320 mSendCCDBOutputOrbitReset[currentOutLane] = false;
321 mSendCCDBOutputGRPECS[currentOutLane] = false;
323 }
324
325 if (!mOrbitInfoForwarded[mBuffer].empty()) {
326 for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) {
327 auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
328 const unsigned int cru = hdr->subSpecification >> 7;
329 if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) {
330 continue;
331 }
332 sendOrbitInfo(pc, currentOutLane, pc.inputs().get<uint64_t>(ref));
333 break;
334 }
335 }
336
337 for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) {
338 auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
339 const unsigned int cru = hdr->subSpecification >> 7;
340 if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) {
341 continue;
342 }
343 sendOutput(pc, currentOutLane, cru, pc.inputs().get<pmr::vector<uint16_t>>(ref));
344 }
345 }
346
347 void clearBuffer(const bool currentBuffer)
348 {
349 // reset per-CRU received flags so the next interval can accept data from all CRUs again
350 for (auto& crusMap : mProcessedCRUs[currentBuffer]) {
351 for (auto& it : crusMap) {
352 it.second = false;
353 }
354 }
355
356 mProcessedTFs[currentBuffer] = 0;
357 std::fill(mProcessedCRU[currentBuffer].begin(), mProcessedCRU[currentBuffer].end(), 0);
358 std::fill(mOrbitInfoForwarded[currentBuffer].begin(), mOrbitInfoForwarded[currentBuffer].end(), false);
359
360 mTFStart[mBuffer] = mTFEnd[!mBuffer] + 1;
361 mTFEnd[mBuffer] = mTFStart[mBuffer] + getNRealTFs() - 1;
362
363 // switch buffer and advance output lane
364 mBuffer = !mBuffer;
365 mCurrentOutLane = ++mCurrentOutLane % mOutLanes;
366 }
367
368 void checkIntervalsForMissingData(o2::framework::ProcessingContext& pc, const bool currentBuffer, const long relTF, const unsigned int currentOutLane, const uint32_t tf)
369 {
370 if (!(mProcessedTotalData++ % mCheckEveryNData)) {
371 LOGP(detail, "Checking for dropped packages...");
372
373 // if the last buffer has a smaller time range than expected, flush its remaining uncompleted TFs
374 if ((mTFStart[currentBuffer] > mTFStart[!currentBuffer]) && (relTF > mNTFsDataDrop)) {
375 LOGP(warning, "Checking last buffer from {} to {}", mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].size());
376 const unsigned int lastLane = (currentOutLane == 0) ? (mOutLanes - 1) : (currentOutLane - 1);
377 checkMissingData(pc, !currentBuffer, mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].size(), lastLane);
378 LOGP(detail, "All empty TFs for TF {} for current buffer filled with dummy and sent. Clearing buffer", tf);
379 finishInterval(pc, lastLane, !currentBuffer, tf);
380 }
381
382 const int tfEndCheck = std::clamp(static_cast<int>(relTF) - mNTFsDataDrop, 0, static_cast<int>(mProcessedCRU[currentBuffer].size()));
383 LOGP(detail, "Checking current buffer from {} to {}", mStartNTFsDataDrop[currentBuffer], tfEndCheck);
384 checkMissingData(pc, currentBuffer, mStartNTFsDataDrop[currentBuffer], tfEndCheck, currentOutLane);
385 mStartNTFsDataDrop[currentBuffer] = tfEndCheck;
386 }
387 }
388
389 void checkMissingData(o2::framework::ProcessingContext& pc, const bool currentBuffer, const int startTF, const int endTF, const unsigned int outLane)
390 {
391 for (int iTF = startTF; iTF < endTF; ++iTF) {
392 if (mProcessedCRU[currentBuffer][iTF] != mCRUs.size()) {
393 LOGP(warning, "CRUs for lane {} rel. TF: {} curr TF {} are missing! Processed {} CRUs out of {}", outLane, iTF, mTFStart[currentBuffer] + static_cast<long>(iTF) * mNTFsBuffer, mProcessedCRU[currentBuffer][iTF], mCRUs.size());
394 ++mProcessedTFs[currentBuffer];
395 mProcessedCRU[currentBuffer][iTF] = mCRUs.size();
396
397 // send empty payloads for missing CRUs so the aggregate lane sees a complete set
398 for (auto& it : mProcessedCRUs[currentBuffer][iTF]) {
399 if (!it.second) {
400 it.second = true;
401 sendOutput(pc, outLane, it.first, pmr::vector<uint16_t>());
402 }
403 }
404
405 // send zero orbit placeholder for missing TF so the aggregate lane can still reconstruct timing
406 if (!mOrbitInfoForwarded[currentBuffer][iTF]) {
407 sendOrbitInfo(pc, outLane, 0);
408 mOrbitInfoForwarded[currentBuffer][iTF] = true;
409 }
410 }
411 }
412 }
413
414 void finishInterval(o2::framework::ProcessingContext& pc, const unsigned int currentOutLane, const bool buffer, const uint32_t tf)
415 {
416 if (mNFactorTFs > 0) {
417 mNFactorTFs = 0;
418 // ToDo: Find better fix. Set oldestForChannel to a very large value so the DPL dispatcher does not block waiting for older TF data that will never arrive
419 for (unsigned int ilane = 0; ilane < mOutLanes; ++ilane) {
420 auto& deviceProxy = pc.services().get<FairMQDeviceProxy>();
421 auto& state = deviceProxy.getOutputChannelState({static_cast<int>(ilane)});
422 size_t oldest = std::numeric_limits<size_t>::max() - 1;
423 state.oldestForChannel = {oldest};
424 }
425 }
426
427 LOGP(detail, "All TFs {} for current buffer received. Clearing buffer", tf);
428 clearBuffer(buffer);
429 mStartNTFsDataDrop[buffer] = 0;
430 mSendOutputStartInfo[buffer] = true;
431 }
432};
433
434DataProcessorSpec getTPCDistributeCMVSpec(const int ilane, const std::vector<uint32_t>& crus, const unsigned int timeframes, const unsigned int outlanes, const int firstTF, const bool sendPrecisetimeStamp = false, const int nTFsBuffer = 1)
435{
436 std::vector<InputSpec> inputSpecs;
437 inputSpecs.emplace_back(InputSpec{"cmvsgroup", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup()}, Lifetime::Sporadic});
438 inputSpecs.emplace_back(InputSpec{"cmvorbit", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo()}, Lifetime::Sporadic});
439
440 std::vector<OutputSpec> outputSpecs;
441 outputSpecs.reserve(3 * outlanes);
442 for (unsigned int lane = 0; lane < outlanes; ++lane) {
443 outputSpecs.emplace_back(ConcreteDataTypeMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(lane)}, Lifetime::Sporadic);
444 outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(lane), header::DataHeader::SubSpecificationType{lane}}, Lifetime::Sporadic);
445 outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{lane}}, Lifetime::Sporadic);
446 }
447
448 // Only lane 0 fetches CCDB orbit-reset/GRPECS objects and broadcasts them to all aggregate lanes, the other distribute lanes do not need them, avoiding redundant CCDB requests
449 bool fetchCCDB = false;
450 if (sendPrecisetimeStamp && (ilane == 0)) {
451 fetchCCDB = true;
452 for (unsigned int lane = 0; lane < outlanes; ++lane) {
453 outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{lane}}, Lifetime::Sporadic);
454 }
455 }
456
457 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(fetchCCDB, // orbitResetTime
458 fetchCCDB, // GRPECS=true
459 false, // GRPLHCIF
460 false, // GRPMagField
461 false, // askMatLUT
463 inputSpecs);
464
465 const auto id = fmt::format("tpc-distribute-cmv-{:02}", ilane);
467 id.data(),
468 inputSpecs,
469 outputSpecs,
470 AlgorithmSpec{adaptFromTask<TPCDistributeCMVSpec>(crus, timeframes, nTFsBuffer, outlanes, firstTF, ccdbRequest)},
471 Options{{"drop-data-after-nTFs", VariantType::Int, 0, {"Number of TFs after which to drop the data."}},
472 {"check-data-every-n", VariantType::Int, 0, {"Number of run function called after which to check for missing data (-1 for no checking, 0 for default checking)."}},
473 {"nFactorTFs", VariantType::Int, 1000, {"Number of TFs to skip for sending oldest TF."}}}};
474 spec.rank = ilane;
475 return spec;
476}
477
478} // namespace o2::tpc
479
480#endif
header::DataDescription description
benchmark::State & state
std::ostringstream debug
int32_t i
TPCZSHDR * hdr
Helper for geometry and GRP related CCDB requests.
A helper class to iteratate over all parts of all input routes.
TPC device for processing CMVs on FLPs.
auto getOrbitResetTimeMS() const
bool finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
void snapshot(const Output &spec, T const &object)
CacheId adoptContainer(const Output &, ContainerT &, CacheStrategy, o2::header::SerializationMethod)
A helper class to iteratate over all parts of all input routes.
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static constexpr header::DataDescription getDataDescriptionCMVFirstTF()
void init(o2::framework::InitContext &ic) final
void finaliseCCDB(ConcreteDataMatcher &matcher, void *obj) final
static header::DataDescription getDataDescriptionCMV(const unsigned int lane)
Return data description for aggregated CMVs for a given lane.
static constexpr header::DataDescription getDataDescriptionCMVOrbitReset()
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
TPCDistributeCMVSpec(const std::vector< uint32_t > &crus, const unsigned int timeframes, const int nTFsBuffer, const unsigned int outlanes, const int firstTF, std::shared_ptr< o2::base::GRPGeomRequest > req)
static header::DataDescription getDataDescriptionCMVOrbitInfo(const unsigned int lane)
Return data description for orbit/BC info for a given output lane.
void run(o2::framework::ProcessingContext &pc) final
static constexpr header::DataDescription getDataDescriptionCMVOrbitInfo()
Data description for the packed (orbit<<32|bc) scalar forwarded alongside each CRU's CMVGROUP.
static constexpr header::DataDescription getDataDescriptionCMVGroup()
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
std::vector< ConfigParamSpec > Options
std::vector< T, fair::mq::pmr::polymorphic_allocator< T > > vector
uint32_t getCurrentTF(o2::framework::ProcessingContext &pc)
Global TPC definitions and constants.
Definition SimTraits.h:168
DataProcessorSpec getTPCDistributeCMVSpec(const int ilane, const std::vector< uint32_t > &crus, const unsigned int timeframes, const unsigned int outlanes, const int firstTF, const bool sendPrecisetimeStamp=false, const int nTFsBuffer=1)
Enum< T >::Iterator begin(Enum< T >)
Definition Defs.h:156
void empty(int)
std::unique_ptr< GPUReconstructionTimeframe > tf
uint32_t SubSpecificationType
Definition DataHeader.h:622
void runtimeInit(const char *string, short length=-1)
Definition DataHeader.h:261