Project
Loading...
Searching...
No Matches
TPCDistributeCMVSpec.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15
16#ifndef O2_TPCDISTRIBUTECMVSPEC_H
17#define O2_TPCDISTRIBUTECMVSPEC_H
18
19#include <algorithm>
20#include <array>
21#include <limits>
22#include <unordered_map>
23#include <vector>
24#include <fmt/format.h>
25#include "Framework/Task.h"
27#include "Framework/Logger.h"
31#include "Headers/DataHeader.h"
38
39namespace o2::tpc
40{
41
43{
44 public:
45 TPCDistributeCMVSpec(const std::vector<uint32_t>& crus, const unsigned int timeframes, const int nTFsBuffer, const unsigned int outlanes, const int firstTF, std::shared_ptr<o2::base::GRPGeomRequest> req)
46 : mCRUs{crus},
47 mTimeFrames{timeframes},
48 mNTFsBuffer{nTFsBuffer},
49 mOutLanes{outlanes},
50 mProcessedCRU{{std::vector<unsigned int>(timeframes), std::vector<unsigned int>(timeframes)}},
51 mTFStart{{firstTF, firstTF + static_cast<long>(timeframes) * nTFsBuffer}},
52 mTFEnd{{firstTF + static_cast<long>(timeframes) * nTFsBuffer - 1, firstTF + 2LL * timeframes * nTFsBuffer - 1}},
53 mCCDBRequest(req),
54 mSendCCDBOutputOrbitReset(outlanes),
55 mSendCCDBOutputGRPECS(outlanes),
56 mOrbitInfoForwarded{{std::vector<bool>(timeframes, false), std::vector<bool>(timeframes, false)}}
57 {
58 mDataDescrOut.reserve(mOutLanes);
59 mOrbitDescrOut.reserve(mOutLanes);
60 for (unsigned int i = 0; i < mOutLanes; ++i) {
61 mDataDescrOut.emplace_back(getDataDescriptionCMV(i));
62 mOrbitDescrOut.emplace_back(getDataDescriptionCMVOrbitInfo(i));
63 }
64 // sort vector for binary_search
65 std::sort(mCRUs.begin(), mCRUs.end());
66
67 for (auto& processedCRUbuffer : mProcessedCRUs) {
68 processedCRUbuffer.resize(mTimeFrames);
69 for (auto& crusMap : processedCRUbuffer) {
70 crusMap.reserve(mCRUs.size());
71 for (const auto cruID : mCRUs) {
72 crusMap.emplace(cruID, false);
73 }
74 }
75 }
76
79 }
80
82 {
84 mNFactorTFs = ic.options().get<int>("nFactorTFs");
85 mNTFsDataDrop = ic.options().get<int>("drop-data-after-nTFs");
86 mCheckEveryNData = ic.options().get<int>("check-data-every-n");
87 if (mCheckEveryNData == 0) {
88 mCheckEveryNData = mTimeFrames / 2;
89 if (mCheckEveryNData == 0) {
90 mCheckEveryNData = 1;
91 }
92 mNTFsDataDrop = mCheckEveryNData;
93 }
94 }
95
96 void finaliseCCDB(o2::framework::ConcreteDataMatcher& matcher, void* obj) final
97 {
99 if (matcher == o2::framework::ConcreteDataMatcher("CTP", "ORBITRESET", 0)) {
100 LOGP(debug, "Updating ORBITRESET");
101 std::fill(mSendCCDBOutputOrbitReset.begin(), mSendCCDBOutputOrbitReset.end(), true);
102 } else if (matcher == o2::framework::ConcreteDataMatcher("GLO", "GRPECS", 0)) {
103 // check if received object is valid
104 if (o2::base::GRPGeomHelper::instance().getGRPECS()->getRun() != 0) {
105 LOGP(debug, "Updating GRPECS");
106 std::fill(mSendCCDBOutputGRPECS.begin(), mSendCCDBOutputGRPECS.end(), true);
107 } else {
108 LOGP(debug, "Detected default GRPECS object");
109 }
110 }
111 }
112
114 {
115 // capture orbit-reset info once for precise CCDB timestamp calculation
116 if (mCCDBRequest->askTime) {
117 const bool grpecsValid = pc.inputs().isValid("grpecs");
118 const bool orbitResetValid = pc.inputs().isValid("orbitReset");
119 if (grpecsValid) {
120 pc.inputs().get<o2::parameters::GRPECSObject*>("grpecs");
121 }
122 if (orbitResetValid) {
123 pc.inputs().get<std::vector<Long64_t>*>("orbitReset");
124 }
125 if (pc.inputs().countValidInputs() == (grpecsValid + orbitResetValid)) {
126 return;
127 }
128 }
129
130 const auto tf = processing_helpers::getCurrentTF(pc);
131 if (tf == std::numeric_limits<uint32_t>::max()) {
132 forwardEOSData(pc);
133 return;
134 }
135
136 // automatically detect firstTF in case firstTF was not specified
137 if (mTFStart.front() <= -1) {
138 const auto firstTFDetected = tf;
139 const long offsetTF = std::abs(mTFStart.front() + 1);
140 const auto nTotTFs = getNRealTFs();
141 // tf is the batch TF counter (= last real TF in the first batch), subtract (mNTFsBuffer - 1) to recover the actual first real TF of the interval
142 const long firstRealTF = static_cast<long>(firstTFDetected) - (mNTFsBuffer - 1) + offsetTF;
143 mTFStart = {firstRealTF, firstRealTF + nTotTFs};
144 mTFEnd = {mTFStart[1] - 1, mTFStart[1] - 1 + nTotTFs};
145 LOGP(detail, "Setting {} as first TF", mTFStart[0]);
146 LOGP(detail, "Using offset of {} TFs for setting the first TF", offsetTF);
147 }
148
149 // check which buffer to use for current incoming data
150 const bool currentBuffer = (tf > mTFEnd[mBuffer]) ? !mBuffer : mBuffer;
151 if (mTFStart[currentBuffer] > tf) {
152 LOGP(detail, "All CRUs for current TF {} already received. Skipping this TF", tf);
153 return;
154 }
155
156 const unsigned int currentOutLane = getOutLane(tf);
157 const unsigned int relTF = (tf - mTFStart[currentBuffer]) / mNTFsBuffer;
158 LOGP(debug, "Current TF: {}, relative TF: {}, current buffer: {}, current output lane: {}, mTFStart: {}", tf, relTF, currentBuffer, currentOutLane, mTFStart[currentBuffer]);
159
160 if (relTF >= mProcessedCRU[currentBuffer].size()) {
161 LOGP(warning, "Skipping tf {}: relative tf {} is larger than size of buffer: {}", tf, relTF, mProcessedCRU[currentBuffer].size());
162 // check number of processed CRUs for previous TFs. If CRUs are missing for them, they are probably lost/not received
163 mProcessedTotalData = mCheckEveryNData;
164 checkIntervalsForMissingData(pc, currentBuffer, relTF, currentOutLane, tf);
165 return;
166 }
167
168 if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) {
169 return;
170 }
171
172 if (mSendOutputStartInfo[currentBuffer]) {
173 mSendOutputStartInfo[currentBuffer] = false;
174 pc.outputs().snapshot(o2::framework::Output{o2::header::gDataOriginTPC, getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{currentOutLane}}, mTFStart[currentBuffer]);
175 }
176
177 if (mSendCCDBOutputOrbitReset[currentOutLane] && mSendCCDBOutputGRPECS[currentOutLane]) {
178 mSendCCDBOutputOrbitReset[currentOutLane] = false;
179 mSendCCDBOutputGRPECS[currentOutLane] = false;
181 }
182
183 forwardOrbitInfo(pc, currentBuffer, relTF, currentOutLane);
184
185 for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFilter)) {
186 auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
187 const unsigned int cru = tpcCRUHeader->subSpecification >> 7;
188
189 // check if cru is specified in input cru list
190 if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
191 LOGP(debug, "Received data from CRU: {} which was not specified as input. Skipping", cru);
192 continue;
193 }
194
195 if (mProcessedCRUs[currentBuffer][relTF][cru]) {
196 continue;
197 }
198 // count total number of processed CRUs for given TF
199 ++mProcessedCRU[currentBuffer][relTF];
200 // to keep track of processed CRUs
201 mProcessedCRUs[currentBuffer][relTF][cru] = true;
202
203 sendOutput(pc, currentOutLane, cru, pc.inputs().get<o2::pmr::vector<uint16_t>>(ref));
204 }
205
206 LOGP(detail, "Number of received CRUs for current TF: {} Needed a total number of processed CRUs of: {} Current TF: {}", mProcessedCRU[currentBuffer][relTF], mCRUs.size(), tf);
207
208 // check for missing data if specified
209 if (mNTFsDataDrop > 0) {
210 checkIntervalsForMissingData(pc, currentBuffer, relTF, currentOutLane, tf);
211 }
212
213 if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) {
214 ++mProcessedTFs[currentBuffer];
215 }
216
217 if (mProcessedTFs[currentBuffer] == mTimeFrames) {
218 finishInterval(pc, currentOutLane, currentBuffer, tf);
219 }
220 }
221
223
225 static header::DataDescription getDataDescriptionCMV(const unsigned int lane)
226 {
227 const std::string name = fmt::format("CMVAGG{}", lane);
229 description.runtimeInit(name.substr(0, 16).c_str());
230 return description;
231 }
232
235 {
236 const std::string name = fmt::format("CMVORB{}", lane);
238 description.runtimeInit(name.substr(0, 16).c_str());
239 return description;
240 }
241
244
245 private:
246 std::vector<uint32_t> mCRUs{};
247 const unsigned int mTimeFrames{};
248 const int mNTFsBuffer{1};
249 const unsigned int mOutLanes{};
250 std::array<unsigned int, 2> mProcessedTFs{{0, 0}};
251 std::array<std::vector<unsigned int>, 2> mProcessedCRU{};
252 std::array<std::vector<std::unordered_map<unsigned int, bool>>, 2> mProcessedCRUs{};
253 std::array<long, 2> mTFStart{};
254 std::array<long, 2> mTFEnd{};
255 std::array<bool, 2> mSendOutputStartInfo{true, true};
256 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
257 std::vector<bool> mSendCCDBOutputOrbitReset{};
258 std::vector<bool> mSendCCDBOutputGRPECS{};
259 unsigned int mCurrentOutLane{0};
260 bool mBuffer{false};
261 int mNFactorTFs{0};
262 int mNTFsDataDrop{0};
263 std::array<int, 2> mStartNTFsDataDrop{0};
264 long mProcessedTotalData{0};
265 int mCheckEveryNData{1};
266 std::vector<o2::framework::InputSpec> mFilter{};
267 std::vector<o2::framework::InputSpec> mOrbitFilter{};
268 std::vector<header::DataDescription> mDataDescrOut{};
269 std::vector<header::DataDescription> mOrbitDescrOut{};
270 std::array<std::vector<bool>, 2> mOrbitInfoForwarded{};
271
273 unsigned int getOutLane(const uint32_t tf) const { return (tf > mTFEnd[mBuffer]) ? (mCurrentOutLane + 1) % mOutLanes : mCurrentOutLane; }
275 unsigned int getNRealTFs() const { return mNTFsBuffer * mTimeFrames; }
276
277 void sendOutput(o2::framework::ProcessingContext& pc, const unsigned int currentOutLane, const unsigned int cru, o2::pmr::vector<uint16_t> cmvs)
278 {
280 }
281
282 void sendOrbitInfo(o2::framework::ProcessingContext& pc, const unsigned int outLane, const uint64_t orbitInfo)
283 {
285 }
286
287 void forwardOrbitInfo(o2::framework::ProcessingContext& pc, const bool currentBuffer, const unsigned int relTF, const unsigned int currentOutLane)
288 {
289 if (mOrbitInfoForwarded[currentBuffer][relTF]) {
290 return;
291 }
292
293 for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mOrbitFilter)) {
294 auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
295 const unsigned int cru = hdr->subSpecification >> 7;
296 if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) {
297 continue;
298 }
299
300 sendOrbitInfo(pc, currentOutLane, pc.inputs().get<uint64_t>(ref));
301 mOrbitInfoForwarded[currentBuffer][relTF] = true;
302 break;
303 }
304 }
305
306 void forwardEOSData(o2::framework::ProcessingContext& pc)
307 {
308 const unsigned int currentOutLane = mCurrentOutLane;
309
310 if (mSendOutputStartInfo[mBuffer] && (mTFStart[mBuffer] >= 0)) {
311 mSendOutputStartInfo[mBuffer] = false;
312 pc.outputs().snapshot(o2::framework::Output{o2::header::gDataOriginTPC, getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{currentOutLane}}, mTFStart[mBuffer]);
313 }
314
315 if (mSendCCDBOutputOrbitReset[currentOutLane] && mSendCCDBOutputGRPECS[currentOutLane]) {
316 mSendCCDBOutputOrbitReset[currentOutLane] = false;
317 mSendCCDBOutputGRPECS[currentOutLane] = false;
319 }
320
321 if (!mOrbitInfoForwarded[mBuffer].empty()) {
322 for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mOrbitFilter)) {
323 auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
324 const unsigned int cru = hdr->subSpecification >> 7;
325 if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) {
326 continue;
327 }
328 sendOrbitInfo(pc, currentOutLane, pc.inputs().get<uint64_t>(ref));
329 break;
330 }
331 }
332
333 for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFilter)) {
334 auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
335 const unsigned int cru = hdr->subSpecification >> 7;
336 if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) {
337 continue;
338 }
339 sendOutput(pc, currentOutLane, cru, pc.inputs().get<o2::pmr::vector<uint16_t>>(ref));
340 }
341 }
342
343 void clearBuffer(const bool currentBuffer)
344 {
345 // reset per-CRU received flags so the next interval can accept data from all CRUs again
346 for (auto& crusMap : mProcessedCRUs[currentBuffer]) {
347 for (auto& it : crusMap) {
348 it.second = false;
349 }
350 }
351
352 mProcessedTFs[currentBuffer] = 0;
353 std::fill(mProcessedCRU[currentBuffer].begin(), mProcessedCRU[currentBuffer].end(), 0);
354 std::fill(mOrbitInfoForwarded[currentBuffer].begin(), mOrbitInfoForwarded[currentBuffer].end(), false);
355
356 mTFStart[mBuffer] = mTFEnd[!mBuffer] + 1;
357 mTFEnd[mBuffer] = mTFStart[mBuffer] + getNRealTFs() - 1;
358
359 // switch buffer and advance output lane
360 mBuffer = !mBuffer;
361 mCurrentOutLane = ++mCurrentOutLane % mOutLanes;
362 }
363
364 void checkIntervalsForMissingData(o2::framework::ProcessingContext& pc, const bool currentBuffer, const long relTF, const unsigned int currentOutLane, const uint32_t tf)
365 {
366 if (!(mProcessedTotalData++ % mCheckEveryNData)) {
367 LOGP(detail, "Checking for dropped packages...");
368
369 // if the last buffer has a smaller time range than expected, flush its remaining uncompleted TFs
370 if ((mTFStart[currentBuffer] > mTFStart[!currentBuffer]) && (relTF > mNTFsDataDrop)) {
371 LOGP(warning, "Checking last buffer from {} to {}", mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].size());
372 const unsigned int lastLane = (currentOutLane == 0) ? (mOutLanes - 1) : (currentOutLane - 1);
373 checkMissingData(pc, !currentBuffer, mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].size(), lastLane);
374 LOGP(detail, "All empty TFs for TF {} for current buffer filled with dummy and sent. Clearing buffer", tf);
375 finishInterval(pc, lastLane, !currentBuffer, tf);
376 }
377
378 const int tfEndCheck = std::clamp(static_cast<int>(relTF) - mNTFsDataDrop, 0, static_cast<int>(mProcessedCRU[currentBuffer].size()));
379 LOGP(detail, "Checking current buffer from {} to {}", mStartNTFsDataDrop[currentBuffer], tfEndCheck);
380 checkMissingData(pc, currentBuffer, mStartNTFsDataDrop[currentBuffer], tfEndCheck, currentOutLane);
381 mStartNTFsDataDrop[currentBuffer] = tfEndCheck;
382 }
383 }
384
385 void checkMissingData(o2::framework::ProcessingContext& pc, const bool currentBuffer, const int startTF, const int endTF, const unsigned int outLane)
386 {
387 for (int iTF = startTF; iTF < endTF; ++iTF) {
388 if (mProcessedCRU[currentBuffer][iTF] != mCRUs.size()) {
389 LOGP(warning, "CRUs for lane {} rel. TF: {} curr TF {} are missing! Processed {} CRUs out of {}", outLane, iTF, mTFStart[currentBuffer] + static_cast<long>(iTF) * mNTFsBuffer, mProcessedCRU[currentBuffer][iTF], mCRUs.size());
390 ++mProcessedTFs[currentBuffer];
391 mProcessedCRU[currentBuffer][iTF] = mCRUs.size();
392
393 // send empty payloads for missing CRUs so the aggregate lane sees a complete set
394 for (auto& it : mProcessedCRUs[currentBuffer][iTF]) {
395 if (!it.second) {
396 it.second = true;
397 sendOutput(pc, outLane, it.first, o2::pmr::vector<uint16_t>());
398 }
399 }
400
401 // send zero orbit placeholder for missing TF so the aggregate lane can still reconstruct timing
402 if (!mOrbitInfoForwarded[currentBuffer][iTF]) {
403 sendOrbitInfo(pc, outLane, 0);
404 mOrbitInfoForwarded[currentBuffer][iTF] = true;
405 }
406 }
407 }
408 }
409
410 void finishInterval(o2::framework::ProcessingContext& pc, const unsigned int currentOutLane, const bool buffer, const uint32_t tf)
411 {
412 if (mNFactorTFs > 0) {
413 mNFactorTFs = 0;
414 // ToDo: Find better fix. Set oldestForChannel to a very large value so the DPL dispatcher does not block waiting for older TF data that will never arrive
415 for (unsigned int ilane = 0; ilane < mOutLanes; ++ilane) {
416 auto& deviceProxy = pc.services().get<o2::framework::FairMQDeviceProxy>();
417 auto& state = deviceProxy.getOutputChannelState({static_cast<int>(ilane)});
418 size_t oldest = std::numeric_limits<size_t>::max() - 1;
419 state.oldestForChannel = {oldest};
420 }
421 }
422
423 LOGP(detail, "All TFs {} for current buffer received. Clearing buffer", tf);
424 clearBuffer(buffer);
425 mStartNTFsDataDrop[buffer] = 0;
426 mSendOutputStartInfo[buffer] = true;
427 }
428};
429
430o2::framework::DataProcessorSpec getTPCDistributeCMVSpec(const int ilane, const std::vector<uint32_t>& crus, const unsigned int timeframes, const unsigned int outlanes, const int firstTF, const bool sendPrecisetimeStamp = false, const int nTFsBuffer = 1)
431{
432 std::vector<o2::framework::InputSpec> inputSpecs;
433 inputSpecs.emplace_back(o2::framework::InputSpec{"cmvsgroup", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup()}, o2::framework::Lifetime::Sporadic});
434 inputSpecs.emplace_back(o2::framework::InputSpec{"cmvorbit", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo()}, o2::framework::Lifetime::Sporadic});
435
436 std::vector<o2::framework::OutputSpec> outputSpecs;
437 outputSpecs.reserve(3 * outlanes);
438 for (unsigned int lane = 0; lane < outlanes; ++lane) {
439 outputSpecs.emplace_back(o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMV(lane)}, o2::framework::Lifetime::Sporadic);
440 outputSpecs.emplace_back(o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitInfo(lane), header::DataHeader::SubSpecificationType{lane}}, o2::framework::Lifetime::Sporadic);
441 outputSpecs.emplace_back(o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVFirstTF(), header::DataHeader::SubSpecificationType{lane}}, o2::framework::Lifetime::Sporadic);
442 }
443
444 // Only lane 0 fetches CCDB orbit-reset/GRPECS objects and broadcasts them to all aggregate lanes, the other distribute lanes do not need them, avoiding redundant CCDB requests
445 bool fetchCCDB = false;
446 if (sendPrecisetimeStamp && (ilane == 0)) {
447 fetchCCDB = true;
448 for (unsigned int lane = 0; lane < outlanes; ++lane) {
449 outputSpecs.emplace_back(o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCDistributeCMVSpec::getDataDescriptionCMVOrbitReset(), header::DataHeader::SubSpecificationType{lane}}, o2::framework::Lifetime::Sporadic);
450 }
451 }
452
453 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(fetchCCDB, // orbitResetTime
454 fetchCCDB, // GRPECS=true
455 false, // GRPLHCIF
456 false, // GRPMagField
457 false, // askMatLUT
459 inputSpecs);
460
461 const auto id = fmt::format("tpc-distribute-cmv-{:02}", ilane);
463 id.data(),
464 inputSpecs,
465 outputSpecs,
466 o2::framework::AlgorithmSpec{o2::framework::adaptFromTask<TPCDistributeCMVSpec>(crus, timeframes, nTFsBuffer, outlanes, firstTF, ccdbRequest)},
467 o2::framework::Options{{"drop-data-after-nTFs", o2::framework::VariantType::Int, 0, {"Number of TFs after which to drop the data."}},
468 {"check-data-every-n", o2::framework::VariantType::Int, 0, {"Number of run function called after which to check for missing data (-1 for no checking, 0 for default checking)."}},
469 {"nFactorTFs", o2::framework::VariantType::Int, 1000, {"Number of TFs to skip for sending oldest TF."}}}};
470 spec.rank = ilane;
471 return spec;
472}
473
474} // namespace o2::tpc
475
476#endif
header::DataDescription description
benchmark::State & state
std::ostringstream debug
int32_t i
TPCZSHDR * hdr
Helper for geometry and GRP related CCDB requests.
A helper class to iteratate over all parts of all input routes.
TPC device for processing CMVs on FLPs.
auto getOrbitResetTimeMS() const
bool finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
void snapshot(const Output &spec, T const &object)
CacheId adoptContainer(const Output &, ContainerT &, CacheStrategy, o2::header::SerializationMethod)
A helper class to iteratate over all parts of all input routes.
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
static constexpr header::DataDescription getDataDescriptionCMVFirstTF()
void init(o2::framework::InitContext &ic) final
static header::DataDescription getDataDescriptionCMV(const unsigned int lane)
Return data description for aggregated CMVs for a given lane.
static constexpr header::DataDescription getDataDescriptionCMVOrbitReset()
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
TPCDistributeCMVSpec(const std::vector< uint32_t > &crus, const unsigned int timeframes, const int nTFsBuffer, const unsigned int outlanes, const int firstTF, std::shared_ptr< o2::base::GRPGeomRequest > req)
static header::DataDescription getDataDescriptionCMVOrbitInfo(const unsigned int lane)
Return data description for orbit/BC info for a given output lane.
void run(o2::framework::ProcessingContext &pc) final
static constexpr header::DataDescription getDataDescriptionCMVOrbitInfo()
Data description for the packed (orbit<<32|bc) scalar forwarded alongside each CRU's CMVGROUP.
static constexpr header::DataDescription getDataDescriptionCMVGroup()
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
@ Me
Only quit this data processor.
std::vector< ConfigParamSpec > Options
std::vector< T, fair::mq::pmr::polymorphic_allocator< T > > vector
uint32_t getCurrentTF(o2::framework::ProcessingContext &pc)
Global TPC definitions and constants.
Definition SimTraits.h:168
Enum< T >::Iterator begin(Enum< T >)
Definition Defs.h:156
o2::framework::DataProcessorSpec getTPCDistributeCMVSpec(const int ilane, const std::vector< uint32_t > &crus, const unsigned int timeframes, const unsigned int outlanes, const int firstTF, const bool sendPrecisetimeStamp=false, const int nTFsBuffer=1)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
void empty(int)
std::unique_ptr< GPUReconstructionTimeframe > tf
uint32_t SubSpecificationType
Definition DataHeader.h:622
void runtimeInit(const char *string, short length=-1)
Definition DataHeader.h:261