Project
Loading...
Searching...
No Matches
test_ComputingQuotaEvaluator.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <catch_amalgamated.hpp>
16#include "Framework/Logger.h"
20#include "uv.h"
21
22using namespace o2::framework;
23
24TEST_CASE("TestComputingQuotaEvaluator")
25{
26 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats&)> reportConsumedOffer = [](ComputingQuotaOffer const& accumulatedConsumed, ComputingQuotaStats& stats) {
27 stats.totalConsumedBytes += accumulatedConsumed.sharedMemory;
28 };
29
30 ComputingQuotaConsumer dispose2MB = [bs = 2000000](int taskId,
31 std::array<ComputingQuotaOffer, 16>& offers,
33 std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats&)> accountDisposed) {
34 ComputingQuotaOffer disposed;
35 disposed.sharedMemory = 0;
36 int64_t bytesSent = bs;
37 for (size_t oi = 0; oi < offers.size(); oi++) {
38 auto& offer = offers[oi];
39 if (offer.user != taskId) {
40 continue;
41 }
42 int64_t toRemove = std::min((int64_t)bytesSent, offer.sharedMemory);
43 offer.sharedMemory -= toRemove;
44 bytesSent -= toRemove;
45 disposed.sharedMemory += toRemove;
46 if (bytesSent <= 0) {
47 break;
48 }
49 }
50 return accountDisposed(disposed, stats);
51 };
52
53 ComputingQuotaConsumer dispose10MB = [bs = 10000000](int taskId,
54 std::array<ComputingQuotaOffer, 16>& offers,
56 std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats&)> accountDisposed) {
57 ComputingQuotaOffer disposed;
58 disposed.sharedMemory = 0;
59 int64_t bytesSent = bs;
60 for (size_t oi = 0; oi < offers.size(); oi++) {
61 auto& offer = offers[oi];
62 if (offer.user != taskId) {
63 continue;
64 }
65 int64_t toRemove = std::min((int64_t)bytesSent, offer.sharedMemory);
66 offer.sharedMemory -= toRemove;
67 bytesSent -= toRemove;
68 disposed.sharedMemory += toRemove;
69 if (bytesSent <= 0) {
70 break;
71 }
72 }
73 return accountDisposed(disposed, stats);
74 };
75
77 TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), {});
78
79 ServiceRegistry registry;
80 ServiceRegistryRef ref(registry);
81 auto state = std::make_unique<DeviceState>();
82 state->loop = uv_default_loop();
83 using MetricSpec = DataProcessingStats::MetricSpec;
84 using Kind = DataProcessingStats::Kind;
85 using Scope = DataProcessingStats::Scope;
86 std::vector<o2::framework::DataProcessingStats::MetricSpec> metrics{
87 MetricSpec{.name = "resources-missing",
88 .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_MISSING),
89 .kind = Kind::UInt64,
90 .scope = Scope::DPL,
91 .minPublishInterval = 1000,
92 .maxRefreshLatency = 1000,
93 .sendInitialValue = true},
94 MetricSpec{.name = "resources-insufficient",
95 .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_INSUFFICIENT),
96 .kind = Kind::UInt64,
97 .scope = Scope::DPL,
98 .minPublishInterval = 1000,
99 .maxRefreshLatency = 1000,
100 .sendInitialValue = true},
101 MetricSpec{.name = "resources-satisfactory",
102 .metricId = static_cast<short>(ProcessingStatsId::RESOURCES_SATISFACTORY),
103 .kind = Kind::UInt64,
104 .scope = Scope::DPL,
105 .minPublishInterval = 1000,
106 .maxRefreshLatency = 1000,
107 .sendInitialValue = true},
108 };
109 for (auto& metric : metrics) {
110 stats.registerMetric(metric);
111 }
112 ref.registerService(ServiceRegistryHelpers::handleForService<DeviceState>(state.get()));
113 ref.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStats>(&stats));
114
115 ComputingQuotaEvaluator evaluator{ref};
116 std::vector<ComputingQuotaOffer> offers{{.sharedMemory = 1000000}};
117 evaluator.updateOffers(offers, 1);
118 REQUIRE(evaluator.mOffers[1].sharedMemory == 1000000);
119 std::vector<ComputingQuotaOffer> offers2{{.sharedMemory = 1000000}};
120 evaluator.updateOffers(offers2, 2);
121 REQUIRE(evaluator.mOffers[2].sharedMemory == 1000000);
122 std::vector<ComputingQuotaOffer> offers3{{.sharedMemory = 2000000}, {.sharedMemory = 3000000}};
123 evaluator.updateOffers(offers3, 3);
124 REQUIRE(evaluator.mOffers[3].sharedMemory == 3000000);
125 REQUIRE(evaluator.mOffers[4].sharedMemory == 2000000);
126 auto policy = ResourcePolicyHelpers::sharedMemoryBoundTask("internal-dpl-aod-reader.*", 2000000);
127 bool selected = evaluator.selectOffer(1, policy.request, 3);
128 REQUIRE(selected);
129 REQUIRE(evaluator.mOffers[0].user == -1);
130 REQUIRE(evaluator.mOffers[1].user == -1);
131 REQUIRE(evaluator.mOffers[2].user == -1);
132 REQUIRE(evaluator.mOffers[3].user == 1);
133 REQUIRE(evaluator.mOffers[4].user == -1);
134
135 evaluator.consume(1, dispose2MB, reportConsumedOffer);
136 REQUIRE(evaluator.mOffers[3].sharedMemory == 1000000);
137 REQUIRE(evaluator.mOffers[3].user == 1);
138
139 static std::function<void(ComputingQuotaOffer const&, ComputingQuotaStats const&)> reportExpiredOffer = [](ComputingQuotaOffer const& offer, ComputingQuotaStats const& stats) {
140 };
141
142 REQUIRE(evaluator.mOffers[2].sharedMemory == 1000000);
143 evaluator.handleExpired(reportExpiredOffer);
144 REQUIRE(evaluator.mOffers[2].sharedMemory == -1);
145 REQUIRE(evaluator.mOffers[3].sharedMemory == 1000000);
146 evaluator.dispose(1);
147 REQUIRE(evaluator.mOffers[3].user == -1);
148 REQUIRE(evaluator.mOffers[3].sharedMemory == 1000000);
149 REQUIRE(evaluator.mStats.totalExpiredBytes == 2000000);
150 REQUIRE(evaluator.mStats.totalConsumedBytes == 2000000);
151
152 selected = evaluator.selectOffer(1, policy.request, 3);
153 REQUIRE(selected);
154
155 REQUIRE(evaluator.mOffers[0].user == -1);
156 REQUIRE(evaluator.mOffers[1].user == -1);
157 REQUIRE(evaluator.mOffers[2].user == -1);
158 REQUIRE(evaluator.mOffers[3].user == 1);
159 REQUIRE(evaluator.mOffers[3].valid == true);
160 REQUIRE(evaluator.mOffers[3].sharedMemory == 1000000);
161 REQUIRE(evaluator.mOffers[4].user == 1);
162 REQUIRE(evaluator.mOffers[4].sharedMemory == 2000000);
163
164 evaluator.consume(1, dispose2MB, reportConsumedOffer);
165 REQUIRE(evaluator.mOffers[2].sharedMemory == -1);
166 REQUIRE(evaluator.mOffers[3].sharedMemory == 0);
167 REQUIRE(evaluator.mOffers[4].sharedMemory == 1000000);
168 evaluator.handleExpired(reportExpiredOffer);
169 REQUIRE(evaluator.mOffers[3].sharedMemory == 0);
170 REQUIRE(evaluator.mOffers[4].sharedMemory == 1000000);
171 evaluator.dispose(1);
172 REQUIRE(evaluator.mOffers[3].user == -1);
173 REQUIRE(evaluator.mOffers[4].user == -1);
174 REQUIRE(evaluator.mOffers[3].sharedMemory == 0);
175 REQUIRE(evaluator.mOffers[4].sharedMemory == 1000000);
176 REQUIRE(evaluator.mStats.totalExpiredBytes == 2000000);
177 REQUIRE(evaluator.mStats.totalConsumedBytes == 4000000);
178
179 std::vector<ComputingQuotaOffer> offers4{{.sharedMemory = 1000000, .runtime = 100}};
180 evaluator.updateOffers(offers4, 2);
181 REQUIRE(evaluator.mOffers[1].sharedMemory == 1000000);
182 REQUIRE(evaluator.mOffers[2].sharedMemory == -1);
183 REQUIRE(evaluator.mOffers[3].sharedMemory == 0);
184 REQUIRE(evaluator.mOffers[4].sharedMemory == 1000000);
185
186 selected = evaluator.selectOffer(1, policy.request, 10);
187 evaluator.handleExpired(reportExpiredOffer);
188 selected = evaluator.selectOffer(1, policy.request, 11);
189 evaluator.handleExpired(reportExpiredOffer);
190 std::vector<ComputingQuotaOffer> offers5{{.sharedMemory = 1000000, .runtime = 100}};
191 evaluator.updateOffers(offers5, 4);
192 selected = evaluator.selectOffer(1, policy.request, 13);
193 evaluator.consume(1, dispose2MB, reportConsumedOffer);
194 evaluator.handleExpired(reportExpiredOffer);
195 evaluator.dispose(1);
196 REQUIRE(evaluator.mStats.totalExpiredBytes == 3000000);
197 REQUIRE(evaluator.mStats.totalConsumedBytes == 6000000);
198
199 REQUIRE(evaluator.mOffers[1].sharedMemory == 0);
200 REQUIRE(evaluator.mOffers[2].sharedMemory == 0);
201 REQUIRE(evaluator.mOffers[3].sharedMemory == 0);
202 REQUIRE(evaluator.mOffers[4].sharedMemory == -1);
203
204 std::vector<ComputingQuotaOffer> offers6{{.sharedMemory = 2000000, .runtime = 100}, {.sharedMemory = 1000000, .runtime = 100}};
205 evaluator.updateOffers(offers6, 19);
206 REQUIRE(evaluator.mOffers[1].sharedMemory == 1000000);
207 REQUIRE(evaluator.mOffers[2].sharedMemory == 2000000);
209 selected = evaluator.selectOffer(1, policy.request, 20);
210 evaluator.consume(1, dispose10MB, reportConsumedOffer);
211 evaluator.handleExpired(reportExpiredOffer);
212 evaluator.dispose(1);
213 REQUIRE(evaluator.mOffers[1].sharedMemory == 0);
214 REQUIRE(evaluator.mOffers[2].sharedMemory == 0);
215 REQUIRE(evaluator.mOffers[1].user == -1);
216 REQUIRE(evaluator.mOffers[2].user == -1);
217 REQUIRE(evaluator.mOffers[1].valid == false);
218 REQUIRE(evaluator.mOffers[2].valid == false);
219}
benchmark::State & state
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
Definition glcorearb.h:5500
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLint ref
Definition glcorearb.h:291
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::function< void(int id, std::array< ComputingQuotaOffer, 16 > &, ComputingQuotaStats &, std::function< void(ComputingQuotaOffer const &, ComputingQuotaStats &stats)>)> ComputingQuotaConsumer
int64_t sharedMemory
How much shared memory it can allocate.
Statistics on the offers consumed, expired.
Helper struct to hold statistics about the data processing happening.
static ResourcePolicy sharedMemoryBoundTask(char const *taskMatcher, int maxMemory)
static std::function< int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator(uv_loop_t *loop)
static std::function< void(int64_t &base, int64_t &offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t *loop)
TEST_CASE("TestComputingQuotaEvaluator")