23 std::function<int64_t(int64_t base, int64_t
offset)> getTimestamp_,
25 : getTimestamp(getTimestamp_),
26 getRealtimeBase(getRealtimeBase_),
38 LOGP(
debug,
"MetricID {} is disabled", (
int)cmd.
id);
50 auto idx =
nextCmd.fetch_add(1, std::memory_order_relaxed);
51 if (idx ==
cmds.size()) {
54 while (
pendingCmds.load(std::memory_order_relaxed) > 0) {
59 std::this_thread::sleep_for(std::chrono::milliseconds(1));
63 nextCmd.store(0, std::memory_order_relaxed);
65 idx =
nextCmd.fetch_add(1, std::memory_order_relaxed);
66 }
else if (idx >
cmds.size()) {
69 std::this_thread::sleep_for(std::chrono::milliseconds(1));
70 idx =
nextCmd.load(std::memory_order_relaxed);
75 assert(idx <
cmds.size());
86 std::array<char, MAX_CMDS> order;
93 std::iota(order.begin(), order.begin() +
range, 0);
97 std::stable_sort(order.begin(), order.begin() +
range, [
this](
char a,
char b) {
98 return cmds[a].timestamp < cmds[b].timestamp;
110 if (cmd.value !=
metrics[cmd.id] && cmd.timestamp >= update.timestamp) {
113 update.timestamp = cmd.timestamp;
121 update.timestamp = cmd.timestamp;
129 update.timestamp = cmd.timestamp;
134 if (cmd.value >
metrics[cmd.id]) {
137 update.timestamp = cmd.timestamp;
142 if (cmd.value <
metrics[cmd.id]) {
145 update.timestamp = cmd.timestamp;
150 if (cmd.value > 0 && cmd.timestamp >= update.timestamp) {
153 update.timestamp = cmd.timestamp;
166 if (update.timestamp == 0) {
167 update.timestamp = cmd.timestamp;
180 if (update.timestamp == 0) {
181 update.timestamp = cmd.timestamp;
189 nextCmd.store(0, std::memory_order_relaxed);
202 if (spec.
name.empty()) {
206 LOGP(
debug,
"Metric {} is disabled", spec.
name);
211 update.timestamp = currentTimestamp;
221 LOGP(fatal,
"Metric {} has unknown kind", spec.
name);
224 if (currentTimestamp - update.timestamp == 0) {
225 callback(spec, update.timestamp, 0);
230 update.timestamp = currentTimestamp;
232 callback(spec, update.timestamp,
metrics[mi]);
236 update.lastPublished = currentTimestamp;
242 static int64_t startTime = uv_hrtime();
243 int64_t now = uv_hrtime();
245 auto timeDelta = std::max(int64_t(1), now - startTime);
249 LOGP(
debug,
"Publishing invoked {} times / s, {} metrics published / s", (
int)averageInvocations, (
int)averagePublishing);
254 if (spec.
name.size() == 0) {
264 auto currentMetric = std::find_if(
metricSpecs.begin(),
metricSpecs.end(), [&spec](
MetricSpec const& s) { return s.name == spec.name && s.metricId != spec.metricId; });
266 throw runtime_error_f(
"Metric %s already registered with id %d. Cannot reregister with %d.", spec.
name.data(), currentMetric->metricId, spec.
metricId);
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
GLboolean GLboolean GLboolean b
GLboolean GLboolean GLboolean GLboolean a
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
RuntimeErrorRef runtime_error_f(const char *,...)
int64_t minOnlinePublishInterval
int64_t minPublishInterval
How many milliseconds must have passed since the last publishing.
int64_t defaultValue
The default value for the metric.
Kind kind
The kind of the metric.
uint64_t maxRefreshLatency
std::atomic< int > pendingCmds
std::array< bool, MAX_METRICS > updated
void registerMetric(MetricSpec const &spec)
std::array< MetricSpec, MAX_METRICS > metricSpecs
std::vector< int > availableMetrics
int64_t publishingInvokedTotal
std::function< int64_t(int64_t base, int64_t offset)> getTimestamp
std::atomic< int64_t > updatedMetricsLapse
@ SetIfPositive
Set the value to the specified value.
@ InstantaneousRate
Update the rate of the metric given the cumulative value since last time it got published.
@ Max
Subtract the value from the current value.
@ Min
Set the value to the maximum of the current value and the specified value.
@ CumulativeRate
Set the value to the specified value if it is positive.
@ Sub
Add the value to the current value.
@ Add
Update the rate of the metric given the amount since the last time.
void updateStats(CommandSpec cmd)
int64_t initialTimeOffset
std::array< std::string, MAX_METRICS > metricsNames
int64_t publishedMetricsLapse
std::array< Command, MAX_CMDS > cmds
std::atomic< int > nextCmd
std::atomic< int > insertedCmds
void flushChangedMetrics(std::function< void(MetricSpec const &, int64_t, int64_t)> const &callback)
std::array< int64_t, MAX_METRICS > lastPublishedMetrics
DataProcessingStats(std::function< void(int64_t &base, int64_t &offset)> getRealtimeBase, std::function< int64_t(int64_t base, int64_t offset)> getTimestamp, DefaultConfig config)
std::function< void(int64_t &base, int64_t &offset)> getRealtimeBase
void processCommandQueue()
std::array< UpdateInfo, MAX_METRICS > updateInfos
int64_t pushedMetricsLapse
int64_t publishingDoneTotal