Project
Loading...
Searching...
No Matches
RateLimiter.cxx
Go to the documentation of this file.
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

19#include <fairmq/Device.h>
20#include <uv.h>
21#include <fairmq/shmem/Monitor.h>
22#include <fairmq/shmem/Common.h>
23#include <chrono>
24#include <thread>
25
26using namespace o2::framework;
27
28int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
29{
30 if (!maxInFlight && !minSHM) {
31 return 0;
32 }
33 auto device = ctx.services().get<RawDeviceService>().device();
34 auto& deviceState = ctx.services().get<DeviceState>();
35 if (maxInFlight && device->GetChannels().count("metric-feedback")) {
36 auto& dtc = ctx.services().get<DataTakingContext>();
37 const auto& device = ctx.services().get<RawDeviceService>().device();
38 const auto& deviceContext = ctx.services().get<DeviceContext>();
39 bool timeout = deviceContext.exitTransitionTimeout;
40 bool timeoutForMessage = dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS;
41 bool waitMessage = false;
42 int recvTimeout = 0;
43 auto startTime = std::chrono::system_clock::now();
44 static constexpr float MESSAGE_DELAY_TIME = 15.f;
45 while ((mSentTimeframes - mConsumedTimeframes) >= maxInFlight) {
46 if (recvTimeout != 0 && !waitMessage && (timeoutForMessage == false || std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::system_clock::now() - startTime).count() > MESSAGE_DELAY_TIME)) {
47 if (dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS || dtc.deploymentMode == DeploymentMode::FST) {
48 LOG(alarm) << "Maximum number of TF in flight reached (" << maxInFlight << ": published " << mSentTimeframes << " - finished " << mConsumedTimeframes << "), waiting";
49 } else {
50 LOG(info) << "Maximum number of TF in flight reached (" << maxInFlight << ": published " << mSentTimeframes << " - finished " << mConsumedTimeframes << "), waiting";
51 }
52 waitMessage = true;
53 timeoutForMessage = false;
54 }
55 auto msg = device->NewMessageFor("metric-feedback", 0, 0);
56 int64_t count = 0;
57 do {
58 count = device->Receive(msg, "metric-feedback", 0, recvTimeout);
59 if (timeout && count <= 0 && device->NewStatePending()) {
60 return 1;
61 }
62 } while (count <= 0 && recvTimeout > 0 && !timeoutForMessage);
63
64 if (count <= 0) {
65 recvTimeout = timeout || timeoutForMessage ? 1000 : -1;
66 continue;
67 }
68 assert(msg->GetSize() == 8);
69 mConsumedTimeframes = *(int64_t*)msg->GetData();
70 }
71 if (waitMessage) {
72 if (dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS || dtc.deploymentMode == DeploymentMode::FST) {
73 LOG(important) << (mSentTimeframes - mConsumedTimeframes) << " / " << maxInFlight << " TF in flight, continuing to publish";
74 } else {
75 LOG(info) << (mSentTimeframes - mConsumedTimeframes) << " / " << maxInFlight << " TF in flight, continuing to publish";
76 }
77 }
78
79 bool doSmothThrottling = getenv("DPL_SMOOTH_RATE_LIMITING") && atoi(getenv("DPL_SMOOTH_RATE_LIMITING"));
80 if (doSmothThrottling) {
81 constexpr float factorStart = 0.7f;
82 constexpr float factorFinal = 0.98f;
83 constexpr float factorOfAverage = 0.7f;
84 constexpr int64_t iterationsFinal = 2;
85 auto curTime = std::chrono::system_clock::now();
86 if (mTfTimes.size() != maxInFlight) {
87 mTfTimes.resize(maxInFlight);
88 mTimeCountingSince = mSentTimeframes;
89 mFirstTime = curTime;
90 }
91 if (mSentTimeframes >= mTimeCountingSince + maxInFlight) {
92 float iterationDuration = std::chrono::duration_cast<std::chrono::duration<float>>(curTime - mTfTimes[mSentTimeframes % maxInFlight]).count();
93 float totalAverage = std::chrono::duration_cast<std::chrono::duration<float>>(curTime - mFirstTime).count() / (mSentTimeframes - mTimeCountingSince);
94 if (mSmothDelay == 0.f) {
95 mSmothDelay = iterationDuration / maxInFlight * factorStart;
96 LOG(debug) << "TF Throttling delay initialized to " << mSmothDelay;
97 } else {
98 float factor;
99 if (mSentTimeframes < maxInFlight) {
100 factor = factorStart;
101 } else if (mSentTimeframes >= (iterationsFinal + 1) * maxInFlight) {
102 factor = factorFinal;
103 } else {
104 factor = factorStart + (factorFinal - factorStart) * (float)(mSentTimeframes - maxInFlight) / (float)(iterationsFinal * maxInFlight);
105 }
106 float newDelay = iterationDuration / maxInFlight * factor;
107 if (newDelay > totalAverage) {
108 LOG(debug) << "TF Throttling: Correcting delay down to average " << newDelay << " --> " << totalAverage;
109 newDelay = totalAverage;
110 } else if (newDelay < factorOfAverage * totalAverage) {
111 LOG(debug) << "TF Throttling: Correcting delay up to " << factorOfAverage << " * average " << newDelay << " --> " << factorOfAverage * totalAverage;
112 newDelay = factorOfAverage * totalAverage;
113 }
114 mSmothDelay = (float)(maxInFlight - 1) / (float)maxInFlight * mSmothDelay + newDelay / (float)maxInFlight;
115 LOG(debug) << "TF Throttling delay updated to " << mSmothDelay << " (factor " << factor << " Duration " << iterationDuration / maxInFlight << " = " << iterationDuration << " / " << maxInFlight << " --> " << newDelay << ")";
116 }
117 float elapsed = std::chrono::duration_cast<std::chrono::duration<float>>(curTime - mLastTime).count();
118 if (elapsed < mSmothDelay) {
119 LOG(debug) << "TF Throttling: Elapsed " << elapsed << " --> Waiting for " << mSmothDelay - elapsed;
120 uv_run(deviceState.loop, UV_RUN_NOWAIT);
121 std::this_thread::sleep_for(std::chrono::microseconds((size_t)((mSmothDelay - elapsed) * 1.e6f)));
122 }
123 }
124 mLastTime = std::chrono::system_clock::now();
125 mTfTimes[mSentTimeframes % maxInFlight] = curTime;
126 }
127 }
128 if (minSHM) {
129 int waitMessage = 0;
130 auto& runningWorkflow = ctx.services().get<RunningWorkflowInfo const>();
131 while (true) {
132 long freeMemory = -1;
133 try {
134 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(fair::mq::shmem::ShmId{fair::mq::shmem::makeShmIdStr(device->fConfig->GetProperty<uint64_t>("shmid"))}, runningWorkflow.shmSegmentId);
135 } catch (...) {
136 }
137 if (freeMemory == -1) {
138 try {
139 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(fair::mq::shmem::SessionId{device->fConfig->GetProperty<std::string>("session")}, runningWorkflow.shmSegmentId);
140 } catch (...) {
141 }
142 }
143 if (freeMemory == -1) {
144 throw std::runtime_error("Could not obtain free SHM memory");
145 }
146 uint64_t freeSHM = freeMemory;
147 if (freeSHM > minSHM) {
148 if (waitMessage) {
149 LOG(important) << "Sufficient SHM memory free (" << freeSHM << " >= " << minSHM << "), continuing to publish";
150 }
151 static bool showReport = getenv("DPL_REPORT_PROCESSING") && atoi(getenv("DPL_REPORT_PROCESSING"));
152 if (showReport) {
153 LOG(info) << "Free SHM Report: " << freeSHM;
154 }
155 break;
156 }
157 if (waitMessage == 0) {
158 LOG(alarm) << "Free SHM memory too low: " << freeSHM << " < " << minSHM << ", waiting";
159 waitMessage = 1;
160 }
161 usleep(30000);
162 }
163 }
164 mSentTimeframes++;
165 return 0;
166}
std::ostringstream debug
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
GLint GLsizei count
Definition glcorearb.h:399
GLbitfield GLuint64 timeout
Definition glcorearb.h:1573
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
Running state information of a given device.
Definition DeviceState.h:34
Information about the running workflow.
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg
Definition x9.h:153