/// Constructor: sets up the two buffers and the per-lane output bookkeeping.
/// \param crus CRUs expected as input (NOTE(review): the initialisation of mCRUs is not visible in this excerpt - presumably copied from this argument)
/// \param timeframes number of slots per buffer; sizes the per-buffer bookkeeping vectors
/// \param nTFsBuffer number of consecutive TFs covered by one slot; one buffer spans timeframes * nTFsBuffer TFs
/// \param outlanes number of output lanes; sizes the per-lane CCDB flags
/// \param firstTF first TF of the first buffer
/// \param req GRP/geometry request (NOTE(review): presumably stored in mCCDBRequest - the initialiser is not visible here)
45 TPCDistributeCMVSpec(
const std::vector<uint32_t>& crus,
const unsigned int timeframes,
const int nTFsBuffer,
const unsigned int outlanes,
const int firstTF, std::shared_ptr<o2::base::GRPGeomRequest> req)
47 mTimeFrames{timeframes},
48 mNTFsBuffer{nTFsBuffer},
// one received-CRU counter per slot, for each of the two buffers
50 mProcessedCRU{{std::vector<unsigned int>(timeframes), std::vector<unsigned int>(timeframes)}},
// buffer 0 starts at firstTF, buffer 1 directly after the timeframes * nTFsBuffer TFs of buffer 0
51 mTFStart{{firstTF, firstTF +
static_cast<long>(timeframes) * nTFsBuffer}},
// last TF (inclusive) of buffer 0 and of buffer 1
52 mTFEnd{{firstTF +
static_cast<long>(timeframes) * nTFsBuffer - 1, firstTF + 2LL * timeframes * nTFsBuffer - 1}},
54 mSendCCDBOutputOrbitReset(outlanes),
55 mSendCCDBOutputGRPECS(outlanes),
// no orbit info forwarded yet for any slot of either buffer
56 mOrbitInfoForwarded{{std::vector<bool>(timeframes,
false), std::vector<bool>(timeframes,
false)}}
// one output data description per lane, for the CMV data and for the orbit info
58 mDataDescrOut.reserve(mOutLanes);
59 mOrbitDescrOut.reserve(mOutLanes);
60 for (
unsigned int i = 0;
i < mOutLanes; ++
i) {
61 mDataDescrOut.emplace_back(getDataDescriptionCMV(
i));
62 mOrbitDescrOut.emplace_back(getDataDescriptionCMVOrbitInfo(
i));
// sorted so that received CRUs can be looked up with std::binary_search
65 std::sort(mCRUs.begin(), mCRUs.end());
// per buffer and slot: map CRU id -> "already received" flag, initialised to false for every expected CRU
67 for (
auto& processedCRUbuffer : mProcessedCRUs) {
68 processedCRUbuffer.resize(mTimeFrames);
69 for (
auto& crusMap : processedCRUbuffer) {
70 crusMap.reserve(mCRUs.size());
71 for (
const auto cruID : mCRUs) {
72 crusMap.emplace(cruID,
false);
84 mNFactorTFs = ic.options().get<
int>(
"nFactorTFs");
85 mNTFsDataDrop = ic.options().get<
int>(
"drop-data-after-nTFs");
86 mCheckEveryNData = ic.options().get<
int>(
"check-data-every-n");
87 if (mCheckEveryNData == 0) {
88 mCheckEveryNData = mTimeFrames / 2;
89 if (mCheckEveryNData == 0) {
92 mNTFsDataDrop = mCheckEveryNData;
100 LOGP(
debug,
"Updating ORBITRESET");
101 std::fill(mSendCCDBOutputOrbitReset.begin(), mSendCCDBOutputOrbitReset.end(),
true);
105 LOGP(
debug,
"Updating GRPECS");
106 std::fill(mSendCCDBOutputGRPECS.begin(), mSendCCDBOutputGRPECS.end(),
true);
108 LOGP(
debug,
"Detected default GRPECS object");
116 if (mCCDBRequest->askTime) {
117 const bool grpecsValid = pc.inputs().isValid(
"grpecs");
118 const bool orbitResetValid = pc.inputs().isValid(
"orbitReset");
122 if (orbitResetValid) {
123 pc.inputs().get<std::vector<Long64_t>*>(
"orbitReset");
125 if (pc.inputs().countValidInputs() == (grpecsValid + orbitResetValid)) {
131 if (
tf == std::numeric_limits<uint32_t>::max()) {
137 if (mTFStart.front() <= -1) {
138 const auto firstTFDetected =
tf;
139 const long offsetTF = std::abs(mTFStart.front() + 1);
140 const auto nTotTFs = getNRealTFs();
142 const long firstRealTF =
static_cast<long>(firstTFDetected) - (mNTFsBuffer - 1) + offsetTF;
143 mTFStart = {firstRealTF, firstRealTF + nTotTFs};
144 mTFEnd = {mTFStart[1] - 1, mTFStart[1] - 1 + nTotTFs};
145 LOGP(detail,
"Setting {} as first TF", mTFStart[0]);
146 LOGP(detail,
"Using offset of {} TFs for setting the first TF", offsetTF);
150 const bool currentBuffer = (
tf > mTFEnd[mBuffer]) ? !mBuffer : mBuffer;
151 if (mTFStart[currentBuffer] >
tf) {
152 LOGP(detail,
"All CRUs for current TF {} already received. Skipping this TF",
tf);
156 const unsigned int currentOutLane = getOutLane(
tf);
157 const unsigned int relTF = (
tf - mTFStart[currentBuffer]) / mNTFsBuffer;
158 LOGP(
debug,
"Current TF: {}, relative TF: {}, current buffer: {}, current output lane: {}, mTFStart: {}",
tf, relTF, currentBuffer, currentOutLane, mTFStart[currentBuffer]);
160 if (relTF >= mProcessedCRU[currentBuffer].
size()) {
161 LOGP(warning,
"Skipping tf {}: relative tf {} is larger than size of buffer: {}",
tf, relTF, mProcessedCRU[currentBuffer].
size());
163 mProcessedTotalData = mCheckEveryNData;
164 checkIntervalsForMissingData(pc, currentBuffer, relTF, currentOutLane,
tf);
168 if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) {
172 if (mSendOutputStartInfo[currentBuffer]) {
173 mSendOutputStartInfo[currentBuffer] =
false;
177 if (mSendCCDBOutputOrbitReset[currentOutLane] && mSendCCDBOutputGRPECS[currentOutLane]) {
178 mSendCCDBOutputOrbitReset[currentOutLane] =
false;
179 mSendCCDBOutputGRPECS[currentOutLane] =
false;
183 forwardOrbitInfo(pc, currentBuffer, relTF, currentOutLane);
186 auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
187 const unsigned int cru = tpcCRUHeader->subSpecification >> 7;
190 if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
191 LOGP(
debug,
"Received data from CRU: {} which was not specified as input. Skipping", cru);
195 if (mProcessedCRUs[currentBuffer][relTF][cru]) {
199 ++mProcessedCRU[currentBuffer][relTF];
201 mProcessedCRUs[currentBuffer][relTF][cru] =
true;
206 LOGP(detail,
"Number of received CRUs for current TF: {} Needed a total number of processed CRUs of: {} Current TF: {}", mProcessedCRU[currentBuffer][relTF], mCRUs.size(),
tf);
209 if (mNTFsDataDrop > 0) {
210 checkIntervalsForMissingData(pc, currentBuffer, relTF, currentOutLane,
tf);
213 if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) {
214 ++mProcessedTFs[currentBuffer];
217 if (mProcessedTFs[currentBuffer] == mTimeFrames) {
218 finishInterval(pc, currentOutLane, currentBuffer,
tf);
227 const std::string
name = fmt::format(
"CMVAGG{}", lane);
236 const std::string
name = fmt::format(
"CMVORB{}", lane);
246 std::vector<uint32_t> mCRUs{}; ///< CRUs expected as input; sorted in the constructor so that std::binary_search can be used on it
247 const unsigned int mTimeFrames{}; ///< number of slots per buffer
248 const int mNTFsBuffer{1}; ///< number of consecutive TFs mapped onto one slot (relTF = (tf - mTFStart) / mNTFsBuffer)
249 const unsigned int mOutLanes{}; ///< number of output lanes
250 std::array<unsigned int, 2> mProcessedTFs{{0, 0}}; ///< per buffer: number of slots counted as complete (all CRUs received, or filled up by the missing-data check)
251 std::array<std::vector<unsigned int>, 2> mProcessedCRU{}; ///< per buffer and slot: number of CRUs received so far
252 std::array<std::vector<std::unordered_map<unsigned int, bool>>, 2> mProcessedCRUs{}; ///< per buffer and slot: CRU id -> flag whether this CRU was already received
253 std::array<long, 2> mTFStart{}; ///< first TF of each buffer; a value <= -1 for buffer 0 makes the range being derived from the first TF that is seen
254 std::array<long, 2> mTFEnd{}; ///< last TF (inclusive) of each buffer
255 std::array<bool, 2> mSendOutputStartInfo{
true,
true}; ///< per buffer: pending flag, cleared once acted upon and set again when the interval of the buffer is finished (NOTE(review): the action itself is not visible in this excerpt)
256 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest; ///< CCDB request; its askTime member gates the handling of the "grpecs"/"orbitReset" inputs
257 std::vector<bool> mSendCCDBOutputOrbitReset{}; ///< per lane: set when ORBITRESET was updated, cleared together with mSendCCDBOutputGRPECS
258 std::vector<bool> mSendCCDBOutputGRPECS{}; ///< per lane: set when GRPECS was updated, cleared together with mSendCCDBOutputOrbitReset
259 unsigned int mCurrentOutLane{0}; ///< index of the output lane used for the current buffer
262 int mNTFsDataDrop{0}; ///< value of option "drop-data-after-nTFs": how many slots the missing-data check lags behind the current slot; the per-TF check is only done if > 0
263 std::array<int, 2> mStartNTFsDataDrop{0}; ///< per buffer: first slot which was not yet checked for missing data
264 long mProcessedTotalData{0}; ///< call counter of checkIntervalsForMissingData(); the check runs when it is a multiple of mCheckEveryNData
265 int mCheckEveryNData{1}; ///< value of option "check-data-every-n"; 0 is replaced by mTimeFrames / 2 when the options are read
266 std::vector<o2::framework::InputSpec> mFilter{}; ///< input filter handed to InputRecordWalker when looping over the data inputs
267 std::vector<o2::framework::InputSpec> mOrbitFilter{}; ///< input filter handed to InputRecordWalker when looping over the orbit-info inputs
268 std::vector<header::DataDescription> mDataDescrOut{}; ///< per lane: result of getDataDescriptionCMV(lane)
269 std::vector<header::DataDescription> mOrbitDescrOut{}; ///< per lane: result of getDataDescriptionCMVOrbitInfo(lane)
270 std::array<std::vector<bool>, 2> mOrbitInfoForwarded{}; ///< per buffer and slot: whether the orbit info was already sent
/// \return output lane for the given TF: the lane following the current one (wrapping around at mOutLanes)
///         if tf lies beyond the end of the buffer selected by mBuffer, otherwise the current lane
/// \param tf time frame for which the output lane is requested
273 unsigned int getOutLane(const uint32_t tf) const
{
  // TF already belongs to the next buffer -> it is sent on the next lane
  if (tf > mTFEnd[mBuffer]) {
    return (mCurrentOutLane + 1) % mOutLanes;
  }
  return mCurrentOutLane;
}
/// \return number of TFs spanned by one buffer: mNTFsBuffer TFs for each of the mTimeFrames slots
275 unsigned int getNRealTFs() const
{
  const unsigned int nTFsPerBuffer = mNTFsBuffer * mTimeFrames;
  return nTFsPerBuffer;
}
289 if (mOrbitInfoForwarded[currentBuffer][relTF]) {
293 for (
auto&
ref :
o2::
framework::InputRecordWalker(pc.inputs(), mOrbitFilter)) {
294 auto const*
hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
295 const unsigned int cru =
hdr->subSpecification >> 7;
296 if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) {
300 sendOrbitInfo(pc, currentOutLane, pc.
inputs().
get<uint64_t>(
ref));
301 mOrbitInfoForwarded[currentBuffer][relTF] =
true;
308 const unsigned int currentOutLane = mCurrentOutLane;
310 if (mSendOutputStartInfo[mBuffer] && (mTFStart[mBuffer] >= 0)) {
311 mSendOutputStartInfo[mBuffer] =
false;
315 if (mSendCCDBOutputOrbitReset[currentOutLane] && mSendCCDBOutputGRPECS[currentOutLane]) {
316 mSendCCDBOutputOrbitReset[currentOutLane] =
false;
317 mSendCCDBOutputGRPECS[currentOutLane] =
false;
321 if (!mOrbitInfoForwarded[mBuffer].
empty()) {
322 for (
auto&
ref :
o2::
framework::InputRecordWalker(pc.inputs(), mOrbitFilter)) {
323 auto const*
hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
324 const unsigned int cru =
hdr->subSpecification >> 7;
325 if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) {
328 sendOrbitInfo(pc, currentOutLane, pc.
inputs().
get<uint64_t>(
ref));
333 for (
auto&
ref :
o2::
framework::InputRecordWalker(pc.inputs(), mFilter)) {
334 auto const*
hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
335 const unsigned int cru =
hdr->subSpecification >> 7;
336 if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) {
/// Resets the bookkeeping of the given buffer, assigns a new TF range to the buffer selected by mBuffer
/// and advances the output lane.
/// \param currentBuffer index of the buffer whose counters and flags are reset
343 void clearBuffer(
const bool currentBuffer)
// walk over the CRU flags of every slot (NOTE(review): the loop body is not visible in this excerpt - presumably the flags are reset)
346 for (
auto& crusMap : mProcessedCRUs[currentBuffer]) {
347 for (
auto& it : crusMap) {
// no slot of this buffer is complete any more
352 mProcessedTFs[currentBuffer] = 0;
// no CRU received yet for any slot
353 std::fill(mProcessedCRU[currentBuffer].
begin(), mProcessedCRU[currentBuffer].
end(), 0);
// orbit info has to be forwarded again for every slot
354 std::fill(mOrbitInfoForwarded[currentBuffer].
begin(), mOrbitInfoForwarded[currentBuffer].
end(),
false);
// the new range of buffer mBuffer starts right behind the last TF of the other buffer ...
356 mTFStart[mBuffer] = mTFEnd[!mBuffer] + 1;
// ... and covers getNRealTFs() TFs (end is inclusive)
357 mTFEnd[mBuffer] = mTFStart[mBuffer] + getNRealTFs() - 1;
// move on to the next output lane, wrapping around at mOutLanes
361 mCurrentOutLane = ++mCurrentOutLane % mOutLanes;
/// Periodically looks for slots with missing CRU data and hands them to checkMissingData().
/// The check is only performed on calls for which the call counter is a multiple of mCheckEveryNData.
/// \param pc processing context, passed on to checkMissingData() and finishInterval()
/// \param currentBuffer buffer the current TF belongs to
/// \param relTF slot index of the current TF inside currentBuffer
/// \param currentOutLane output lane used for currentBuffer
/// \param tf current TF (used for logging and passed on to finishInterval())
364 void checkIntervalsForMissingData(
o2::framework::ProcessingContext& pc,
const bool currentBuffer,
const long relTF,
const unsigned int currentOutLane,
const uint32_t
tf)
// post-increment: the counter advances on every call, the check runs when its previous value is a multiple of mCheckEveryNData
366 if (!(mProcessedTotalData++ % mCheckEveryNData)) {
367 LOGP(detail,
"Checking for dropped packages...");
// the other buffer covers earlier TFs and the current buffer has advanced by more than mNTFsDataDrop slots:
// close the other buffer
370 if ((mTFStart[currentBuffer] > mTFStart[!currentBuffer]) && (relTF > mNTFsDataDrop)) {
371 LOGP(warning,
"Checking last buffer from {} to {}", mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].
size());
// lane preceding the current one, wrapping around to the last lane
372 const unsigned int lastLane = (currentOutLane == 0) ? (mOutLanes - 1) : (currentOutLane - 1);
// check all slots of the other buffer which were not checked yet
373 checkMissingData(pc, !currentBuffer, mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].
size(), lastLane);
374 LOGP(detail,
"All empty TFs for TF {} for current buffer filled with dummy and sent. Clearing buffer",
tf);
375 finishInterval(pc, lastLane, !currentBuffer,
tf);
// in the current buffer check up to the slot lying mNTFsDataDrop slots behind the current one, clamped to [0, number of slots]
378 const int tfEndCheck = std::clamp(
static_cast<int>(relTF) - mNTFsDataDrop, 0,
static_cast<int>(mProcessedCRU[currentBuffer].
size()));
379 LOGP(detail,
"Checking current buffer from {} to {}", mStartNTFsDataDrop[currentBuffer], tfEndCheck);
380 checkMissingData(pc, currentBuffer, mStartNTFsDataDrop[currentBuffer], tfEndCheck, currentOutLane);
// the next check continues where this one stopped
381 mStartNTFsDataDrop[currentBuffer] = tfEndCheck;
387 for (
int iTF = startTF; iTF < endTF; ++iTF) {
388 if (mProcessedCRU[currentBuffer][iTF] != mCRUs.size()) {
389 LOGP(warning,
"CRUs for lane {} rel. TF: {} curr TF {} are missing! Processed {} CRUs out of {}", outLane, iTF, mTFStart[currentBuffer] +
static_cast<long>(iTF) * mNTFsBuffer, mProcessedCRU[currentBuffer][iTF], mCRUs.size());
390 ++mProcessedTFs[currentBuffer];
391 mProcessedCRU[currentBuffer][iTF] = mCRUs.size();
394 for (
auto& it : mProcessedCRUs[currentBuffer][iTF]) {
402 if (!mOrbitInfoForwarded[currentBuffer][iTF]) {
403 sendOrbitInfo(pc, outLane, 0);
404 mOrbitInfoForwarded[currentBuffer][iTF] =
true;
412 if (mNFactorTFs > 0) {
415 for (
unsigned int ilane = 0; ilane < mOutLanes; ++ilane) {
417 auto&
state = deviceProxy.getOutputChannelState({
static_cast<int>(ilane)});
418 size_t oldest = std::numeric_limits<size_t>::max() - 1;
419 state.oldestForChannel = {oldest};
423 LOGP(detail,
"All TFs {} for current buffer received. Clearing buffer",
tf);
425 mStartNTFsDataDrop[
buffer] = 0;
426 mSendOutputStartInfo[
buffer] =
true;
432 std::vector<o2::framework::InputSpec> inputSpecs;
436 std::vector<o2::framework::OutputSpec> outputSpecs;
437 outputSpecs.reserve(3 * outlanes);
438 for (
unsigned int lane = 0; lane < outlanes; ++lane) {
445 bool fetchCCDB =
false;
446 if (sendPrecisetimeStamp && (ilane == 0)) {
448 for (
unsigned int lane = 0; lane < outlanes; ++lane) {
453 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(fetchCCDB,
461 const auto id = fmt::format(
"tpc-distribute-cmv-{:02}", ilane);
466 o2::framework::AlgorithmSpec{o2::framework::adaptFromTask<TPCDistributeCMVSpec>(crus, timeframes, nTFsBuffer, outlanes, firstTF, ccdbRequest)},
468 {
"check-data-every-n",
o2::framework::VariantType::Int, 0, {
"Number of run function called after which to check for missing data (-1 for no checking, 0 for default checking)."}},