17#include <Monitoring/Monitoring.h>
23#include <fmt/format.h>
24#include <fmt/ranges.h>
70 auto& selected = offers[
ref];
71 auto& info = infos[
ref];
73 if (info.firstUsed == 0) {
82 stats.invalidOffers.clear();
83 stats.otherUser.clear();
84 stats.unexpiring.clear();
85 stats.selectedOffers.clear();
86 stats.expired.clear();
96 O2_SIGNPOST_START(quota, sid,
"summary",
"%zu offers were selected for a total of: cpu %d, memory %lli, shared memory %lli",
97 result.size(), totalOffer.cpu, totalOffer.memory, totalOffer.sharedMemory);
98 for (
auto& offer :
result) {
105 O2_SIGNPOST_START(quota, sid,
"summary",
"Not enough resources to select offers.");
111 if (stats.invalidOffers.size()) {
112 O2_SIGNPOST_EVENT_EMIT(quota, sid,
"summary",
"The following offers were invalid: %s", fmt::format(
"{}", fmt::join(stats.invalidOffers,
", ")).c_str());
114 if (stats.otherUser.size()) {
115 O2_SIGNPOST_EVENT_EMIT(quota, sid,
"summary",
"The following offers were owned by other users: %s", fmt::format(
"{}", fmt::join(stats.otherUser,
", ")).c_str());
117 if (stats.expired.size()) {
118 O2_SIGNPOST_EVENT_EMIT(quota, sid,
"summary",
"The following offers are expired: %s", fmt::format(
"{}", fmt::join(stats.expired,
", ")).c_str());
120 if (stats.unexpiring.size() > 1) {
121 O2_SIGNPOST_EVENT_EMIT(quota, sid,
"summary",
"The following offers will never expire: %s", fmt::format(
"{}", fmt::join(stats.unexpiring,
", ")).c_str());
129 int64_t minValidity = 0;
141 if (offer.valid ==
false) {
142 stats.invalidOffers.push_back(
i);
145 if (offer.user != -1 && offer.user != task) {
146 stats.otherUser.push_back(
i);
149 if (offer.runtime < 0) {
150 stats.unexpiring.push_back(
i);
151 }
else if (offer.runtime + info.received < now) {
152 O2_SIGNPOST_EVENT_EMIT(quota, qid,
"select",
"Offer %d expired since %llu milliseconds and holds %llu MB",
153 i, now - offer.runtime - info.received, offer.sharedMemory / 1000000);
155 stats.expired.push_back(
i);
158 O2_SIGNPOST_EVENT_EMIT(quota, qid,
"select",
"Offer %d still valid for %llu milliseconds, providing %llu MB",
159 i, offer.runtime + info.received - now, offer.sharedMemory / 1000000);
160 if (minValidity == 0) {
161 minValidity = offer.runtime + info.received - now;
163 minValidity = std::min(minValidity, (int64_t)(offer.runtime + info.received - now));
166 assert(offer.sharedMemory >= 0);
167 auto tmp = accumulated;
168 tmp.
cpu += offer.cpu;
169 tmp.memory += offer.memory;
170 tmp.sharedMemory += offer.sharedMemory;
171 offer.score = selector(offer, tmp);
172 switch (offer.score) {
180 stats.selectedOffers.push_back(
i);
185 stats.selectedOffers.push_back(
i);
191 if (minValidity != 0) {
195 O2_SIGNPOST_EVENT_EMIT(quota, tid,
"select",
"Offer should be expired by now, checking again."); }, minValidity + 100, 0);
198 return summarizeWhatHappended(enough, stats.selectedOffers, accumulated, stats);
211 for (
int oi = 0; oi <
mOffers.size(); ++oi) {
213 if (offer.user != taskId) {
223 if (offer.valid ==
false) {
226 if (offer.sharedMemory <= 0) {
228 O2_SIGNPOST_END(quota, oid,
"offers",
"Offer %d back to not needed.", oi);
238 for (
size_t oi = 0; oi <
mOffers.size(); oi++) {
239 auto& storeOffer =
mOffers[oi];
241 if (pending.empty()) {
244 if (storeOffer.valid ==
true) {
248 auto& offer = pending.back();
250 storeOffer.valid =
true;
261 nothingToDoCount = 0;
263 if (nothingToDoCount == 0) {
273 if (offer.sharedMemory < 0) {
274 O2_SIGNPOST_END(quota, oid,
"handleExpired",
"Offer %d does not have any more memory. Marking it as invalid.",
ref.index);
281 O2_SIGNPOST_END(quota, oid,
"handleExpired",
"Offer %d expired. Giving back %llu MB and %d cores",
282 ref.index, offer.sharedMemory / 1000000, offer.cpu);
283 assert(offer.sharedMemory >= 0);
289 offer.sharedMemory = -1;
struct uv_timer_s uv_timer_t
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
void consume(int taskId, ComputingQuotaConsumer &consumed, std::function< void(ComputingQuotaOffer const &accumulatedConsumed, ComputingQuotaStats &)> &reportConsumedOffer)
void updateOffers(std::vector< ComputingQuotaOffer > &offers, uint64_t now)
now the time (e.g. uv_now) when invoked.
ComputingQuotaStats mStats
std::array< ComputingQuotaInfo, MAX_INFLIGHT_OFFERS > mInfos
Information about a given computing offer (e.g. when it was started to be used)
bool selectOffer(int task, ComputingQuotaRequest const &request, uint64_t now)
void handleExpired(std::function< void(ComputingQuotaOffer const &, ComputingQuotaStats const &)> reportExpired)
void dispose(int taskId)
Dispose offers for a given taskId.
ComputingQuotaEvaluator(ServiceRegistryRef ref)
std::array< ComputingQuotaOffer, MAX_INFLIGHT_OFFERS > mOffers
All the available offerts.
std::vector< ComputingQuotaOfferRef > mExpiredOffers
The offers which expired and need to be given back.
Defining PrimaryVertex explicitly as messageable.
std::function< OfferScore(ComputingQuotaOffer const &offer, ComputingQuotaOffer const &accumulated)> ComputingQuotaRequest
std::function< void(int id, std::array< ComputingQuotaOffer, 16 > &, ComputingQuotaStats &, std::function< void(ComputingQuotaOffer const &, ComputingQuotaStats &stats)>)> ComputingQuotaConsumer
int cpu
How many cores it can use.
Statistics on the offers consumed, expired.
int64_t totalExpiredOffers
int64_t totalExpiredBytes
Helper struct to hold statistics about the data processing happening.
@ Add
Update the rate of the metric given the amount since the last time.
Running state information of a given device.
std::vector< int > selectedOffers
std::vector< int > unexpiring
std::vector< int > expired
std::vector< int > invalidOffers
std::vector< int > otherUser