30 if (!maxInFlight && !minSHM) {
35 if (maxInFlight && device->GetChannels().count(
"metric-feedback")) {
41 bool waitMessage =
false;
43 auto startTime = std::chrono::system_clock::now();
44 static constexpr float MESSAGE_DELAY_TIME = 15.f;
45 while ((mSentTimeframes - mConsumedTimeframes) >= maxInFlight) {
46 if (recvTimeout != 0 && !waitMessage && (timeoutForMessage ==
false || std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::system_clock::now() - startTime).
count() > MESSAGE_DELAY_TIME)) {
48 LOG(alarm) <<
"Maximum number of TF in flight reached (" << maxInFlight <<
": published " << mSentTimeframes <<
" - finished " << mConsumedTimeframes <<
"), waiting";
50 LOG(info) <<
"Maximum number of TF in flight reached (" << maxInFlight <<
": published " << mSentTimeframes <<
" - finished " << mConsumedTimeframes <<
"), waiting";
53 timeoutForMessage =
false;
55 auto msg = device->NewMessageFor(
"metric-feedback", 0, 0);
58 count = device->Receive(
msg,
"metric-feedback", 0, recvTimeout);
59 if (
timeout && count <= 0 && device->NewStatePending()) {
62 }
while (count <= 0 && recvTimeout > 0 && !timeoutForMessage);
65 recvTimeout =
timeout || timeoutForMessage ? 1000 : -1;
68 assert(
msg->GetSize() == 8);
69 mConsumedTimeframes = *(int64_t*)
msg->GetData();
73 LOG(important) << (mSentTimeframes - mConsumedTimeframes) <<
" / " << maxInFlight <<
" TF in flight, continuing to publish";
75 LOG(info) << (mSentTimeframes - mConsumedTimeframes) <<
" / " << maxInFlight <<
" TF in flight, continuing to publish";
79 bool doSmothThrottling = getenv(
"DPL_SMOOTH_RATE_LIMITING") && atoi(getenv(
"DPL_SMOOTH_RATE_LIMITING"));
80 if (doSmothThrottling) {
81 constexpr float factorStart = 0.7f;
82 constexpr float factorFinal = 0.98f;
83 constexpr float factorOfAverage = 0.7f;
84 constexpr int64_t iterationsFinal = 2;
85 auto curTime = std::chrono::system_clock::now();
86 if (mTfTimes.size() != maxInFlight) {
87 mTfTimes.resize(maxInFlight);
88 mTimeCountingSince = mSentTimeframes;
91 if (mSentTimeframes >= mTimeCountingSince + maxInFlight) {
92 float iterationDuration = std::chrono::duration_cast<std::chrono::duration<float>>(curTime - mTfTimes[mSentTimeframes % maxInFlight]).
count();
93 float totalAverage = std::chrono::duration_cast<std::chrono::duration<float>>(curTime - mFirstTime).
count() / (mSentTimeframes - mTimeCountingSince);
94 if (mSmothDelay == 0.f) {
95 mSmothDelay = iterationDuration / maxInFlight * factorStart;
96 LOG(
debug) <<
"TF Throttling delay initialized to " << mSmothDelay;
99 if (mSentTimeframes < maxInFlight) {
100 factor = factorStart;
101 }
else if (mSentTimeframes >= (iterationsFinal + 1) * maxInFlight) {
102 factor = factorFinal;
104 factor = factorStart + (factorFinal - factorStart) * (
float)(mSentTimeframes - maxInFlight) / (
float)(iterationsFinal * maxInFlight);
106 float newDelay = iterationDuration / maxInFlight * factor;
107 if (newDelay > totalAverage) {
108 LOG(
debug) <<
"TF Throttling: Correcting delay down to average " << newDelay <<
" --> " << totalAverage;
109 newDelay = totalAverage;
110 }
else if (newDelay < factorOfAverage * totalAverage) {
111 LOG(
debug) <<
"TF Throttling: Correcting delay up to " << factorOfAverage <<
" * average " << newDelay <<
" --> " << factorOfAverage * totalAverage;
112 newDelay = factorOfAverage * totalAverage;
114 mSmothDelay = (float)(maxInFlight - 1) / (float)maxInFlight * mSmothDelay + newDelay / (
float)maxInFlight;
115 LOG(
debug) <<
"TF Throttling delay updated to " << mSmothDelay <<
" (factor " << factor <<
" Duration " << iterationDuration / maxInFlight <<
" = " << iterationDuration <<
" / " << maxInFlight <<
" --> " << newDelay <<
")";
117 float elapsed = std::chrono::duration_cast<std::chrono::duration<float>>(curTime - mLastTime).
count();
118 if (elapsed < mSmothDelay) {
119 LOG(
debug) <<
"TF Throttling: Elapsed " << elapsed <<
" --> Waiting for " << mSmothDelay - elapsed;
120 uv_run(deviceState.loop, UV_RUN_NOWAIT);
121 std::this_thread::sleep_for(std::chrono::microseconds((
size_t)((mSmothDelay - elapsed) * 1.e6f)));
124 mLastTime = std::chrono::system_clock::now();
125 mTfTimes[mSentTimeframes % maxInFlight] = curTime;
132 long freeMemory = -1;
134 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(fair::mq::shmem::ShmId{fair::mq::shmem::makeShmIdStr(device->fConfig->GetProperty<uint64_t>(
"shmid"))}, runningWorkflow.shmSegmentId);
137 if (freeMemory == -1) {
139 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(fair::mq::shmem::SessionId{device->fConfig->GetProperty<std::string>(
"session")}, runningWorkflow.shmSegmentId);
143 if (freeMemory == -1) {
144 throw std::runtime_error(
"Could not obtain free SHM memory");
146 uint64_t freeSHM = freeMemory;
147 if (freeSHM > minSHM) {
149 LOG(important) <<
"Sufficient SHM memory free (" << freeSHM <<
" >= " << minSHM <<
"), continuing to publish";
151 static bool showReport = getenv(
"DPL_REPORT_PROCESSING") && atoi(getenv(
"DPL_REPORT_PROCESSING"));
153 LOG(info) <<
"Free SHM Report: " << freeSHM;
157 if (waitMessage == 0) {
158 LOG(alarm) <<
"Free SHM memory too low: " << freeSHM <<
" < " << minSHM <<
", waiting";
ServiceRegistryRef services()
The services registry associated with this processing context.