28 std::function<int64_t(int64_t base, int64_t
offset)> getTimestamp_)
29 : getTimestamp(getTimestamp_),
30 getRealtimeBase(getRealtimeBase_)
58 update.timestamp = timestamp;
66 update.timestamp = timestamp;
71 int newCapacity = std::max(
size, 64);
80 update.timestamp = timestamp;
96 LOGP(
debug,
"Updating state {} with {}", cmd.
id, std::string_view(cmd.
data, cmd.
size));
115 throw runtime_error_f(
"State size is %d for state %s. States larger than 16384 bytes not supported for now.",
118 int idx =
nextState.fetch_sub(
size, std::memory_order_relaxed);
119 if (idx -
size < 0) {
127 std::this_thread::sleep_for(std::chrono::milliseconds(1));
134 }
else if (idx < 0) {
137 std::this_thread::sleep_for(std::chrono::milliseconds(1));
138 idx =
nextState.load(std::memory_order_relaxed);
170 if (
updated[mi] ==
false && (currentTimestamp - update.timestamp) > spec.maxRefreshLatency) {
172 update.timestamp = currentTimestamp;
177 int64_t delayPublished = (currentTimestamp - update.lastPublished);
178 assert(delayPublished >= 0);
179 if (delayPublished < spec.minPublishInterval) {
185 auto msg = std::string_view(
statesBuffer.data() + view.first, view.size);
187 callback(spec.name.data(), update.timestamp,
msg);
189 update.lastPublished = currentTimestamp;
195 static int64_t startTime = uv_hrtime();
196 int64_t now = uv_hrtime();
200 LOGP(
debug,
"Publishing invoked {} times / s, {} metrics published / s", (
int)averageInvocations, (
int)averagePublishing);
207 std::array<int, MAX_STATES> order;
208 std::iota(order.begin(), order.end(), 0);
209 std::stable_sort(order.begin(), order.begin() +
statesViews.size(), [&](
int a,
int b) {
210 return statesViews[a].first < statesViews[b].first;
213 for (
size_t i = 0;
i < order.size(); ++
i) {
216 if (view.size == 0) {
220 if (view.first == position) {
224 view.first = position;
225 view.capacity = view.size;
226 position += view.size;
236 auto currentMetric = std::find_if(
stateSpecs.begin(),
stateSpecs.end(), [&spec](
StateSpec const& s) { return s.name == spec.name && s.stateId != spec.stateId; });
238 throw runtime_error_f(
"Metric %s already registered with id %d. Cannot reregister with %d.", spec.
name.data(), currentMetric->stateId, spec.
stateId);
GLenum GLenum GLsizei const GLuint GLboolean enabled
GLboolean GLboolean GLboolean b
GLboolean GLboolean GLboolean GLboolean a
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error_f(const char *,...)
std::array< StateView, MAX_STATES > statesViews
The views for all the states, indexed by the state id.
static constexpr int STATES_BUFFER_SIZE
int64_t pushedMetricsLapse
int64_t publishingInvokedTotal
void processCommandQueue()
std::vector< short > registeredStates
std::array< StateSpec, MAX_STATES > stateSpecs
std::array< std::string, MAX_STATES > stateNames
int64_t publishedMetricsLapse
std::function< int64_t(int64_t base, int64_t offset)> getTimestamp
std::atomic< int > nextState
void updateState(CommandSpec state)
std::atomic< int64_t > updatedMetricsLapse
std::array< char, STATES_BUFFER_SIZE > store
std::atomic< int > pendingStates
void registerState(StateSpec const &spec)
std::array< bool, MAX_STATES > updated
DataProcessingStates(std::function< void(int64_t &base, int64_t &offset)> getRealtimeBase, std::function< int64_t(int64_t base, int64_t offset)> getTimestamp)
int64_t initialTimeOffset
std::atomic< int > lastInsertedState
std::atomic< int64_t > generation
void flushChangedStates(std::function< void(std::string const &, int64_t, std::string_view)> const &callback)
std::function< void(int64_t &base, int64_t &offset)> getRealtimeBase
int64_t publishingDoneTotal
std::vector< char > statesBuffer
The buffer where we store the state before flushing it.
std::array< UpdateInfo, MAX_STATES > updateInfos
uint64_t const void const *restrict const msg