35 if (!maxInFlight && !minSHM) {
40 if (maxInFlight && device->GetChannels().count(
"metric-feedback")) {
46 bool waitMessage =
false;
48 auto startTime = std::chrono::system_clock::now();
49 static constexpr float MESSAGE_DELAY_TIME = 15.f;
50 while ((mSentTimeframes - mConsumedTimeframes) >= maxInFlight) {
51 if (recvTimeout != 0 && !waitMessage && (timeoutForMessage ==
false || std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::system_clock::now() - startTime).
count() > MESSAGE_DELAY_TIME)) {
54 "Maximum number of TF in flight reached (%d: published %llu - finished %llu), waiting",
55 maxInFlight, mSentTimeframes, mConsumedTimeframes);
58 "Maximum number of TF in flight reached (%d: published %llu - finished %llu), waiting",
59 maxInFlight, mSentTimeframes, mConsumedTimeframes);
62 timeoutForMessage =
false;
64 auto msg = device->NewMessageFor(
"metric-feedback", 0, 0);
67 count = device->Receive(
msg,
"metric-feedback", 0, recvTimeout);
68 if (
timeout && count <= 0 && device->NewStatePending()) {
71 }
while (count <= 0 && recvTimeout > 0 && !timeoutForMessage);
74 recvTimeout =
timeout || timeoutForMessage ? 1000 : -1;
77 assert(
msg->GetSize() == 8);
78 mConsumedTimeframes = *(int64_t*)
msg->GetData();
80 "Received %llu as consumed timeframes",
86 "%lli / %d TF in flight, continue to publish",
87 (mSentTimeframes - mConsumedTimeframes), maxInFlight);
90 "%lli / %d TF in flight, continue to publish",
91 (mSentTimeframes - mConsumedTimeframes), maxInFlight);
95 bool doSmothThrottling = getenv(
"DPL_SMOOTH_RATE_LIMITING") && atoi(getenv(
"DPL_SMOOTH_RATE_LIMITING"));
96 if (doSmothThrottling) {
97 constexpr float factorStart = 0.7f;
98 constexpr float factorFinal = 0.98f;
99 constexpr float factorOfAverage = 0.7f;
100 constexpr int64_t iterationsFinal = 2;
101 auto curTime = std::chrono::system_clock::now();
102 if (mTfTimes.size() != maxInFlight) {
103 mTfTimes.resize(maxInFlight);
104 mTimeCountingSince = mSentTimeframes;
105 mFirstTime = curTime;
107 if (mSentTimeframes >= mTimeCountingSince + maxInFlight) {
108 float iterationDuration = std::chrono::duration_cast<std::chrono::duration<float>>(curTime - mTfTimes[mSentTimeframes % maxInFlight]).
count();
109 float totalAverage = std::chrono::duration_cast<std::chrono::duration<float>>(curTime - mFirstTime).
count() / (mSentTimeframes - mTimeCountingSince);
110 if (mSmothDelay == 0.f) {
111 mSmothDelay = iterationDuration / maxInFlight * factorStart;
112 LOG(
debug) <<
"TF Throttling delay initialized to " << mSmothDelay;
115 if (mSentTimeframes < maxInFlight) {
116 factor = factorStart;
117 }
else if (mSentTimeframes >= (iterationsFinal + 1) * maxInFlight) {
118 factor = factorFinal;
120 factor = factorStart + (factorFinal - factorStart) * (
float)(mSentTimeframes - maxInFlight) / (
float)(iterationsFinal * maxInFlight);
122 float newDelay = iterationDuration / maxInFlight * factor;
123 if (newDelay > totalAverage) {
124 LOG(
debug) <<
"TF Throttling: Correcting delay down to average " << newDelay <<
" --> " << totalAverage;
125 newDelay = totalAverage;
126 }
else if (newDelay < factorOfAverage * totalAverage) {
127 LOG(
debug) <<
"TF Throttling: Correcting delay up to " << factorOfAverage <<
" * average " << newDelay <<
" --> " << factorOfAverage * totalAverage;
128 newDelay = factorOfAverage * totalAverage;
130 mSmothDelay = (float)(maxInFlight - 1) / (float)maxInFlight * mSmothDelay + newDelay / (
float)maxInFlight;
131 LOG(
debug) <<
"TF Throttling delay updated to " << mSmothDelay <<
" (factor " << factor <<
" Duration " << iterationDuration / maxInFlight <<
" = " << iterationDuration <<
" / " << maxInFlight <<
" --> " << newDelay <<
")";
133 float elapsed = std::chrono::duration_cast<std::chrono::duration<float>>(curTime - mLastTime).
count();
134 if (elapsed < mSmothDelay) {
135 LOG(
debug) <<
"TF Throttling: Elapsed " << elapsed <<
" --> Waiting for " << mSmothDelay - elapsed;
136 uv_run(deviceState.loop, UV_RUN_NOWAIT);
137 std::this_thread::sleep_for(std::chrono::microseconds((
size_t)((mSmothDelay - elapsed) * 1.e6f)));
140 mLastTime = std::chrono::system_clock::now();
141 mTfTimes[mSentTimeframes % maxInFlight] = curTime;
148 long freeMemory = -1;
150 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(fair::mq::shmem::ShmId{fair::mq::shmem::makeShmIdStr(device->fConfig->GetProperty<uint64_t>(
"shmid"))}, runningWorkflow.shmSegmentId);
153 if (freeMemory == -1) {
155 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(fair::mq::shmem::SessionId{device->fConfig->GetProperty<std::string>(
"session")}, runningWorkflow.shmSegmentId);
159 if (freeMemory == -1) {
160 throw std::runtime_error(
"Could not obtain free SHM memory");
162 uint64_t freeSHM = freeMemory;
163 if (freeSHM > minSHM) {
165 LOG(important) <<
"Sufficient SHM memory free (" << freeSHM <<
" >= " << minSHM <<
"), continuing to publish";
167 static bool showReport = getenv(
"DPL_REPORT_PROCESSING") && atoi(getenv(
"DPL_REPORT_PROCESSING"));
169 LOG(info) <<
"Free SHM Report: " << freeSHM;
173 if (waitMessage == 0) {
174 LOG(alarm) <<
"Free SHM memory too low: " << freeSHM <<
" < " << minSHM <<
", waiting";
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT_ALARM(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT_IMPORTANT(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
ServiceRegistryRef services()
The services registry associated with this processing context.