// Constructor: stores the CRU list and the buffering parameters and prepares the
// bookkeeping for the two alternating buffers (index 0 and 1).
// NOTE(review): this listing carries embedded line numbers and omits some source
// lines (braces among them); the comments describe the visible statements only.
//   crus       - CRU ids; copied into mCRUs and sorted below
//   timeframes - number of relative-TF slots per buffer (mTimeFrames)
//   nTFsBuffer - stored in mNTFsBuffer; divisor of the relative TF and factor in getNRealTFs()
//   outlanes   - number of output lanes (mOutLanes); also the size of the per-lane CCDB flags
//   firstTF    - start of the first buffer window; a value <= -1 makes the processing
//                code derive the windows from the first TF it sees
//   req        - stored in mCCDBRequest
45 TPCDistributeIDCSpec(
const std::vector<uint32_t>& crus,
const unsigned int timeframes,
const int nTFsBuffer,
const unsigned int outlanes,
const int firstTF, std::shared_ptr<o2::base::GRPGeomRequest> req)
// mTFEnd's initialiser reads mTFStart[1]; this relies on mTFStart being declared
// (and therefore initialised) before mTFEnd in the class.
// NOTE(review): the windows built here span `timeframes` TFs, whereas the later
// re-initialisation and clearBuffer() use getNRealTFs() (= mNTFsBuffer * mTimeFrames);
// confirm this is intended for nTFsBuffer > 1. `firstTF + timeframes` is evaluated in
// unsigned arithmetic because `timeframes` is unsigned.
46 : mCRUs{crus}, mTimeFrames{timeframes}, mNTFsBuffer{nTFsBuffer}, mOutLanes{outlanes}, mProcessedCRU{{std::vector<unsigned int>(timeframes), std::vector<unsigned int>(timeframes)}}, mTFStart{{firstTF, firstTF + timeframes}}, mTFEnd{{firstTF + timeframes - 1, mTFStart[1] + timeframes - 1}}, mCCDBRequest(req), mSendCCDBOutputOrbitReset(outlanes), mSendCCDBOutputGRPECS(outlanes)
// Pre-compute one output data description per output lane.
49 mDataDescrOut.reserve(mOutLanes);
50 for (
unsigned int i = 0;
i < mOutLanes; ++
i) {
51 mDataDescrOut.emplace_back(getDataDescriptionIDC(
i));
// Sorted so that std::binary_search can be applied to mCRUs when data arrives.
55 std::sort(mCRUs.begin(), mCRUs.end());
// For both buffers and every relative TF: one map entry per expected CRU,
// initialised to false (not yet received).
57 for (
auto& processedCRUbuffer : mProcessedCRUs) {
58 processedCRUbuffer.resize(mTimeFrames);
59 for (
auto& crusMap : processedCRUbuffer) {
60 crusMap.reserve(mCRUs.size());
61 for (
const auto cruID : mCRUs) {
62 crusMap.emplace(cruID,
false);
// Per side, pick the name "idcsgroupa" for Side::A and "idcsgroupc" otherwise.
// `sides` and the use of `name` are on lines not shown in this listing.
68 for (
auto side : sides) {
69 const std::string
name = (
side ==
Side::A) ?
"idcsgroupa" :
"idcsgroupc";
// Fragment, presumably of the init method (header not shown in this listing).
// Reads the three integer options "nFactorTFs", "drop-data-after-nTFs" and
// "check-data-every-n" from the options of `ic`.
77 mNFactorTFs = ic.options().get<
int>(
"nFactorTFs");
78 mNTFsDataDrop = ic.options().get<
int>(
"drop-data-after-nTFs");
79 mCheckEveryNData = ic.options().get<
int>(
"check-data-every-n");
// A value of 0 selects a default check interval: half the number of TF slots per buffer.
80 if (mCheckEveryNData == 0) {
81 mCheckEveryNData = mTimeFrames / 2;
// The integer division yields 0 for mTimeFrames < 2; the handling of that case is
// on lines not shown here.
82 if (mCheckEveryNData == 0) {
// mNTFsDataDrop is overwritten with the check interval (the exact enclosing
// condition cannot be seen from this listing).
85 mNTFsDataDrop = mCheckEveryNData;
// Fragment, presumably of the CCDB-update callback (header and the conditions
// selecting each branch are not shown in this listing).
// Marks the orbit-reset object as updated for every output lane; the per-lane
// flags are consumed (and cleared) by the per-TF processing code.
93 LOGP(info,
"Updating ORBITRESET");
94 std::fill(mSendCCDBOutputOrbitReset.begin(), mSendCCDBOutputOrbitReset.end(),
true);
// Same for the GRPECS object.
98 LOGP(info,
"Updating GRPECS");
99 std::fill(mSendCCDBOutputGRPECS.begin(), mSendCCDBOutputGRPECS.end(),
true);
// Logged on a branch whose condition is not visible here.
101 LOGP(info,
"Detected default GRPECS object");
// Fragment, presumably of the per-timeframe run method (header not shown; `pc`,
// `tf` and `ref` are defined on lines missing from this listing).
// Only when time information was requested through the CCDB request:
109 if (mCCDBRequest->askTime) {
110 const bool grpecsValid = pc.inputs().isValid(
"grpecs");
111 const bool orbitResetValid = pc.inputs().isValid(
"orbitReset");
// The returned object is not used here; presumably the call is made for its side
// effect of triggering the CCDB-update handling - TODO confirm.
115 if (orbitResetValid) {
116 pc.inputs().get<std::vector<Long64_t>*>(
"orbitReset");
// True when the CCDB objects are the only valid inputs (the two bools are summed
// as 0/1). What happens in that case is on lines not shown.
118 if (pc.inputs().countValidInputs() == (grpecsValid + orbitResetValid)) {
// A start TF <= -1 means "not yet defined": derive both buffer windows from the
// TF seen now. offsetTF = |mTFStart.front() + 1|, so -1 gives offset 0, -2 gives 1, ...
126 if (mTFStart.front() <= -1) {
127 const auto firstTF =
tf;
128 const long offsetTF = std::abs(mTFStart.front() + 1);
129 const auto nTotTFs = getNRealTFs();
// Two consecutive windows of nTotTFs TFs each: [start0, start1 - 1] and
// [start1, start1 - 1 + nTotTFs].
130 mTFStart = {firstTF + offsetTF, firstTF + offsetTF + nTotTFs};
131 mTFEnd = {mTFStart[1] - 1, mTFStart[1] - 1 + nTotTFs};
132 LOGP(info,
"Setting {} as first TF", mTFStart[0]);
133 LOGP(info,
"Using offset of {} TFs for setting the first TF", offsetTF);
// A TF beyond the end of the active buffer's window is assigned to the other buffer.
137 const bool currentBuffer = (
tf > mTFEnd[mBuffer]) ? !mBuffer : mBuffer;
// TF lies before the start of the selected window: logged (the skip itself is on
// a line not shown).
138 if (mTFStart[currentBuffer] >
tf) {
139 LOGP(info,
"all CRUs for current TF {} already received. Skipping this TF",
tf);
143 const unsigned int currentOutLane = getOutLane(
tf);
// Slot index inside the buffer; integer division groups mNTFsBuffer consecutive
// TFs into one slot.
144 const unsigned int relTF = (
tf - mTFStart[currentBuffer]) / mNTFsBuffer;
145 LOGP(
debug,
"current TF: {} relative TF: {} current buffer: {} current output lane: {} mTFStart: {}",
tf, relTF, currentBuffer, currentOutLane, mTFStart[currentBuffer]);
// Slot index outside the buffer: warn, then force a missing-data check.
147 if (relTF >= mProcessedCRU[currentBuffer].
size()) {
148 LOGP(warning,
"Skipping tf {}: relative tf {} is larger than size of buffer: {}",
tf, relTF, mProcessedCRU[currentBuffer].
size());
// Setting the call counter to the check interval makes the modulo test inside
// checkIntervalsForMissingData succeed on this call.
151 mProcessedTotalData = mCheckEveryNData;
152 checkIntervalsForMissingData(pc, currentBuffer, relTF, currentOutLane,
tf);
// All expected CRUs were already counted for this slot (branch body not shown).
156 if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) {
// One-shot per buffer: the flag is cleared here; the action it guards is on lines
// not shown.
161 if (mSendOutputStartInfo[currentBuffer]) {
162 mSendOutputStartInfo[currentBuffer] =
false;
// Both CCDB objects were flagged as updated for this lane: clear both flags (what
// is done with the objects is on lines not shown).
166 if (mSendCCDBOutputOrbitReset[currentOutLane] && mSendCCDBOutputGRPECS[currentOutLane]) {
167 mSendCCDBOutputOrbitReset[currentOutLane] =
false;
168 mSendCCDBOutputGRPECS[currentOutLane] =
false;
// The CRU id is taken from the data header's subSpecification with the lowest
// 7 bits shifted out (the meaning of those bits is not visible here).
173 auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
174 const unsigned int cru = tpcCRUHeader->subSpecification >> 7;
// mCRUs was sorted in the constructor, so binary_search is valid.
177 if (!(std::binary_search(mCRUs.begin(), mCRUs.end(), cru))) {
178 LOGP(
debug,
"Received data from CRU: {} which was not specified as input. Skipping", cru);
// operator[] on the map does not insert for an expected CRU: the constructor
// created an entry for every id in mCRUs. True means the CRU was already seen for
// this slot (branch body not shown).
182 if (mProcessedCRUs[currentBuffer][relTF][cru]) {
// Count the CRU for this slot and mark it as received.
186 ++mProcessedCRU[currentBuffer][relTF];
189 mProcessedCRUs[currentBuffer][relTF][cru] =
true;
196 LOGP(info,
"number of received CRUs for current TF: {} Needed a total number of processed CRUs of: {} Current TF: {}", mProcessedCRU[currentBuffer][relTF], mCRUs.size(),
tf);
// The periodic missing-data check is only active for a positive drop threshold.
199 if (mNTFsDataDrop > 0) {
200 checkIntervalsForMissingData(pc, currentBuffer, relTF, currentOutLane,
tf);
// Slot complete: count it for the buffer.
203 if (mProcessedCRU[currentBuffer][relTF] == mCRUs.size()) {
204 ++mProcessedTFs[currentBuffer];
// Every slot of the buffer is complete: close the interval.
207 if (mProcessedTFs[currentBuffer] == mTimeFrames) {
208 finishInterval(pc, currentOutLane, currentBuffer,
tf);
// Builds the string "IDCAGG<lane>" (enclosing function header not shown; `lane`
// is defined there).
// NOTE(review): the std::string is constructed from the char pointer of the
// temporary returned by fmt::format; the `.data()` call looks redundant.
217 const std::string
name = fmt::format(
"IDCAGG{}", lane).data();
// Expected CRU ids; sorted in the constructor so binary_search can be used.
227 std::vector<uint32_t> mCRUs{};
// Number of relative-TF slots per buffer.
228 const unsigned int mTimeFrames{};
// Divisor applied when mapping a TF to its slot; factor in getNRealTFs().
229 const int mNTFsBuffer{1};
// Number of output lanes.
230 const unsigned int mOutLanes{};
// Per buffer: number of slots for which all CRUs were received (or filled in as missing).
231 std::array<unsigned int, 2> mProcessedTFs{{0, 0}};
// Per buffer and slot: number of CRUs received.
232 std::array<std::vector<unsigned int>, 2> mProcessedCRU{};
// Per buffer and slot: CRU id -> "already received" flag.
233 std::array<std::vector<std::unordered_map<unsigned int, bool>>, 2> mProcessedCRUs{};
// First TF of each buffer's window (a value <= -1 in front() means "not yet defined").
234 std::array<long, 2> mTFStart{};
// Last TF of each buffer's window.
235 std::array<long, 2> mTFEnd{};
// Per buffer one-shot flag: cleared by the processing code, set again when the
// interval is finished.
236 std::array<bool, 2> mSendOutputStartInfo{
true,
true};
// CCDB request handed in through the constructor.
237 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
// Per output lane: the orbit-reset / GRPECS object was updated and not yet handled.
238 std::vector<bool> mSendCCDBOutputOrbitReset{};
239 std::vector<bool> mSendCCDBOutputGRPECS{};
// Output lane of the active buffer; advanced modulo mOutLanes in clearBuffer().
240 unsigned int mCurrentOutLane{0};
// Option "drop-data-after-nTFs"; the periodic missing-data check runs only when > 0.
243 int mNTFsDataDrop{0};
// Per buffer: slot index at which the next missing-data check starts
// ({0} sets the first element, the second is value-initialised to 0).
244 std::array<int, 2> mStartNTFsDataDrop{0};
// Call counter used modulo mCheckEveryNData to schedule the missing-data check.
245 long mProcessedTotalData{0};
// Option "check-data-every-n": check interval in calls.
246 int mCheckEveryNData{1};
// Input specs; filled and used on lines not shown in this listing.
247 std::vector<InputSpec> mFilter{};
// One output data description per output lane, filled in the constructor.
248 std::vector<header::DataDescription> mDataDescrOut{};
// Returns the output lane for `tf`: the next lane (wrapping at mOutLanes) when
// `tf` lies beyond the end of the active buffer's window, otherwise the current
// lane. Does not modify mCurrentOutLane.
256 unsigned int getOutLane(
const uint32_t
tf)
const {
return (
tf > mTFEnd[mBuffer]) ? (mCurrentOutLane + 1) % mOutLanes : mCurrentOutLane; }
// Returns the number of TFs covered by one buffer window: mNTFsBuffer * mTimeFrames
// (the int factor is converted to unsigned for the multiplication).
259 unsigned int getNRealTFs()
const {
return mNTFsBuffer * mTimeFrames; }
// Resets the bookkeeping of buffer `currentBuffer`, moves the window of the
// active buffer (mBuffer) behind the other buffer's window and advances the
// output lane. Several lines of the body are missing from this listing; mBuffer
// may be changed on one of them.
261 void clearBuffer(
const bool currentBuffer)
// Visit every CRU flag of every slot of the buffer (loop body not shown).
264 for (
auto& crusMap : mProcessedCRUs[currentBuffer]) {
265 for (
auto& it : crusMap) {
// Reset the per-buffer and per-slot counters.
270 mProcessedTFs[currentBuffer] = 0;
271 std::fill(mProcessedCRU[currentBuffer].
begin(), mProcessedCRU[currentBuffer].
end(), 0);
// New window: starts right after the other buffer's last TF and spans getNRealTFs() TFs.
274 mTFStart[mBuffer] = mTFEnd[!mBuffer] + 1;
275 mTFEnd[mBuffer] = mTFStart[mBuffer] + getNRealTFs() - 1;
// Advance to the next output lane, wrapping at mOutLanes.
// NOTE(review): the variable is modified twice in one expression, which is only
// well-defined since C++17; `(mCurrentOutLane + 1) % mOutLanes` would be clearer.
281 mCurrentOutLane = ++mCurrentOutLane % mOutLanes;
// Periodically looks for slots whose CRU data never completed and hands them to
// checkMissingData.
//   pc             - processing context, forwarded to checkMissingData / finishInterval
//   currentBuffer  - buffer the current TF belongs to
//   relTF          - slot index of the current TF inside that buffer
//   currentOutLane - output lane of the current buffer
//   tf             - current TF (used for logging and forwarded to finishInterval)
284 void checkIntervalsForMissingData(
o2::framework::ProcessingContext& pc,
const bool currentBuffer,
const long relTF,
const unsigned int currentOutLane,
const uint32_t
tf)
// Runs only when the call counter is a multiple of mCheckEveryNData; the counter
// is incremented on every call.
286 if (!(mProcessedTotalData++ % mCheckEveryNData)) {
287 LOGP(info,
"Checking for dropped packages...");
// The current buffer's window is the later one and processing has advanced more
// than mNTFsDataDrop slots into it: close out the other (previous) buffer.
290 if ((mTFStart[currentBuffer] > mTFStart[!currentBuffer]) && (relTF > mNTFsDataDrop)) {
291 LOGP(warning,
"checking last buffer from {} to {}", mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].
size());
// Lane of the previous buffer: one before the current lane, wrapping around.
292 const unsigned int lastLane = (currentOutLane == 0) ? (mOutLanes - 1) : (currentOutLane - 1);
// Check all remaining slots of the previous buffer, then finish its interval.
293 checkMissingData(pc, !currentBuffer, mStartNTFsDataDrop[!currentBuffer], mProcessedCRU[!currentBuffer].
size(), lastLane);
294 LOGP(info,
"All empty TFs for TF {} for current buffer filled with dummy and sent. Clearing buffer",
tf);
295 finishInterval(pc, lastLane, !currentBuffer,
tf);
// In the current buffer only slots at least mNTFsDataDrop behind the current one
// are checked; the end index is clamped to [0, number of slots].
298 const int tfEndCheck = std::clamp(
static_cast<int>(relTF) - mNTFsDataDrop, 0,
static_cast<int>(mProcessedCRU[currentBuffer].
size()));
299 LOGP(info,
"checking current buffer from {} to {}", mStartNTFsDataDrop[currentBuffer], tfEndCheck);
300 checkMissingData(pc, currentBuffer, mStartNTFsDataDrop[currentBuffer], tfEndCheck, currentOutLane);
// Remember where to resume so slots are not checked twice.
301 mStartNTFsDataDrop[currentBuffer] = tfEndCheck;
// Fragment, presumably the body of checkMissingData (header not shown; startTF,
// endTF, currentBuffer and outLane are its parameters).
// Walks the slots [startTF, endTF) of the buffer.
307 for (
int iTF = startTF; iTF < endTF; ++iTF) {
// Slot is incomplete: report it ...
308 if (mProcessedCRU[currentBuffer][iTF] != mCRUs.size()) {
309 LOGP(warning,
"CRUs for lane {} rel. TF: {} curr TF {} are missing! Processed {} CRUs out of {}", outLane, iTF, mTFStart[currentBuffer] + iTF, mProcessedCRU[currentBuffer][iTF], mCRUs.size());
// ... and account for it as processed; setting the count to the full number of
// CRUs prevents the slot from being reported or counted again.
310 ++mProcessedTFs[currentBuffer];
311 mProcessedCRU[currentBuffer][iTF] = mCRUs.size();
// Visit every CRU flag of the slot (loop body not shown in this listing).
314 for (
auto& it : mProcessedCRUs[currentBuffer][iTF]) {
// Fragment, presumably the body of finishInterval (header not shown; `buffer`
// and `tf` are its parameters, `deviceProxy` is obtained on a line not shown).
// The statements guarded by this condition are only partly visible.
326 if (mNFactorTFs > 0) {
// For every output lane: set the channel's oldest-possible marker to
// numeric_limits<size_t>::max() - 1. NOTE(review): presumably this tells the
// framework that no older data will follow on that channel - confirm.
329 for (
unsigned int ilane = 0; ilane < mOutLanes; ++ilane) {
331 auto&
state = deviceProxy.getOutputChannelState({
static_cast<int>(ilane)});
332 size_t oldest = std::numeric_limits<size_t>::max() - 1;
333 state.oldestForChannel = {oldest};
337 LOGP(info,
"All TFs {} for current buffer received. Clearing buffer",
tf);
// Restart the missing-data scan at slot 0 and re-arm the one-shot start-info flag
// for this buffer.
339 mStartNTFsDataDrop[
buffer] = 0;
340 mSendOutputStartInfo[
buffer] =
true;
// Builds the DataProcessorSpec of the device running TPCDistributeIDCSpec.
// Several lines of the body are missing from this listing.
//   ilane                - index of this device; used in the device name and to
//                          restrict the precise-timestamp handling to lane 0
//   crus, timeframes, outlanes, firstTF, nTFsBuffer - forwarded to the task constructor
//   sendPrecisetimeStamp - enables the extra handling on lane 0 (default false)
344DataProcessorSpec getTPCDistributeIDCSpec(
const int ilane,
const std::vector<uint32_t>& crus,
const unsigned int timeframes,
const unsigned int outlanes,
const int firstTF,
const bool sendPrecisetimeStamp =
false,
const int nTFsBuffer = 1)
346 std::vector<InputSpec> inputSpecs;
// Per side, pick the name "idcsgroupa" for Side::A and "idcsgroupc" otherwise
// (`sides` and the use of `name` are on lines not shown).
348 for (
auto side : sides) {
349 const std::string
name = (
side == Side::A) ?
"idcsgroupa" :
"idcsgroupc";
// One pass per output lane (loop body not shown).
353 std::vector<OutputSpec> outputSpecs;
354 outputSpecs.reserve(outlanes);
355 for (
unsigned int lane = 0; lane < outlanes; ++lane) {
// Extra handling only for lane 0 with precise timestamps requested; fetchCCDB is
// presumably switched on inside this branch (line not shown).
360 bool fetchCCDB =
false;
361 if (sendPrecisetimeStamp && (ilane == 0)) {
363 for (
unsigned int lane = 0; lane < outlanes; ++lane) {
// fetchCCDB is the first argument of the request; the remaining arguments are on
// lines not shown.
368 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(fetchCCDB,
// Device name: "tpc-distribute-idc-" followed by the two-digit, zero-padded lane index.
376 const std::string
type =
"idc";
377 const auto id = fmt::format(
"tpc-distribute-{}-{:02}",
type, ilane);
// The task is constructed with (crus, timeframes, nTFsBuffer, outlanes, firstTF, ccdbRequest).
382 AlgorithmSpec{adaptFromTask<TPCDistributeIDCSpec>(crus, timeframes, nTFsBuffer, outlanes, firstTF, ccdbRequest)},
// Options with their defaults: drop-data-after-nTFs = 0, check-data-every-n = 0,
// nFactorTFs = 1000.
383 Options{{
"drop-data-after-nTFs", VariantType::Int, 0, {
"Number of TFs after which to drop the data."}},
384 {
"check-data-every-n", VariantType::Int, 0, {
"Number of run function called after which to check for missing data (-1 for no checking, 0 for default checking)."}},
385 {
"nFactorTFs", VariantType::Int, 1000, {
"Number of TFs to skip for sending oldest TF."}}}};
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.