Project
Loading...
Searching...
No Matches
ComputingQuotaEvaluator.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
18#include "Framework/Logger.h"
19#include <Monitoring/Monitoring.h>
20
21#include <vector>
22#include <uv.h>
23#include <cassert>
24
// Verbosity passed as the first argument of every LOGP call in this file.
25#define LOGLEVEL debug
26
27
28namespace o2::framework
29{
30
// Constructor: binds the evaluator to the service registry reference, seeds
// offer slot 0 with a resource-less offer that is always valid and whose
// runtime is -1 (selectOffer treats a negative runtime as "never expires"),
// and creates the libuv timer that selectOffer arms for the next expiry.
32 : mRef(ref)
33{
34 auto& state = mRef.get<DeviceState>();
35 // The first offer is valid, but does not contain any resource
36 // so this will only work with some device which does not require
37 // any CPU. Notice this will have troubles if a given DPL process
38 // runs for more than a year.
// NOTE(review): field order presumably cpu, memory, sharedMemory, runtime,
// user, score, valid — confirm against ComputingQuotaOffer.
39 mOffers[0] = {
40 0,
41 0,
42 0,
43 -1,
44 -1,
46 true};
// NOTE(review): first field is presumably the `received` timestamp,
// followed by firstUsed and lastUsed — confirm against ComputingQuotaInfo.
47 mInfos[0] = {
48 uv_now(state.loop),
49 0,
50 0};
51
52 // Creating a timer to check for expired offers
// NOTE(review): the malloc result is not checked for nullptr, and freeing
// of mTimer is not visible in this file.
53 mTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
54 uv_timer_init(state.loop, mTimer);
55}
56
// Per-call bookkeeping filled by selectOffer: which offer indices were
// skipped (and why) and which were assigned to the requesting task.
// Offers skipped because they are not valid.
58 std::vector<int> invalidOffers;
// Offers skipped because a different task currently owns them.
59 std::vector<int> otherUser;
// Offers with a negative runtime, i.e. offers that never expire.
60 std::vector<int> unexpiring;
// Offers assigned to the requesting task.
61 std::vector<int> selectedOffers;
// Offers skipped because received + runtime is already in the past.
62 std::vector<int> expired;
63};
64
// Tries to reserve enough of the available offers for `task`.
// Offers are walked in index order. Invalid offers, offers owned by another
// task and expired offers are skipped. Each remaining offer is added to a
// running total, and `selector` scores (offer, total-including-offer).
// Scores that select the offer assign it to `task`; the walk ends once the
// selector reports the total as sufficient.
// @param task      id of the task asking for resources
// @param selector  callback returning an OfferScore for (offer, accumulated)
// @param now       current time, compared against info.received + runtime
// @return true if the accumulated offers were judged sufficient.
65bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const& selector, uint64_t now)
66{
  // Assigns offer `ref` to `task` and records first/last use times.
  // (Shadows the member function name inside this scope.)
67 auto selectOffer = [&offers = this->mOffers, &infos = this->mInfos, task](int ref, uint64_t now) {
68 auto& selected = offers[ref];
69 auto& info = infos[ref];
70 selected.user = task;
71 if (info.firstUsed == 0) {
72 info.firstUsed = now;
73 }
74 info.lastUsed = now;
75 };
76
77 ComputingQuotaOffer accumulated;
  // NOTE(review): function-local static, so this object is shared by every
  // call and every evaluator instance; it is not safe under concurrent calls.
78 static QuotaEvaluatorStats stats;
79
80 stats.invalidOffers.clear();
81 stats.otherUser.clear();
82 stats.unexpiring.clear();
83 stats.selectedOffers.clear();
84 stats.expired.clear();
85
  // Updates the DataProcessingStats counters for this selection and passes
  // `enough` through unchanged. When only offer 0 was selected, no counter is
  // touched.
86 auto summarizeWhatHappended = [ref = mRef](bool enough, std::vector<int> const& result, ComputingQuotaOffer const& totalOffer, QuotaEvaluatorStats& stats) -> bool {
87 auto& dpStats = ref.get<DataProcessingStats>();
88 if (result.size() == 1 && result[0] == 0) {
89 // LOG(LOGLEVEL) << "No particular resource was requested, so we schedule task anyways";
90 return enough;
91 }
92 if (enough) {
93 LOGP(LOGLEVEL, "{} offers were selected for a total of: cpu {}, memory {}, shared memory {}", result.size(), totalOffer.cpu, totalOffer.memory, totalOffer.sharedMemory);
94 //LOG(LOGLEVEL) << " The following offers were selected for computation: {} " << fmt::join(result, ", ");
95 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCES_SATISFACTORY), DataProcessingStats::Op::Add, 1});
96 } else {
97 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCES_MISSING), DataProcessingStats::Op::Add, 1});
    // Some offers were taken, but together they were still not enough.
98 if (result.size()) {
99 dpStats.updateStats({static_cast<short>(ProcessingStatsId::RESOURCES_INSUFFICIENT), DataProcessingStats::Op::Add, 1});
100 }
101 }
102 if (stats.invalidOffers.size()) {
103 // LOGP(LOGLEVEL, " The following offers were invalid: {}", fmt::join(stats.invalidOffers, ", "));
104 }
105 if (stats.otherUser.size()) {
106 // LOGP(LOGLEVEL, " The following offers were owned by other users: {}", fmt::join(stats.otherUser, ", "));
107 }
108 if (stats.expired.size()) {
109 // LOGP(LOGLEVEL, " The following offers are expired: {}", fmt::join(stats.expired, ", "));
110 }
    // A single unexpiring offer is not worth reporting (presumably slot 0).
111 if (stats.unexpiring.size() > 1) {
112 // LOGP(LOGLEVEL, " The following offers will never expire: {}", fmt::join(stats.unexpiring, ", "));
113 }
114
115 return enough;
116 };
117
118 bool enough = false;
  // Smallest remaining validity among the still-valid, expiring offers
  // (0 = none seen); used to arm the expiry timer below.
119 int64_t minValidity = 0;
120
121 for (int i = 0; i != mOffers.size(); ++i) {
122 auto& offer = mOffers[i];
123 auto& info = mInfos[i];
    // The `break` inside the switch below only leaves the switch, so the loop
    // is actually terminated here on the following iteration.
124 if (enough) {
125 break;
126 }
127 // Ignore:
128 // - Invalid offers
129 // - Offers which belong to another task
130 // - Expired offers
131 if (offer.valid == false) {
132 stats.invalidOffers.push_back(i);
133 continue;
134 }
135 if (offer.user != -1 && offer.user != task) {
136 stats.otherUser.push_back(i);
137 continue;
138 }
139 if (offer.runtime < 0) {
140 stats.unexpiring.push_back(i);
141 } else if (offer.runtime + info.received < now) {
142 LOGP(LOGLEVEL, "Offer {} expired since {} milliseconds and holds {}MB", i, now - offer.runtime - info.received, offer.sharedMemory / 1000000);
144 stats.expired.push_back(i);
145 continue;
146 } else {
147 LOGP(LOGLEVEL, "Offer {} still valid for {} milliseconds, providing {}MB", i, offer.runtime + info.received - now, offer.sharedMemory / 1000000);
148 if (minValidity == 0) {
149 minValidity = offer.runtime + info.received - now;
150 }
151 minValidity = std::min(minValidity,(int64_t)(offer.runtime + info.received - now));
152 }
154 assert(offer.sharedMemory >= 0);
    // Candidate total if this offer were added; the selector judges it.
155 auto tmp = accumulated;
156 tmp.cpu += offer.cpu;
157 tmp.memory += offer.memory;
158 tmp.sharedMemory += offer.sharedMemory;
159 offer.score = selector(offer, tmp);
160 switch (offer.score) {
    // NOTE(review): the case labels for the two `continue`s below and for the
    // final selecting branch are not visible in this view. The `continue`
    // branches skip the offer without selecting it.
162 continue;
164 continue;
165 case OfferScore::More:
    // Take the offer, but keep looking for more.
166 selectOffer(i, now);
167 accumulated = tmp;
168 stats.selectedOffers.push_back(i);
169 continue;
    // Take the offer and stop: the total is now sufficient.
171 selectOffer(i, now);
172 accumulated = tmp;
173 stats.selectedOffers.push_back(i);
174 enough = true;
175 break;
176 };
177 }
178
  // Arm a one-shot timer slightly (100) after the nearest expiry. The
  // callback only logs; presumably its purpose is to wake the event loop so
  // offers get re-evaluated — confirm.
179 if (minValidity != 0) {
180 LOGP(LOGLEVEL, "Next offer to expire in {} milliseconds", minValidity);
181 uv_timer_start(mTimer, [](uv_timer_t* handle) {
182 LOGP(LOGLEVEL, "Offer should be expired by now, checking again");
183 },
184 minValidity + 100, 0);
185 }
186 // Every path reaches this point: record what happened and return whether enough offers were gathered.
187 return summarizeWhatHappended(enough, stats.selectedOffers, accumulated, stats);
188}
189
190void ComputingQuotaEvaluator::consume(int id, ComputingQuotaConsumer& consumer, std::function<void(ComputingQuotaOffer const& accumulatedConsumed, ComputingQuotaStats& reportConsumedOffer)>& reportConsumedOffer)
191{
192 // This will report how much of the offers has to be considered consumed.
193 // Notice that actual memory usage might be larger, because we can over
194 // allocate.
195 consumer(id, mOffers, mStats, reportConsumedOffer);
196}
197
// Body of dispose(int taskId): releases every offer currently owned by
// taskId. Released offers whose sharedMemory is exhausted (<= 0) are
// invalidated so updateOffers can refill their slot. Others stay valid and
// become selectable by any task again (user == -1).
199{
200 for (int oi = 0; oi < mOffers.size(); ++oi) {
201 auto& offer = mOffers[oi];
202 if (offer.user != taskId) {
203 continue;
204 }
205 offer.user = -1;
206 // Disposing the offer so that the resource can be recycled.
  // Slot 0 is the permanent resource-less offer and is never invalidated.
  // NOTE(review): `return` ends the whole scan, so any higher-index offers
  // also owned by taskId keep user == taskId. `continue` may have been
  // intended — confirm.
209 if (oi == 0) {
210 return;
211 }
212 if (offer.valid == false) {
213 continue;
214 }
215 if (offer.sharedMemory <= 0) {
216 offer.valid = false;
217 offer.score = OfferScore::Unneeded;
218 }
219 }
220}
221
223void ComputingQuotaEvaluator::updateOffers(std::vector<ComputingQuotaOffer>& pending, uint64_t now)
224{
225 for (size_t oi = 0; oi < mOffers.size(); oi++) {
226 auto& storeOffer = mOffers[oi];
227 auto& info = mInfos[oi];
228 if (pending.empty()) {
229 return;
230 }
231 if (storeOffer.valid == true) {
232 continue;
233 }
234 info.received = now;
235 auto& offer = pending.back();
236 storeOffer = offer;
237 storeOffer.valid = true;
238 pending.pop_back();
239 }
240}
241
// Gives back every offer referenced in mExpiredOffers.
// For each one, mStats.totalExpiredBytes is increased and `expirator` is
// called with the offer and the updated stats. The offer is then emptied
// (sharedMemory = -1) and invalidated so updateOffers can reuse its slot.
// Offers whose sharedMemory is already negative are only invalidated.
// mExpiredOffers is cleared at the end.
242void ComputingQuotaEvaluator::handleExpired(std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const& stats)> expirator)
243{
  // Throttles the "No expired offers" message to once per streak of empty
  // calls. NOTE(review): this is a function-local static, initialised once
  // from the first call's queue size and shared by all instances.
244 static int nothingToDoCount = mExpiredOffers.size();
245 if (mExpiredOffers.size()) {
246 LOGP(LOGLEVEL, "Handling {} expired offers", mExpiredOffers.size());
247 nothingToDoCount = 0;
248 } else {
249 if (nothingToDoCount == 0) {
250 nothingToDoCount++;
251 LOGP(LOGLEVEL, "No expired offers");
252 }
253 }
256 for (auto& ref : mExpiredOffers) {
257 auto& offer = mOffers[ref.index];
    // A negative sharedMemory means the offer was already given back (it is set
    // to -1 below); presumably this guards against an index queued twice.
258 if (offer.sharedMemory < 0) {
259 LOGP(LOGLEVEL, "Offer {} does not have any more memory. Marking it as invalid.", ref.index);
260 offer.valid = false;
261 offer.score = OfferScore::Unneeded;
262 continue;
263 }
264 // FIXME: offers should go through the driver client, not the monitoring
265 // api.
266 LOGP(LOGLEVEL, "Offer {} expired. Giving back {}MB and {} cores", ref.index, offer.sharedMemory / 1000000, offer.cpu);
    // Always holds here because of the `< 0` check above.
267 assert(offer.sharedMemory >= 0);
268 mStats.totalExpiredBytes += offer.sharedMemory;
270 expirator(offer, mStats);
271 //driverClient.tell("expired shmem {}", offer.sharedMemory);
272 //driverClient.tell("expired cpu {}", offer.cpu);
273 offer.sharedMemory = -1;
274 offer.valid = false;
275 offer.score = OfferScore::Unneeded;
276 }
277 mExpiredOffers.clear();
278}
279
280} // namespace o2::framework
benchmark::State & state
struct uv_timer_s uv_timer_t
#define LOGLEVEL
int32_t i
void consume(int taskId, ComputingQuotaConsumer &consumed, std::function< void(ComputingQuotaOffer const &accumulatedConsumed, ComputingQuotaStats &)> &reportConsumedOffer)
void updateOffers(std::vector< ComputingQuotaOffer > &offers, uint64_t now)
now the time (e.g. uv_now) when invoked.
std::array< ComputingQuotaInfo, MAX_INFLIGHT_OFFERS > mInfos
Information about a given computing offer (e.g. when it was started to be used)
bool selectOffer(int task, ComputingQuotaRequest const &request, uint64_t now)
void handleExpired(std::function< void(ComputingQuotaOffer const &, ComputingQuotaStats const &)> reportExpired)
void dispose(int taskId)
Dispose offers for a given taskId.
std::array< ComputingQuotaOffer, MAX_INFLIGHT_OFFERS > mOffers
All the available offers.
std::vector< ComputingQuotaOfferRef > mExpiredOffers
The offers which expired and need to be given back.
GLuint64EXT * result
Definition glcorearb.h:5662
GLint ref
Definition glcorearb.h:291
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::function< OfferScore(ComputingQuotaOffer const &offer, ComputingQuotaOffer const &accumulated)> ComputingQuotaRequest
std::function< void(int id, std::array< ComputingQuotaOffer, 16 > &, ComputingQuotaStats &, std::function< void(ComputingQuotaOffer const &, ComputingQuotaStats &stats)>)> ComputingQuotaConsumer
int cpu
How many cores it can use.
Statistics on the offers consumed, expired.
Helper struct to hold statistics about the data processing happening.
@ Add
Update the rate of the metric given the amount since the last time.
Running state information of a given device.
Definition DeviceState.h:34