27 stats.totalConsumedBytes += accumulatedConsumed.
sharedMemory;
31 std::array<ComputingQuotaOffer, 16>& offers,
36 int64_t bytesSent = bs;
37 for (
size_t oi = 0; oi < offers.size(); oi++) {
38 auto& offer = offers[oi];
39 if (offer.user != taskId) {
42 int64_t toRemove = std::min((int64_t)bytesSent, offer.sharedMemory);
43 offer.sharedMemory -= toRemove;
44 bytesSent -= toRemove;
50 return accountDisposed(disposed, stats);
54 std::array<ComputingQuotaOffer, 16>& offers,
59 int64_t bytesSent = bs;
60 for (
size_t oi = 0; oi < offers.size(); oi++) {
61 auto& offer = offers[oi];
62 if (offer.user != taskId) {
65 int64_t toRemove = std::min((int64_t)bytesSent, offer.sharedMemory);
66 offer.sharedMemory -= toRemove;
67 bytesSent -= toRemove;
73 return accountDisposed(disposed, stats);
81 auto state = std::make_unique<DeviceState>();
82 state->loop = uv_default_loop();
86 std::vector<o2::framework::DataProcessingStats::MetricSpec>
metrics{
87 MetricSpec{.
name =
"resources-missing",
88 .metricId =
static_cast<short>(ProcessingStatsId::RESOURCES_MISSING),
91 .minPublishInterval = 1000,
92 .maxRefreshLatency = 1000,
93 .sendInitialValue =
true},
94 MetricSpec{.name =
"resources-insufficient",
95 .metricId =
static_cast<short>(ProcessingStatsId::RESOURCES_INSUFFICIENT),
98 .minPublishInterval = 1000,
99 .maxRefreshLatency = 1000,
100 .sendInitialValue =
true},
101 MetricSpec{.name =
"resources-satisfactory",
102 .metricId =
static_cast<short>(ProcessingStatsId::RESOURCES_SATISFACTORY),
103 .kind = Kind::UInt64,
105 .minPublishInterval = 1000,
106 .maxRefreshLatency = 1000,
107 .sendInitialValue =
true},
110 stats.registerMetric(metric);
112 ref.registerService(ServiceRegistryHelpers::handleForService<DeviceState>(
state.get()));
113 ref.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStats>(&stats));
116 std::vector<ComputingQuotaOffer> offers{{.sharedMemory = 1000000}};
117 evaluator.updateOffers(offers, 1);
118 REQUIRE(evaluator.mOffers[1].sharedMemory == 1000000);
119 std::vector<ComputingQuotaOffer> offers2{{.sharedMemory = 1000000}};
120 evaluator.updateOffers(offers2, 2);
121 REQUIRE(evaluator.mOffers[2].sharedMemory == 1000000);
122 std::vector<ComputingQuotaOffer> offers3{{.sharedMemory = 2000000}, {.sharedMemory = 3000000}};
123 evaluator.updateOffers(offers3, 3);
124 REQUIRE(evaluator.mOffers[3].sharedMemory == 3000000);
125 REQUIRE(evaluator.mOffers[4].sharedMemory == 2000000);
127 bool selected = evaluator.selectOffer(1, policy.request, 3);
129 REQUIRE(evaluator.mOffers[0].user == -1);
130 REQUIRE(evaluator.mOffers[1].user == -1);
131 REQUIRE(evaluator.mOffers[2].user == -1);
132 REQUIRE(evaluator.mOffers[3].user == 1);
133 REQUIRE(evaluator.mOffers[4].user == -1);
135 evaluator.consume(1, dispose2MB, reportConsumedOffer);
136 REQUIRE(evaluator.mOffers[3].sharedMemory == 1000000);
137 REQUIRE(evaluator.mOffers[3].user == 1);
142 REQUIRE(evaluator.mOffers[2].sharedMemory == 1000000);
143 evaluator.handleExpired(reportExpiredOffer);
144 REQUIRE(evaluator.mOffers[2].sharedMemory == -1);
145 REQUIRE(evaluator.mOffers[3].sharedMemory == 1000000);
146 evaluator.dispose(1);
147 REQUIRE(evaluator.mOffers[3].user == -1);
148 REQUIRE(evaluator.mOffers[3].sharedMemory == 1000000);
149 REQUIRE(evaluator.mStats.totalExpiredBytes == 2000000);
150 REQUIRE(evaluator.mStats.totalConsumedBytes == 2000000);
152 selected = evaluator.selectOffer(1, policy.request, 3);
155 REQUIRE(evaluator.mOffers[0].user == -1);
156 REQUIRE(evaluator.mOffers[1].user == -1);
157 REQUIRE(evaluator.mOffers[2].user == -1);
158 REQUIRE(evaluator.mOffers[3].user == 1);
159 REQUIRE(evaluator.mOffers[3].valid ==
true);
160 REQUIRE(evaluator.mOffers[3].sharedMemory == 1000000);
161 REQUIRE(evaluator.mOffers[4].user == 1);
162 REQUIRE(evaluator.mOffers[4].sharedMemory == 2000000);
164 evaluator.consume(1, dispose2MB, reportConsumedOffer);
165 REQUIRE(evaluator.mOffers[2].sharedMemory == -1);
166 REQUIRE(evaluator.mOffers[3].sharedMemory == 0);
167 REQUIRE(evaluator.mOffers[4].sharedMemory == 1000000);
168 evaluator.handleExpired(reportExpiredOffer);
169 REQUIRE(evaluator.mOffers[3].sharedMemory == 0);
170 REQUIRE(evaluator.mOffers[4].sharedMemory == 1000000);
171 evaluator.dispose(1);
172 REQUIRE(evaluator.mOffers[3].user == -1);
173 REQUIRE(evaluator.mOffers[4].user == -1);
174 REQUIRE(evaluator.mOffers[3].sharedMemory == 0);
175 REQUIRE(evaluator.mOffers[4].sharedMemory == 1000000);
176 REQUIRE(evaluator.mStats.totalExpiredBytes == 2000000);
177 REQUIRE(evaluator.mStats.totalConsumedBytes == 4000000);
179 std::vector<ComputingQuotaOffer> offers4{{.sharedMemory = 1000000, .runtime = 100}};
180 evaluator.updateOffers(offers4, 2);
181 REQUIRE(evaluator.mOffers[1].sharedMemory == 1000000);
182 REQUIRE(evaluator.mOffers[2].sharedMemory == -1);
183 REQUIRE(evaluator.mOffers[3].sharedMemory == 0);
184 REQUIRE(evaluator.mOffers[4].sharedMemory == 1000000);
186 selected = evaluator.selectOffer(1, policy.request, 10);
187 evaluator.handleExpired(reportExpiredOffer);
188 selected = evaluator.selectOffer(1, policy.request, 11);
189 evaluator.handleExpired(reportExpiredOffer);
190 std::vector<ComputingQuotaOffer> offers5{{.sharedMemory = 1000000, .runtime = 100}};
191 evaluator.updateOffers(offers5, 4);
192 selected = evaluator.selectOffer(1, policy.request, 13);
193 evaluator.consume(1, dispose2MB, reportConsumedOffer);
194 evaluator.handleExpired(reportExpiredOffer);
195 evaluator.dispose(1);
196 REQUIRE(evaluator.mStats.totalExpiredBytes == 3000000);
197 REQUIRE(evaluator.mStats.totalConsumedBytes == 6000000);
199 REQUIRE(evaluator.mOffers[1].sharedMemory == 0);
200 REQUIRE(evaluator.mOffers[2].sharedMemory == 0);
201 REQUIRE(evaluator.mOffers[3].sharedMemory == 0);
202 REQUIRE(evaluator.mOffers[4].sharedMemory == -1);
204 std::vector<ComputingQuotaOffer> offers6{{.sharedMemory = 2000000, .runtime = 100}, {.sharedMemory = 1000000, .runtime = 100}};
205 evaluator.updateOffers(offers6, 19);
206 REQUIRE(evaluator.mOffers[1].sharedMemory == 1000000);
207 REQUIRE(evaluator.mOffers[2].sharedMemory == 2000000);
209 selected = evaluator.selectOffer(1, policy.request, 20);
210 evaluator.consume(1, dispose10MB, reportConsumedOffer);
211 evaluator.handleExpired(reportExpiredOffer);
212 evaluator.dispose(1);
213 REQUIRE(evaluator.mOffers[1].sharedMemory == 0);
214 REQUIRE(evaluator.mOffers[2].sharedMemory == 0);
215 REQUIRE(evaluator.mOffers[1].user == -1);
216 REQUIRE(evaluator.mOffers[2].user == -1);
217 REQUIRE(evaluator.mOffers[1].valid ==
false);
218 REQUIRE(evaluator.mOffers[2].valid ==
false);