Project
Loading...
Searching...
No Matches
RateLimiter.cxx
Go to the documentation of this file.
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
11
19#include "Framework/Signpost.h"
20
21#include <fairmq/Device.h>
22#include <uv.h>
23#include <fairmq/shmem/Monitor.h>
24#include <fairmq/shmem/Common.h>
25#include <chrono>
26#include <thread>
27
29
30using namespace o2::framework;
31
32int RateLimiter::check(ProcessingContext& ctx, int maxInFlight, size_t minSHM)
33{
34 O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
35 if (!maxInFlight && !minSHM) {
36 return 0;
37 }
38 auto device = ctx.services().get<RawDeviceService>().device();
39 auto& deviceState = ctx.services().get<DeviceState>();
40 if (maxInFlight && device->GetChannels().count("metric-feedback")) {
41 auto& dtc = ctx.services().get<DataTakingContext>();
42 const auto& device = ctx.services().get<RawDeviceService>().device();
43 const auto& deviceContext = ctx.services().get<DeviceContext>();
44 bool timeout = deviceContext.exitTransitionTimeout;
45 bool timeoutForMessage = dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS;
46 bool waitMessage = false;
47 int recvTimeout = 0;
48 auto startTime = std::chrono::system_clock::now();
49 static constexpr float MESSAGE_DELAY_TIME = 15.f;
50 while ((mSentTimeframes - mConsumedTimeframes) >= maxInFlight) {
51 if (recvTimeout != 0 && !waitMessage && (timeoutForMessage == false || std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::system_clock::now() - startTime).count() > MESSAGE_DELAY_TIME)) {
52 if (dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS || dtc.deploymentMode == DeploymentMode::FST) {
53 O2_SIGNPOST_EVENT_EMIT_ALARM(rate_limiting, sid, "timeframe_ratelimit",
54 "Maximum number of TF in flight reached (%d: published %llu - finished %llu), waiting",
55 maxInFlight, mSentTimeframes, mConsumedTimeframes);
56 } else {
57 O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "timeframe_ratelimit",
58 "Maximum number of TF in flight reached (%d: published %llu - finished %llu), waiting",
59 maxInFlight, mSentTimeframes, mConsumedTimeframes);
60 }
61 waitMessage = true;
62 timeoutForMessage = false;
63 }
64 auto msg = device->NewMessageFor("metric-feedback", 0, 0);
65 int64_t count = 0;
66 do {
67 count = device->Receive(msg, "metric-feedback", 0, recvTimeout);
68 if (timeout && count <= 0 && device->NewStatePending()) {
69 return 1;
70 }
71 } while (count <= 0 && recvTimeout > 0 && !timeoutForMessage);
72
73 if (count <= 0) {
74 recvTimeout = timeout || timeoutForMessage ? 1000 : -1;
75 continue;
76 }
77 assert(msg->GetSize() == 8);
78 mConsumedTimeframes = *(int64_t*)msg->GetData();
79 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "timeframe_ratelimit",
80 "Received %llu as consumed timeframes",
81 mConsumedTimeframes);
82 }
83 if (waitMessage) {
84 if (dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS || dtc.deploymentMode == DeploymentMode::FST) {
85 O2_SIGNPOST_EVENT_EMIT_IMPORTANT(rate_limiting, sid, "timeframe_ratelimit",
86 "%lli / %d TF in flight, continue to publish",
87 (mSentTimeframes - mConsumedTimeframes), maxInFlight);
88 } else {
89 O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "timeframe_ratelimit",
90 "%lli / %d TF in flight, continue to publish",
91 (mSentTimeframes - mConsumedTimeframes), maxInFlight);
92 }
93 }
94
95 bool doSmothThrottling = getenv("DPL_SMOOTH_RATE_LIMITING") && atoi(getenv("DPL_SMOOTH_RATE_LIMITING"));
96 if (doSmothThrottling) {
97 constexpr float factorStart = 0.7f;
98 constexpr float factorFinal = 0.98f;
99 constexpr float factorOfAverage = 0.7f;
100 constexpr int64_t iterationsFinal = 2;
101 auto curTime = std::chrono::system_clock::now();
102 if (mTfTimes.size() != maxInFlight) {
103 mTfTimes.resize(maxInFlight);
104 mTimeCountingSince = mSentTimeframes;
105 mFirstTime = curTime;
106 }
107 if (mSentTimeframes >= mTimeCountingSince + maxInFlight) {
108 float iterationDuration = std::chrono::duration_cast<std::chrono::duration<float>>(curTime - mTfTimes[mSentTimeframes % maxInFlight]).count();
109 float totalAverage = std::chrono::duration_cast<std::chrono::duration<float>>(curTime - mFirstTime).count() / (mSentTimeframes - mTimeCountingSince);
110 if (mSmothDelay == 0.f) {
111 mSmothDelay = iterationDuration / maxInFlight * factorStart;
112 LOG(debug) << "TF Throttling delay initialized to " << mSmothDelay;
113 } else {
114 float factor;
115 if (mSentTimeframes < maxInFlight) {
116 factor = factorStart;
117 } else if (mSentTimeframes >= (iterationsFinal + 1) * maxInFlight) {
118 factor = factorFinal;
119 } else {
120 factor = factorStart + (factorFinal - factorStart) * (float)(mSentTimeframes - maxInFlight) / (float)(iterationsFinal * maxInFlight);
121 }
122 float newDelay = iterationDuration / maxInFlight * factor;
123 if (newDelay > totalAverage) {
124 LOG(debug) << "TF Throttling: Correcting delay down to average " << newDelay << " --> " << totalAverage;
125 newDelay = totalAverage;
126 } else if (newDelay < factorOfAverage * totalAverage) {
127 LOG(debug) << "TF Throttling: Correcting delay up to " << factorOfAverage << " * average " << newDelay << " --> " << factorOfAverage * totalAverage;
128 newDelay = factorOfAverage * totalAverage;
129 }
130 mSmothDelay = (float)(maxInFlight - 1) / (float)maxInFlight * mSmothDelay + newDelay / (float)maxInFlight;
131 LOG(debug) << "TF Throttling delay updated to " << mSmothDelay << " (factor " << factor << " Duration " << iterationDuration / maxInFlight << " = " << iterationDuration << " / " << maxInFlight << " --> " << newDelay << ")";
132 }
133 float elapsed = std::chrono::duration_cast<std::chrono::duration<float>>(curTime - mLastTime).count();
134 if (elapsed < mSmothDelay) {
135 LOG(debug) << "TF Throttling: Elapsed " << elapsed << " --> Waiting for " << mSmothDelay - elapsed;
136 uv_run(deviceState.loop, UV_RUN_NOWAIT);
137 std::this_thread::sleep_for(std::chrono::microseconds((size_t)((mSmothDelay - elapsed) * 1.e6f)));
138 }
139 }
140 mLastTime = std::chrono::system_clock::now();
141 mTfTimes[mSentTimeframes % maxInFlight] = curTime;
142 }
143 }
144 if (minSHM) {
145 int waitMessage = 0;
146 auto& runningWorkflow = ctx.services().get<RunningWorkflowInfo const>();
147 while (true) {
148 long freeMemory = -1;
149 try {
150 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(fair::mq::shmem::ShmId{fair::mq::shmem::makeShmIdStr(device->fConfig->GetProperty<uint64_t>("shmid"))}, runningWorkflow.shmSegmentId);
151 } catch (...) {
152 }
153 if (freeMemory == -1) {
154 try {
155 freeMemory = fair::mq::shmem::Monitor::GetFreeMemory(fair::mq::shmem::SessionId{device->fConfig->GetProperty<std::string>("session")}, runningWorkflow.shmSegmentId);
156 } catch (...) {
157 }
158 }
159 if (freeMemory == -1) {
160 throw std::runtime_error("Could not obtain free SHM memory");
161 }
162 uint64_t freeSHM = freeMemory;
163 if (freeSHM > minSHM) {
164 if (waitMessage) {
165 LOG(important) << "Sufficient SHM memory free (" << freeSHM << " >= " << minSHM << "), continuing to publish";
166 }
167 static bool showReport = getenv("DPL_REPORT_PROCESSING") && atoi(getenv("DPL_REPORT_PROCESSING"));
168 if (showReport) {
169 LOG(info) << "Free SHM Report: " << freeSHM;
170 }
171 break;
172 }
173 if (waitMessage == 0) {
174 LOG(alarm) << "Free SHM memory too low: " << freeSHM << " < " << minSHM << ", waiting";
175 waitMessage = 1;
176 }
177 usleep(30000);
178 }
179 }
180 mSentTimeframes++;
181 return 0;
182}
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:483
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:525
#define O2_SIGNPOST_EVENT_EMIT_ALARM(log, id, name, format,...)
Definition Signpost.h:577
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:500
#define O2_SIGNPOST_EVENT_EMIT_IMPORTANT(log, id, name, format,...)
Definition Signpost.h:587
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:516
std::ostringstream debug
ServiceRegistryRef services()
The services registry associated with this processing context.
int check(ProcessingContext &ctx, int maxInFlight, size_t minSHM)
GLint GLsizei count
Definition glcorearb.h:399
GLbitfield GLuint64 timeout
Definition glcorearb.h:1573
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
Running state information of a given device.
Definition DeviceState.h:34
Information about the running workflow.
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg
Definition x9.h:153