// Constructor. Sizes the bookkeeping for two buffers of timeframes slots each and derives
// the TF range covered by each buffer from firstTF, timeframes and nTFsBuffer.
// NOTE(review): this listing is fragmentary (the embedded line numbers skip), so not every
// initialiser or closing brace of this definition is visible here.
49 TPCDistributeCMVSpec(
const std::vector<uint32_t>& crus,
const unsigned int timeframes,
const int nTFsBuffer,
const unsigned int outlanes,
const int firstTF, std::shared_ptr<o2::base::GRPGeomRequest> req)
51 mTimeFrames{timeframes},
52 mNTFsBuffer{nTFsBuffer},
// one received-CRU counter per slot, for each of the two buffers
54 mProcessedCRU{{std::vector<unsigned int>(timeframes), std::vector<unsigned int>(timeframes)}},
// buffer 0 starts at firstTF, buffer 1 starts timeframes * nTFsBuffer TFs later
55 mTFStart{{firstTF, firstTF +
static_cast<long>(timeframes) * nTFsBuffer}},
// inclusive last TF of each buffer (start of the following range minus one)
56 mTFEnd{{firstTF +
static_cast<long>(timeframes) * nTFsBuffer - 1, firstTF + 2LL * timeframes * nTFsBuffer - 1}},
// one flag per output lane
58 mSendCCDBOutputOrbitReset(outlanes),
59 mSendCCDBOutputGRPECS(outlanes),
// per buffer and slot: orbit info not forwarded yet
60 mOrbitInfoForwarded{{std::vector<bool>(timeframes,
false), std::vector<bool>(timeframes,
false)}}
// build the output data descriptions once per lane
62 mDataDescrOut.reserve(mOutLanes);
63 mOrbitDescrOut.reserve(mOutLanes);
64 for (
unsigned int i = 0;
i < mOutLanes; ++
i) {
65 mDataDescrOut.emplace_back(getDataDescriptionCMV(
i));
66 mOrbitDescrOut.emplace_back(getDataDescriptionCMVOrbitInfo(
i));
// keep the CRU list sorted: lookups on mCRUs elsewhere use std::binary_search
69 std::sort(mCRUs.begin(), mCRUs.end());
// for every buffer and slot, register each CRU with the flag set to false
71 for (
auto& processedCRUbuffer : mProcessedCRUs) {
72 processedCRUbuffer.resize(mTimeFrames);
73 for (
auto& crusMap : processedCRUbuffer) {
74 crusMap.reserve(mCRUs.size());
75 for (
const auto cruID : mCRUs) {
76 crusMap.emplace(cruID,
false);
// Fragment (enclosing function header not visible): copies the device options into members.
88 mNFactorTFs = ic.options().get<
int>(
"nFactorTFs");
89 mNTFsDataDrop = ic.options().get<
int>(
"drop-data-after-nTFs");
90 mCheckEveryNData = ic.options().get<
int>(
"check-data-every-n");
// 0 requests the default check interval: half the number of slots per buffer
91 if (mCheckEveryNData == 0) {
92 mCheckEveryNData = mTimeFrames / 2;
// the integer division gives 0 when mTimeFrames is below 2; the handling of that case is not visible here
93 if (mCheckEveryNData == 0) {
// NOTE(review): the condition guarding this assignment is not visible in this listing
96 mNTFsDataDrop = mCheckEveryNData;
// Fragment (enclosing function header and branch conditions not visible): after logging an
// ORBITRESET or GRPECS update, the matching send flag is set to true for every output lane.
104 LOGP(
debug,
"Updating ORBITRESET");
105 std::fill(mSendCCDBOutputOrbitReset.begin(), mSendCCDBOutputOrbitReset.end(),
true);
109 LOGP(
debug,
"Updating GRPECS");
110 std::fill(mSendCCDBOutputGRPECS.begin(), mSendCCDBOutputGRPECS.end(),
true);
// NOTE(review): what happens after this message is not visible in this listing
112 LOGP(
debug,
"Detected default GRPECS object");
// Fragment (enclosing function header not visible): handling of the inputs of one timeframe tf.
// NOTE(review): many lines, including the definition of tf, the loop over the inputs, early exits
// and closing braces, are missing from this listing.
120 if (mCCDBRequest->askTime) {
121 const bool grpecsValid = pc.inputs().isValid(
"grpecs");
122 const bool orbitResetValid = pc.inputs().isValid(
"orbitReset");
126 if (orbitResetValid) {
// NOTE(review): the returned object is not used in the visible code
127 pc.inputs().get<std::vector<Long64_t>*>(
"orbitReset");
// true when the only valid inputs are the CCDB objects (the two bools are summed as 0/1)
129 if (pc.inputs().countValidInputs() == (grpecsValid + orbitResetValid)) {
// NOTE(review): the body of this branch is not visible
135 if (
tf == std::numeric_limits<uint32_t>::max()) {
// a configured start of -1 or lower: derive the buffer ranges from the first TF seen here
141 if (mTFStart.front() <= -1) {
142 const auto firstTFDetected =
tf;
143 const long offsetTF = std::abs(mTFStart.front() + 1);
144 const auto nTotTFs = getNRealTFs();
146 const long firstRealTF =
static_cast<long>(firstTFDetected) - (mNTFsBuffer - 1) + offsetTF;
147 mTFStart = {firstRealTF, firstRealTF + nTotTFs};
148 mTFEnd = {mTFStart[1] - 1, mTFStart[1] - 1 + nTotTFs};
149 LOGP(detail,
"Setting {} as first TF", mTFStart[0]);
150 LOGP(detail,
"Using offset of {} TFs for setting the first TF", offsetTF);
// a tf beyond the end of buffer mBuffer belongs to the other buffer
154 const bool currentBuffer = (
tf > mTFEnd[mBuffer]) ? !mBuffer : mBuffer;
// tf lies before the start of the selected buffer
155 if (mTFStart[currentBuffer] >
tf) {
156 LOGP(detail,
"All CRUs for current TF {} already received. Skipping this TF",
tf);
160 const unsigned int currentOutLane = getOutLane(
tf);
// slot index inside the buffer: distance to the buffer start in units of mNTFsBuffer
161 const unsigned int relTF = (
tf - mTFStart[currentBuffer]) / mNTFsBuffer;
162 LOGP(
debug,
"Current TF: {}, relative TF: {}, current buffer: {}, current output lane: {}, mTFStart: {}",
tf, relTF, currentBuffer, currentOutLane, mTFStart[currentBuffer]);
// slot index outside the buffer
164 if (relTF >= mProcessedCRU[currentBuffer].
size()) {
165 LOGP(warning,
"Skipping tf {}: relative tf {} is larger than size of buffer: {}",
tf, relTF, mProcessedCRU[currentBuffer].
size());
// checkIntervalsForMissingData tests the counter modulo mCheckEveryNData, so setting the
// counter to mCheckEveryNData makes the check run on the call below
167 mProcessedTotalData = mCheckEveryNData;
168 checkIntervalsForMissingData(pc, currentBuffer, relTF, currentOutLane,
tf);
// the counter of this slot already equals the number of configured CRUs
172 if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) {
// flag is cleared once it has been seen set (what is sent in between is not visible)
176 if (mSendOutputStartInfo[currentBuffer]) {
177 mSendOutputStartInfo[currentBuffer] =
false;
// both CCDB flags of this lane are set: clear them (what is sent in between is not visible)
181 if (mSendCCDBOutputOrbitReset[currentOutLane] && mSendCCDBOutputGRPECS[currentOutLane]) {
182 mSendCCDBOutputOrbitReset[currentOutLane] =
false;
183 mSendCCDBOutputGRPECS[currentOutLane] =
false;
187 forwardOrbitInfo(pc, currentBuffer, relTF, currentOutLane);
190 auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
// the CRU number is the subSpecification shifted right by 7 bits
191 const unsigned int cru = tpcCRUHeader->subSpecification >> 7;
// mCRUs is sorted in the constructor, as std::binary_search requires
194 if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
195 LOGP(
debug,
"Received data from CRU: {} which was not specified as input. Skipping", cru);
// this CRU is already flagged for this slot
199 if (mProcessedCRUs[currentBuffer][relTF][cru]) {
// count the CRU for this slot and flag it
203 ++mProcessedCRU[currentBuffer][relTF];
205 mProcessedCRUs[currentBuffer][relTF][cru] =
true;
210 LOGP(detail,
"Number of received CRUs for current TF: {} Needed a total number of processed CRUs of: {} Current TF: {}", mProcessedCRU[currentBuffer][relTF], mCRUs.size(),
tf);
// the missing-data check is only done when mNTFsDataDrop is positive
213 if (mNTFsDataDrop > 0) {
214 checkIntervalsForMissingData(pc, currentBuffer, relTF, currentOutLane,
tf);
// slot complete: count it
217 if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) {
218 ++mProcessedTFs[currentBuffer];
// all slots of the buffer complete: finish the interval
221 if (mProcessedTFs[currentBuffer] == mTimeFrames) {
222 finishInterval(pc, currentOutLane, currentBuffer,
tf);
// Fragment (enclosing function headers not visible): per-lane names, CMVAGG followed by the lane
// number and CMVORB followed by the lane number.
231 const std::string
name = fmt::format(
"CMVAGG{}", lane);
240 const std::string
name = fmt::format(
"CMVORB{}", lane);
// Data members. NOTE(review): mBuffer, which the visible code uses as buffer index, and
// mNFactorTFs are declared on lines that are not part of this listing.
// CRUs to be processed; sorted in the constructor and searched with std::binary_search
250 std::vector<uint32_t> mCRUs{};
// number of slots per buffer
251 const unsigned int mTimeFrames{};
// number of TFs per slot: divisor in the slot-index computation and factor in getNRealTFs
252 const int mNTFsBuffer{1};
// number of output lanes
253 const unsigned int mOutLanes{};
// per buffer: number of slots counted as complete
254 std::array<unsigned int, 2> mProcessedTFs{{0, 0}};
// per buffer and slot: number of CRUs received
255 std::array<std::vector<unsigned int>, 2> mProcessedCRU{};
// per buffer and slot: map from CRU to its received flag
256 std::array<std::vector<std::unordered_map<unsigned int, bool>>, 2> mProcessedCRUs{};
// per buffer: first TF covered
257 std::array<long, 2> mTFStart{};
// per buffer: last TF covered (inclusive)
258 std::array<long, 2> mTFEnd{};
// per buffer: flag that is cleared after it has been checked and set again when an interval is finished
259 std::array<bool, 2> mSendOutputStartInfo{
true,
true};
// request object whose askTime member is read during processing
260 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
// per output lane: set to true on an ORBITRESET update, cleared together with the GRPECS flag
261 std::vector<bool> mSendCCDBOutputOrbitReset{};
// per output lane: set to true on a GRPECS update, cleared together with the ORBITRESET flag
262 std::vector<bool> mSendCCDBOutputGRPECS{};
// output lane currently in use; advanced modulo mOutLanes in clearBuffer
263 unsigned int mCurrentOutLane{0};
// value of the option drop-data-after-nTFs
266 int mNTFsDataDrop{0};
// per buffer: slot at which the next missing-data check starts
267 std::array<int, 2> mStartNTFsDataDrop{0};
// call counter used to run the missing-data check only every mCheckEveryNData calls
268 long mProcessedTotalData{0};
// value of the option check-data-every-n (0 is replaced by mTimeFrames / 2)
269 int mCheckEveryNData{1};
// NOTE(review): mFilter and mOrbitFilter are not used in the visible code; presumably input filters
270 std::vector<InputSpec> mFilter{};
271 std::vector<InputSpec> mOrbitFilter{};
// per output lane: data descriptions filled in the constructor
272 std::vector<header::DataDescription> mDataDescrOut{};
273 std::vector<header::DataDescription> mOrbitDescrOut{};
// per buffer and slot: true once orbit info has been sent for the slot
274 std::array<std::vector<bool>, 2> mOrbitInfoForwarded{};
277 unsigned int getOutLane(
const uint32_t
tf)
const {
return (
tf > mTFEnd[mBuffer]) ? (mCurrentOutLane + 1) % mOutLanes : mCurrentOutLane; }
279 unsigned int getNRealTFs()
const {
return mNTFsBuffer * mTimeFrames; }
// Fragment (enclosing function header not visible; the variables match the call
// forwardOrbitInfo(pc, currentBuffer, relTF, currentOutLane) elsewhere in this listing - TODO confirm).
// slot already handled (body of this branch not visible)
293 if (mOrbitInfoForwarded[currentBuffer][relTF]) {
298 auto const*
hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
// the CRU number is the subSpecification shifted right by 7 bits
299 const unsigned int cru =
hdr->subSpecification >> 7;
// branch for CRUs that are not listed in mCRUs (body not visible)
300 if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) {
// send the uint64_t payload of ref on the current output lane and mark the slot as done
304 sendOrbitInfo(pc, currentOutLane, pc.
inputs().
get<uint64_t>(
ref));
305 mOrbitInfoForwarded[currentBuffer][relTF] =
true;
// Fragment (enclosing function header not visible): works on buffer mBuffer and lane mCurrentOutLane.
// NOTE(review): the statements between the visible lines, including the loops that define ref,
// are missing from this listing.
312 const unsigned int currentOutLane = mCurrentOutLane;
// only when the start TF of the buffer is already known (not negative)
314 if (mSendOutputStartInfo[mBuffer] && (mTFStart[mBuffer] >= 0)) {
315 mSendOutputStartInfo[mBuffer] =
false;
// both CCDB flags of this lane are set: clear them (what is sent in between is not visible)
319 if (mSendCCDBOutputOrbitReset[currentOutLane] && mSendCCDBOutputGRPECS[currentOutLane]) {
320 mSendCCDBOutputOrbitReset[currentOutLane] =
false;
321 mSendCCDBOutputGRPECS[currentOutLane] =
false;
325 if (!mOrbitInfoForwarded[mBuffer].
empty()) {
327 auto const*
hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
// the CRU number is the subSpecification shifted right by 7 bits
328 const unsigned int cru =
hdr->subSpecification >> 7;
// branch for CRUs that are not listed in mCRUs (body not visible)
329 if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) {
// send the uint64_t payload of ref on the current output lane
332 sendOrbitInfo(pc, currentOutLane, pc.
inputs().
get<uint64_t>(
ref));
338 auto const*
hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
339 const unsigned int cru =
hdr->subSpecification >> 7;
// branch for CRUs that are not listed in mCRUs (body not visible)
340 if (!std::binary_search(mCRUs.begin(), mCRUs.end(), cru)) {
// Resets the bookkeeping of currentBuffer and moves the TF range of buffer mBuffer behind the
// range of the other buffer.
// NOTE(review): fragmentary listing; the innermost loop body, the closing braces and possibly
// further statements are not visible.
347 void clearBuffer(
const bool currentBuffer)
// walk all per-CRU flags of the buffer (the statement applied to each entry is not visible)
350 for (
auto& crusMap : mProcessedCRUs[currentBuffer]) {
351 for (
auto& it : crusMap) {
// reset the slot counter, the per-slot CRU counters and the orbit-info flags
356 mProcessedTFs[currentBuffer] = 0;
357 std::fill(mProcessedCRU[currentBuffer].
begin(), mProcessedCRU[currentBuffer].
end(), 0);
358 std::fill(mOrbitInfoForwarded[currentBuffer].
begin(), mOrbitInfoForwarded[currentBuffer].
end(),
false);
// new range of buffer mBuffer: starts right after the end of the other buffer and spans getNRealTFs() TFs
360 mTFStart[mBuffer] = mTFEnd[!mBuffer] + 1;
361 mTFEnd[mBuffer] = mTFStart[mBuffer] + getNRealTFs() - 1;
365 mCurrentOutLane = ++mCurrentOutLane % mOutLanes;
// Every mCheckEveryNData calls, checks the buffers for slots with missing CRUs via checkMissingData.
// When currentBuffer is the later of the two buffers and relTF is more than mNTFsDataDrop slots
// into it, the other buffer is checked up to its end and finished.
// NOTE(review): fragmentary listing; closing braces and possibly further statements are not visible.
368 void checkIntervalsForMissingData(
o2::framework::ProcessingContext& pc,
const bool currentBuffer,
const long relTF,
const unsigned int currentOutLane,
const uint32_t
tf)
// the counter is incremented on every call; the check runs when its previous value is a multiple of mCheckEveryNData
370 if (!(mProcessedTotalData++ % mCheckEveryNData)) {
371 LOGP(detail,
"Checking for dropped packages...");
// currentBuffer starts later than the other buffer and relTF is beyond the drop threshold
374 if ((mTFStart[currentBuffer] > mTFStart[!currentBuffer]) && (relTF > mNTFsDataDrop)) {
375 LOGP(warning,
"Checking last buffer from {} to {}", mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].
size());
// lane before currentOutLane, wrapping around at 0
376 const unsigned int lastLane = (currentOutLane == 0) ? (mOutLanes - 1) : (currentOutLane - 1);
377 checkMissingData(pc, !currentBuffer, mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].
size(), lastLane);
378 LOGP(detail,
"All empty TFs for TF {} for current buffer filled with dummy and sent. Clearing buffer",
tf);
379 finishInterval(pc, lastLane, !currentBuffer,
tf);
// end of the range to check: relTF - mNTFsDataDrop, clamped to the range from 0 to the number of slots
382 const int tfEndCheck = std::clamp(
static_cast<int>(relTF) - mNTFsDataDrop, 0,
static_cast<int>(mProcessedCRU[currentBuffer].
size()));
383 LOGP(detail,
"Checking current buffer from {} to {}", mStartNTFsDataDrop[currentBuffer], tfEndCheck);
384 checkMissingData(pc, currentBuffer, mStartNTFsDataDrop[currentBuffer], tfEndCheck, currentOutLane);
// the next check of this buffer starts where this one ended
385 mStartNTFsDataDrop[currentBuffer] = tfEndCheck;
// Fragment (enclosing function header not visible; the variables match the checkMissingData calls
// elsewhere in this listing - TODO confirm): walks the slots from startTF up to, not including, endTF.
391 for (
int iTF = startTF; iTF < endTF; ++iTF) {
// slot is incomplete: warn, count it as processed and set its CRU counter to the full number
392 if (mProcessedCRU[currentBuffer][iTF] != mCRUs.size()) {
393 LOGP(warning,
"CRUs for lane {} rel. TF: {} curr TF {} are missing! Processed {} CRUs out of {}", outLane, iTF, mTFStart[currentBuffer] +
static_cast<long>(iTF) * mNTFsBuffer, mProcessedCRU[currentBuffer][iTF], mCRUs.size());
394 ++mProcessedTFs[currentBuffer];
395 mProcessedCRU[currentBuffer][iTF] = mCRUs.size();
// loop over the per-CRU flags of the slot (loop body not visible)
398 for (
auto& it : mProcessedCRUs[currentBuffer][iTF]) {
// no orbit info was forwarded for this slot: send the value 0 and mark the slot as done
406 if (!mOrbitInfoForwarded[currentBuffer][iTF]) {
407 sendOrbitInfo(pc, outLane, 0);
408 mOrbitInfoForwarded[currentBuffer][iTF] =
true;
// Fragment (enclosing function header not visible; the variables buffer and tf match the
// finishInterval calls elsewhere in this listing - TODO confirm).
// NOTE(review): lines between the visible ones, including the definition of deviceProxy, are missing.
416 if (mNFactorTFs > 0) {
419 for (
unsigned int ilane = 0; ilane < mOutLanes; ++ilane) {
421 auto&
state = deviceProxy.getOutputChannelState({
static_cast<int>(ilane)});
// set oldestForChannel of every output lane to the largest size_t value minus one
422 size_t oldest = std::numeric_limits<size_t>::max() - 1;
423 state.oldestForChannel = {oldest};
427 LOGP(detail,
"All TFs {} for current buffer received. Clearing buffer",
tf);
// restart the missing-data check of this buffer at slot 0 and set its start-info flag again
429 mStartNTFsDataDrop[
buffer] = 0;
430 mSendOutputStartInfo[
buffer] =
true;
// Builds the DataProcessorSpec that runs a TPCDistributeCMVSpec task for lane ilane.
// NOTE(review): fragmentary listing; the input and output definitions, the remaining arguments of
// the CCDB request and the returned spec are only partly visible.
434DataProcessorSpec getTPCDistributeCMVSpec(
const int ilane,
const std::vector<uint32_t>& crus,
const unsigned int timeframes,
const unsigned int outlanes,
const int firstTF,
const bool sendPrecisetimeStamp =
false,
const int nTFsBuffer = 1)
436 std::vector<InputSpec> inputSpecs;
440 std::vector<OutputSpec> outputSpecs;
// room for three output specs per output lane
441 outputSpecs.reserve(3 * outlanes);
442 for (
unsigned int lane = 0; lane < outlanes; ++lane) {
449 bool fetchCCDB =
false;
// branch taken only for lane 0 and only when precise time stamps are requested (body only partly visible)
450 if (sendPrecisetimeStamp && (ilane == 0)) {
452 for (
unsigned int lane = 0; lane < outlanes; ++lane) {
457 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(fetchCCDB,
// zero-padded two-digit lane number in the identifier; presumably used as the device name - TODO confirm
465 const auto id = fmt::format(
"tpc-distribute-cmv-{:02}", ilane);
// the task receives the same configuration that was passed to this function plus the CCDB request
470 AlgorithmSpec{adaptFromTask<TPCDistributeCMVSpec>(crus, timeframes, nTFsBuffer, outlanes, firstTF, ccdbRequest)},
// device options; the defaults are 0, 0 and 1000
471 Options{{
"drop-data-after-nTFs", VariantType::Int, 0, {
"Number of TFs after which to drop the data."}},
472 {
"check-data-every-n", VariantType::Int, 0, {
"Number of run function called after which to check for missing data (-1 for no checking, 0 for default checking)."}},
473 {
"nFactorTFs", VariantType::Int, 1000, {
"Number of TFs to skip for sending oldest TF."}}}};