70 auto& selected = offers[
ref];
71 auto& info = infos[
ref];
73 if (info.firstUsed == 0) {
82 stats.invalidOffers.clear();
83 stats.otherUser.clear();
84 stats.unexpiring.clear();
85 stats.selectedOffers.clear();
86 stats.expired.clear();
96 O2_SIGNPOST_START(quota, sid,
"summary",
"%zu offers were selected for a total of: cpu %d, memory %lli, shared memory %lli",
97 result.size(), totalOffer.cpu, totalOffer.memory, totalOffer.sharedMemory);
98 for (
auto& offer :
result) {
106 O2_SIGNPOST_START(quota, sid,
"summary",
"Not enough resources: accumulated %zu partial offers providing cpu=%d, memory=%lld MB, shared memory=%lld MB, timeslices=%lld, but still insufficient.",
107 result.size(), totalOffer.cpu, totalOffer.memory / 1000000, totalOffer.sharedMemory / 1000000, totalOffer.timeslices);
110 O2_SIGNPOST_START(quota, sid,
"summary",
"Not enough resources: no suitable offers found (all offers were invalid, expired, or owned by other tasks).");
114 if (stats.invalidOffers.size()) {
115 O2_SIGNPOST_EVENT_EMIT(quota, sid,
"summary",
"The following offers were invalid: %s", fmt::format(
"{}", fmt::join(stats.invalidOffers,
", ")).c_str());
117 if (stats.otherUser.size()) {
118 O2_SIGNPOST_EVENT_EMIT(quota, sid,
"summary",
"The following offers were owned by other users: %s", fmt::format(
"{}", fmt::join(stats.otherUser,
", ")).c_str());
120 if (stats.expired.size()) {
121 O2_SIGNPOST_EVENT_EMIT(quota, sid,
"summary",
"The following offers are expired: %s", fmt::format(
"{}", fmt::join(stats.expired,
", ")).c_str());
123 if (stats.unexpiring.size() > 1) {
124 O2_SIGNPOST_EVENT_EMIT(quota, sid,
"summary",
"The following offers will never expire: %s", fmt::format(
"{}", fmt::join(stats.unexpiring,
", ")).c_str());
145 if (offer.valid ==
false) {
147 stats.invalidOffers.push_back(
i);
150 if (offer.user != -1 && offer.user != task) {
152 stats.otherUser.push_back(
i);
155 if (offer.runtime < 0) {
156 stats.unexpiring.push_back(
i);
157 }
else if (offer.runtime + info.received < now) {
158 O2_SIGNPOST_EVENT_EMIT(quota, qid,
"select",
"Offer %d expired since %llu milliseconds and holds %llu MB and %llu timeslices",
159 i, now - offer.runtime - info.received, offer.sharedMemory / 1000000, offer.timeslices);
161 stats.expired.push_back(
i);
164 O2_SIGNPOST_EVENT_EMIT(quota, qid,
"select",
"Offer %d still valid for %llu milliseconds, providing %llu MB and %llu timeslices",
165 i, offer.runtime + info.received - now, offer.sharedMemory / 1000000, offer.timeslices);
166 if (minValidity == 0) {
167 minValidity = offer.runtime + info.received - now;
169 minValidity = std::min(minValidity, (
int64_t)(offer.runtime + info.received - now));
172 assert(offer.sharedMemory >= 0);
173 auto tmp = accumulated;
174 tmp.
cpu += offer.cpu;
175 tmp.memory += offer.memory;
176 tmp.sharedMemory += offer.sharedMemory;
177 tmp.timeslices += offer.timeslices;
178 offer.score = selector(offer, accumulated);
179 switch (offer.score) {
189 stats.selectedOffers.push_back(
i);
190 O2_SIGNPOST_EVENT_EMIT(quota, qid,
"select",
"Offer %d selected but not enough. %llu MB, %d cores and %llu timeslices are not enough.",
191 i, tmp.sharedMemory / 1000000, tmp.cpu, tmp.timeslices);
196 stats.selectedOffers.push_back(
i);
198 O2_SIGNPOST_EVENT_EMIT(quota, qid,
"select",
"Selected %zu offers providing %llu MB, %d cores and %llu timeslices are deemed enough.",
199 stats.selectedOffers.size(), tmp.sharedMemory / 1000000, tmp.cpu, tmp.timeslices);
204 if (minValidity != 0) {
208 O2_SIGNPOST_EVENT_EMIT(quota, tid,
"select",
"Offer should be expired by now, checking again."); }, minValidity + 100, 0);
211 bool result = summarizeWhatHappended(enough, stats.selectedOffers, accumulated, stats);
212 if (outAccumulated) {
213 *outAccumulated = accumulated;
256 O2_SIGNPOST_START(quota, oid,
"updateOffers",
"Starting to process %zu received offers", pending.size());
258 for (
size_t oi = 0; oi <
mOffers.size(); oi++) {
259 auto& storeOffer =
mOffers[oi];
261 if (pending.empty()) {
262 O2_SIGNPOST_END(quota, oid,
"updateOffers",
"No more pending offers to process");
265 if (storeOffer.valid ==
true) {
266 O2_SIGNPOST_EVENT_EMIT(quota, oid,
"updateOffers",
"Skipping update of offer %zu because it's still valid", oi);
273 auto& offer = pending.back();
274 O2_SIGNPOST_EVENT_EMIT(quota, oid,
"updateOffers",
"Updating of offer %zu at %llu. Cpu: %d, Shared Memory %lli, Timeslices: %lli",
275 oi, now, offer.cpu, offer.sharedMemory, offer.timeslices);
277 storeOffer.valid =
true;
280 if (lastValid == -1) {
284 auto& lastValidOffer =
mOffers[lastValid];
285 for (
auto& stillPending : pending) {
286 lastValidOffer.cpu += stillPending.cpu;
287 lastValidOffer.memory += stillPending.memory;
288 lastValidOffer.sharedMemory += stillPending.sharedMemory;
289 lastValidOffer.timeslices += stillPending.timeslices;
290 lastValidOffer.runtime = std::max(lastValidOffer.runtime, stillPending.runtime);
293 auto& updatedOffer =
mOffers[lastValid];
294 O2_SIGNPOST_END(quota, oid,
"updateOffers",
"Remaining offers cohalesced to %d. New values: Cpu%d, Shared Memory %lli, Timeslices %lli",
295 lastValid, updatedOffer.cpu, updatedOffer.sharedMemory, updatedOffer.timeslices);
// Signpost / dynamic-log macros.
//
// Every macro in this group is defined with an empty replacement list, so each
// invocation expands to nothing: the arguments are dropped by the preprocessor
// and are never evaluated (no formatting, no side effects from argument
// expressions).
//
// NOTE(review): these look like no-op stand-ins for a signpost API whose real
// definitions are not visible in this section -- confirm which definitions are
// actually in effect before relying on any logging output.

// Takes a log name; expands to nothing.
#define O2_DECLARE_DYNAMIC_LOG(name)
// Takes an id name, a log and a pointer; expands to nothing, so no identifier
// is declared by this definition.
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
// Takes a log, an id, a name, a printf-style format and optional variadic
// arguments; expands to nothing.
#define O2_SIGNPOST_END(log, id, name, format,...)
// Takes an id name and a log; expands to nothing, so no identifier is declared
// by this definition.
#define O2_SIGNPOST_ID_GENERATE(name, log)
// Same parameter list as O2_SIGNPOST_END; expands to nothing.
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
// Same parameter list as O2_SIGNPOST_END; expands to nothing.
#define O2_SIGNPOST_END_WITH_ERROR(log, id, name, format,...)
// Same parameter list as O2_SIGNPOST_END; expands to nothing.
#define O2_SIGNPOST_START(log, id, name, format,...)