Project
Loading...
Searching...
No Matches
ComputingQuotaEvaluator.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16#include "Framework/Signpost.h"
17#include <Monitoring/Monitoring.h>
18
19#include <vector>
20#include <uv.h>
21#include <cassert>
22#include <fmt/core.h>
23#include <fmt/format.h>
24#include <fmt/ranges.h>
25
27
28namespace o2::framework
29{
30
// ComputingQuotaEvaluator constructor body. Its signature (source line 31)
// is absent from this listing; the initialiser list below binds mRef.
// Seeds slot 0 with an always-valid offer that carries no resources and
// never expires (runtime -1), stamps mInfos[0], and allocates/initialises
// mTimer, which selectOffer() later arms to re-check offer expiry.
32 : mRef(ref)
33{
34 auto& state = mRef.get<DeviceState>();
35 // The first offer is valid, but does not contain any resource
36 // so this will only work with some device which does not require
37 // any CPU. Notice this will have troubles if a given DPL process
38 // runs for more than a year.
// NOTE(review): source line 39 (presumably `mOffers[0] = {`) is absent from
// this listing; the designated initialiser below belongs to it.
40 .cpu = 0,
41 .memory = 0,
42 .sharedMemory = 0,
43 .timeslices = 0,
44 .runtime = -1,
45 .score = OfferScore::Unneeded,
46 .valid = true};
// The first aggregate field of mInfos[0] gets the loop's current time and the
// other two get 0. Presumably this means received = now, firstUsed = lastUsed
// = 0 (selectOffer treats firstUsed == 0 as "never used"); confirm the field
// order in ComputingQuotaInfo.
47 mInfos[0] = {
48 uv_now(state.loop),
49 0,
50 0};
51
52 // Creating a timer to check for expired offers
// NOTE(review): mTimer is a raw malloc'd libuv handle. Its release
// (uv_close + free) is not visible here; confirm the destructor does it.
53 mTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
54 uv_timer_init(state.loop, mTimer);
55}
56
// QuotaEvaluatorStats: per-call lists of mOffers indices, filled by
// selectOffer() and reported in its summary. The opening declaration line
// (source line 57) is absent from this listing.
// Indices skipped because the offer has valid == false.
58 std::vector<int> invalidOffers;
// Indices skipped because the offer is owned by a different task.
59 std::vector<int> otherUser;
// Indices of offers with runtime < 0, i.e. offers that never expire.
60 std::vector<int> unexpiring;
// Indices claimed for the requesting task (scored More or Enough).
61 std::vector<int> selectedOffers;
// Indices skipped because runtime + received < now.
62 std::vector<int> expired;
63};
64
// selectOffer: try to gather enough resources from mOffers for `task`.
//
// Offers are visited in index order. These are skipped:
// - invalid offers;
// - offers owned by a different task;
// - expired offers (runtime + received < now).
// Offers with runtime < 0 never expire. Every remaining offer is scored by
// `selector`, which receives the offer and the resources accumulated so far
// (without this offer). Offers scored More or Enough are claimed for `task`
// (user, firstUsed, lastUsed) and added to the running total. The scan stops
// after the first Enough.
//
// If any still-valid, expiring offer was seen, mTimer is armed for the
// shortest remaining validity + 100 (the signposts call these units
// milliseconds).
//
// Returns true iff an offer was scored Enough. When `outAccumulated` is
// non-null it receives the accumulated resources of the claimed offers,
// whether or not they were enough.
//
// NOTE(review): offers claimed while the total is still insufficient remain
// assigned to `task` when this returns false. Releasing them is presumably
// left to dispose(); confirm with callers.
65bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const& selector, uint64_t now, ComputingQuotaOffer* outAccumulated)
66{
67 O2_SIGNPOST_ID_GENERATE(qid, quota);
68
// Claims slot `ref` for `task`. firstUsed is stamped only on first use;
// lastUsed is refreshed on every claim.
69 auto selectOffer = [&offers = this->mOffers, &infos = this->mInfos, task](int ref, uint64_t now) {
70 auto& selected = offers[ref];
71 auto& info = infos[ref];
72 selected.user = task;
73 if (info.firstUsed == 0) {
74 info.firstUsed = now;
75 }
76 info.lastUsed = now;
77 };
78
79 ComputingQuotaOffer accumulated;
// NOTE(review): `stats` is a function-local static, so every instance and
// every call shares it; it is cleared just below at the start of each call.
// Concurrent calls would race on it. Presumably this only runs on the
// device's event-loop thread; confirm.
80 static QuotaEvaluatorStats stats;
81
82 stats.invalidOffers.clear();
83 stats.otherUser.clear();
84 stats.unexpiring.clear();
85 stats.selectedOffers.clear();
86 stats.expired.clear();
87
// Reports the outcome via signposts and DataProcessingStats counters
// (RESOURCES_SATISFACTORY / RESOURCES_INSUFFICIENT / RESOURCES_MISSING) and
// returns `enough` unchanged. A selection consisting only of offer 0 (the
// default slot seeded by the constructor) is not reported at all.
88 auto summarizeWhatHappended = [ref = mRef](bool enough, std::vector<int> const& result, ComputingQuotaOffer const& totalOffer, QuotaEvaluatorStats& stats) -> bool {
89 auto& dpStats = ref.get<DataProcessingStats>();
90 if (result.size() == 1 && result[0] == 0) {
91 // LOG(LOGLEVEL) << "No particular resource was requested, so we schedule task anyways";
92 return enough;
93 }
94 O2_SIGNPOST_ID_GENERATE(sid, quota);
95 if (enough) {
96 O2_SIGNPOST_START(quota, sid, "summary", "%zu offers were selected for a total of: cpu %d, memory %lli, shared memory %lli",
97 result.size(), totalOffer.cpu, totalOffer.memory, totalOffer.sharedMemory);
98 for (auto& offer : result) {
99 // We pretend each offer id is a pointer, to have a unique id.
// Each "offers" interval opened here is closed by dispose() with the same
// pointer-derived id.
100 O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(offer * 8));
101 O2_SIGNPOST_START(quota, oid, "offers", "Offer %d has been selected.", offer);
102 }
103 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCES_SATISFACTORY), DataProcessingStats::Op::Add, 1});
104 } else {
105 if (result.size()) {
106 O2_SIGNPOST_START(quota, sid, "summary", "Not enough resources: accumulated %zu partial offers providing cpu=%d, memory=%lld MB, shared memory=%lld MB, timeslices=%lld, but still insufficient.",
107 result.size(), totalOffer.cpu, totalOffer.memory / 1000000, totalOffer.sharedMemory / 1000000, totalOffer.timeslices);
108 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCES_INSUFFICIENT), DataProcessingStats::Op::Add, 1});
109 } else {
110 O2_SIGNPOST_START(quota, sid, "summary", "Not enough resources: no suitable offers found (all offers were invalid, expired, or owned by other tasks).");
111 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCES_MISSING), DataProcessingStats::Op::Add, 1});
112 }
113 }
114 if (stats.invalidOffers.size()) {
115 O2_SIGNPOST_EVENT_EMIT(quota, sid, "summary", "The following offers were invalid: %s", fmt::format("{}", fmt::join(stats.invalidOffers, ", ")).c_str());
116 }
117 if (stats.otherUser.size()) {
118 O2_SIGNPOST_EVENT_EMIT(quota, sid, "summary", "The following offers were owned by other users: %s", fmt::format("{}", fmt::join(stats.otherUser, ", ")).c_str());
119 }
120 if (stats.expired.size()) {
121 O2_SIGNPOST_EVENT_EMIT(quota, sid, "summary", "The following offers are expired: %s", fmt::format("{}", fmt::join(stats.expired, ", ")).c_str());
122 }
// Only reported for more than one entry: offer 0 is seeded with runtime -1
// and is presumably always listed here, so on its own it is not interesting.
123 if (stats.unexpiring.size() > 1) {
124 O2_SIGNPOST_EVENT_EMIT(quota, sid, "summary", "The following offers will never expire: %s", fmt::format("{}", fmt::join(stats.unexpiring, ", ")).c_str());
125 }
126 O2_SIGNPOST_END(quota, sid, "summary", "Done selecting offers.");
127
128 return enough;
129 };
130
131 bool enough = false;
// Shortest remaining validity among still-valid expiring offers; 0 means
// "not set yet".
132 int64_t minValidity = 0;
133
134 for (int i = 0; i != mOffers.size(); ++i) {
135 auto& offer = mOffers[i];
136 auto& info = mInfos[i];
137 if (enough) {
138 O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "We have enough offers. We can continue for computation.");
139 break;
140 }
141 // Ignore:
142 // - Invalid offers
143 // - Offers which belong to another task
144 // - Expired offers
145 if (offer.valid == false) {
146 O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Offer %d is not valid. Skipping", i);
147 stats.invalidOffers.push_back(i);
148 continue;
149 }
150 if (offer.user != -1 && offer.user != task) {
151 O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Offer %d already offered to some other user", i);
152 stats.otherUser.push_back(i);
153 continue;
154 }
155 if (offer.runtime < 0) {
156 stats.unexpiring.push_back(i);
157 } else if (offer.runtime + info.received < now) {
158 O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Offer %d expired since %llu milliseconds and holds %llu MB and %llu timeslices",
159 i, now - offer.runtime - info.received, offer.sharedMemory / 1000000, offer.timeslices);
// NOTE(review): source line 160 is absent from this listing. Presumably it
// queues this offer in mExpiredOffers for handleExpired(); confirm.
161 stats.expired.push_back(i);
162 continue;
163 } else {
164 O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Offer %d still valid for %llu milliseconds, providing %llu MB and %llu timeslices",
165 i, offer.runtime + info.received - now, offer.sharedMemory / 1000000, offer.timeslices);
// Because 0 doubles as "unset", an offer whose remaining validity is exactly
// 0 does not contribute to minValidity.
166 if (minValidity == 0) {
167 minValidity = offer.runtime + info.received - now;
168 }
169 minValidity = std::min(minValidity, (int64_t)(offer.runtime + info.received - now));
170 }
172 assert(offer.sharedMemory >= 0);
// `tmp` is the running total including this offer. It is committed to
// `accumulated` only if the offer gets selected. The selector itself is given
// the total *without* this offer.
173 auto tmp = accumulated;
174 tmp.cpu += offer.cpu;
175 tmp.memory += offer.memory;
176 tmp.sharedMemory += offer.sharedMemory;
177 tmp.timeslices += offer.timeslices;
178 offer.score = selector(offer, accumulated);
// NOTE(review): the `case` labels that precede the "not needed",
// "Unsuitable" and "deemed enough" branches (source lines 180, 183, 193) are
// absent from this listing.
179 switch (offer.score) {
181 O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Offer %d considered not needed. Skipping", i);
182 continue;
184 O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Offer %d considered Unsuitable. Skipping", i);
185 continue;
186 case OfferScore::More:
187 selectOffer(i, now);
188 accumulated = tmp;
189 stats.selectedOffers.push_back(i);
190 O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Offer %d selected but not enough. %llu MB, %d cores and %llu timeslices are not enough.",
191 i, tmp.sharedMemory / 1000000, tmp.cpu, tmp.timeslices);
192 continue;
194 selectOffer(i, now);
195 accumulated = tmp;
196 stats.selectedOffers.push_back(i);
197 enough = true;
198 O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Selected %zu offers providing %llu MB, %d cores and %llu timeslices are deemed enough.",
199 stats.selectedOffers.size(), tmp.sharedMemory / 1000000, tmp.cpu, tmp.timeslices);
// This `break` only leaves the switch. The `if (enough)` check at the top of
// the next iteration is what ends the loop.
200 break;
201 };
202 }
203
// Arm the timer so that the event loop wakes up around the time the
// first-expiring offer runs out (+100 of slack). The callback itself only
// emits a signpost; presumably the wake-up is what triggers a new selection.
// Confirm.
204 if (minValidity != 0) {
205 O2_SIGNPOST_EVENT_EMIT(quota, qid, "select", "Next offer to expire in %llu milliseconds", minValidity);
206 uv_timer_start(mTimer, [](uv_timer_t* handle) {
207 O2_SIGNPOST_ID_GENERATE(tid, quota);
208 O2_SIGNPOST_EVENT_EMIT(quota, tid, "select", "Offer should be expired by now, checking again."); }, minValidity + 100, 0);
209 }
210 // Report the outcome; the result is true only if an offer was scored Enough.
211 bool result = summarizeWhatHappended(enough, stats.selectedOffers, accumulated, stats);
212 if (outAccumulated) {
213 *outAccumulated = accumulated;
214 }
215 return result;
216}
217
218void ComputingQuotaEvaluator::consume(int id, ComputingQuotaConsumer& consumer, std::function<void(ComputingQuotaOffer const& accumulatedConsumed, ComputingQuotaStats& reportConsumedOffer)>& reportConsumedOffer)
219{
220 // This will report how much of the offers has to be considered consumed.
221 // Notice that actual memory usage might be larger, because we can over
222 // allocate.
223 consumer(id, mOffers, mStats, reportConsumedOffer);
224}
225
// Body of dispose(taskId). Its signature line (source line 226) is absent
// from this listing.
// Releases every offer owned by `taskId` by resetting its user to -1, which
// makes it selectable by any task again. Released offers with no shared
// memory left (sharedMemory <= 0) are additionally invalidated, scored
// Unneeded, and their "offers" signpost interval is closed. Offers that still
// hold shared memory stay valid.
227{
228 for (int oi = 0; oi < mOffers.size(); ++oi) {
229 auto& offer = mOffers[oi];
230 if (offer.user != taskId) {
231 continue;
232 }
233 offer.user = -1;
234 // Disposing the offer so that the resource can be recycled.
// NOTE(review): source lines 235-236 are absent from this listing.
// NOTE(review): this returns rather than continues. If slot 0 is owned by
// `taskId`, any later slot owned by the same task keeps user == taskId.
// Presumably slot 0 is never selected together with other slots; confirm.
// Otherwise `continue` would be the safer choice.
237 if (oi == 0) {
238 return;
239 }
240 if (offer.valid == false) {
241 continue;
242 }
243 if (offer.sharedMemory <= 0) {
244 O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(oi * 8));
245 O2_SIGNPOST_END(quota, oid, "offers", "Offer %d back to not needed.", oi);
246 offer.valid = false;
247 offer.score = OfferScore::Unneeded;
248 }
249 }
250}
251
253void ComputingQuotaEvaluator::updateOffers(std::vector<ComputingQuotaOffer>& pending, uint64_t now)
254{
255 O2_SIGNPOST_ID_GENERATE(oid, quota);
256 O2_SIGNPOST_START(quota, oid, "updateOffers", "Starting to process %zu received offers", pending.size());
257 int lastValid = -1;
258 for (size_t oi = 0; oi < mOffers.size(); oi++) {
259 auto& storeOffer = mOffers[oi];
260 auto& info = mInfos[oi];
261 if (pending.empty()) {
262 O2_SIGNPOST_END(quota, oid, "updateOffers", "No more pending offers to process");
263 return;
264 }
265 if (storeOffer.valid == true) {
266 O2_SIGNPOST_EVENT_EMIT(quota, oid, "updateOffers", "Skipping update of offer %zu because it's still valid", oi);
267 // In general we want to fill an invalid offer. If we do not find any
268 // we add to the last valid offer we found.
269 lastValid = oi;
270 continue;
271 }
272 info.received = now;
273 auto& offer = pending.back();
274 O2_SIGNPOST_EVENT_EMIT(quota, oid, "updateOffers", "Updating of offer %zu at %llu. Cpu: %d, Shared Memory %lli, Timeslices: %lli",
275 oi, now, offer.cpu, offer.sharedMemory, offer.timeslices);
276 storeOffer = offer;
277 storeOffer.valid = true;
278 pending.pop_back();
279 }
280 if (lastValid == -1) {
281 O2_SIGNPOST_END_WITH_ERROR(quota, oid, "updateOffers", "ComputingQuotaOffer losts. This should never happen.");
282 return;
283 }
284 auto& lastValidOffer = mOffers[lastValid];
285 for (auto& stillPending : pending) {
286 lastValidOffer.cpu += stillPending.cpu;
287 lastValidOffer.memory += stillPending.memory;
288 lastValidOffer.sharedMemory += stillPending.sharedMemory;
289 lastValidOffer.timeslices += stillPending.timeslices;
290 lastValidOffer.runtime = std::max(lastValidOffer.runtime, stillPending.runtime);
291 }
292 pending.clear();
293 auto& updatedOffer = mOffers[lastValid];
294 O2_SIGNPOST_END(quota, oid, "updateOffers", "Remaining offers cohalesced to %d. New values: Cpu%d, Shared Memory %lli, Timeslices %lli",
295 lastValid, updatedOffer.cpu, updatedOffer.sharedMemory, updatedOffer.timeslices);
296}
297
// handleExpired: give back the resources of every offer queued in
// mExpiredOffers, then empty the queue.
//
// For each queued slot:
// - If sharedMemory and timeslices are both already negative (e.g. a slot
//   handled by an earlier call), the slot is only marked invalid/Unneeded.
// - Otherwise:
//   - its non-negative sharedMemory and timeslices are added to
//     mStats.totalExpiredBytes / totalExpiredTimeslices;
//   - `expirator` is called with the offer and the updated stats;
//   - the slot is marked exhausted (sharedMemory = timeslices = -1),
//     invalid and Unneeded.
//
// NOTE(review): source lines 311-312 and 328 are absent from this listing;
// their content is unknown here.
298void ComputingQuotaEvaluator::handleExpired(std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const& stats)> expirator)
299{
// Function-local static shared by all instances. It makes "No expired offers"
// be emitted once per streak of calls with nothing to do. It is initialised
// only from the first call's queue size.
300 static int nothingToDoCount = mExpiredOffers.size();
301 O2_SIGNPOST_ID_GENERATE(qid, quota);
302 if (mExpiredOffers.size()) {
303 O2_SIGNPOST_EVENT_EMIT(quota, qid, "handleExpired", "Handling %zu expired offers", mExpiredOffers.size());
304 nothingToDoCount = 0;
305 } else {
306 if (nothingToDoCount == 0) {
307 nothingToDoCount++;
308 O2_SIGNPOST_EVENT_EMIT(quota, qid, "handleExpired", "No expired offers");
309 }
310 }
313 for (auto& ref : mExpiredOffers) {
314 auto& offer = mOffers[ref.index];
// Same pointer-derived id as the "offers" interval opened in selectOffer().
315 O2_SIGNPOST_ID_FROM_POINTER(oid, quota, (void*)(int64_t)(ref.index * 8));
316 if (offer.sharedMemory < 0 && offer.timeslices < 0) {
317 O2_SIGNPOST_END(quota, oid, "handleExpired", "Offer %d does not have any more resources. Marking it as invalid.", ref.index);
318 offer.valid = false;
319 offer.score = OfferScore::Unneeded;
320 continue;
321 }
322 // FIXME: offers should go through the driver client, not the monitoring
323 // api.
324 O2_SIGNPOST_END(quota, oid, "handleExpired", "Offer %d expired. Giving back %llu MB, %d cores and %llu timeslices",
325 ref.index, offer.sharedMemory / 1000000, offer.cpu, offer.timeslices);
// Clamp at 0 so that a partially exhausted offer (one field already -1) does
// not decrease the totals.
326 mStats.totalExpiredBytes += std::max<int64_t>(offer.sharedMemory, 0);
327 mStats.totalExpiredTimeslices += std::max<int64_t>(offer.timeslices, 0);
329 expirator(offer, mStats);
330 // driverClient.tell("expired shmem {}", offer.sharedMemory);
331 // driverClient.tell("expired cpu {}", offer.cpu);
// -1 marks the slot as exhausted; see the check at the top of the loop.
332 offer.sharedMemory = -1;
333 offer.timeslices = -1;
334 offer.valid = false;
335 offer.score = OfferScore::Unneeded;
336 }
337 mExpiredOffers.clear();
338}
339
340} // namespace o2::framework
benchmark::State & state
struct uv_timer_s uv_timer_t
int32_t i
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_END_WITH_ERROR(log, id, name, format,...)
Definition Signpost.h:616
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
void consume(int taskId, ComputingQuotaConsumer &consumed, std::function< void(ComputingQuotaOffer const &accumulatedConsumed, ComputingQuotaStats &)> &reportConsumedOffer)
void updateOffers(std::vector< ComputingQuotaOffer > &offers, uint64_t now)
now the time (e.g. uv_now) when invoked.
std::array< ComputingQuotaInfo, MAX_INFLIGHT_OFFERS > mInfos
Information about a given computing offer (e.g. when it was started to be used)
void handleExpired(std::function< void(ComputingQuotaOffer const &, ComputingQuotaStats const &)> reportExpired)
void dispose(int taskId)
Dispose offers for a given taskId.
std::array< ComputingQuotaOffer, MAX_INFLIGHT_OFFERS > mOffers
All the available offers.
bool selectOffer(int task, ComputingQuotaRequest const &request, uint64_t now, ComputingQuotaOffer *accumulated=nullptr)
std::vector< ComputingQuotaOfferRef > mExpiredOffers
The offers which expired and need to be given back.
GLuint64EXT * result
Definition glcorearb.h:5662
GLint ref
Definition glcorearb.h:291
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
std::function< void(int id, std::array< ComputingQuotaOffer, 32 > &, ComputingQuotaStats &, std::function< void(ComputingQuotaOffer const &, ComputingQuotaStats &stats)>)> ComputingQuotaConsumer
std::function< OfferScore(ComputingQuotaOffer const &offer, ComputingQuotaOffer const &accumulated)> ComputingQuotaRequest
int cpu
How many cores it can use.
Statistics on the offers consumed, expired.
Helper struct to hold statistics about the data processing happening.
@ Add
Update the rate of the metric given the amount since the last time.
Running state information of a given device.
Definition DeviceState.h:34