![]() |
Project
|
#include <DataProcessingStates.h>
Classes | |
struct | CommandHeader |
struct | CommandSpec |
struct | StateSpec |
struct | StateView |
struct | UpdateInfo |
Public Member Functions | |
DataProcessingStates (std::function< void(int64_t &base, int64_t &offset)> getRealtimeBase, std::function< int64_t(int64_t base, int64_t offset)> getTimestamp) | |
DataProcessingStates (DataProcessingStates const &other) | |
std::string_view | state (int64_t id) const |
void | registerState (StateSpec const &spec) |
void | updateState (CommandSpec state) |
void | processCommandQueue () |
void | flushChangedStates (std::function< void(std::string const &, int64_t, std::string_view)> const &callback) |
void | repack () |
void | lapseTelemetry () |
Public Attributes | |
std::atomic< size_t > | statesSize |
std::array< char, STATES_BUFFER_SIZE > | store = {} |
std::vector< char > | statesBuffer |
The buffer were we store the state before flushing it. | |
std::array< StateView, MAX_STATES > | statesViews = {} |
The views for all the states, indexed by the state id. | |
std::array< bool, MAX_STATES > | updated = {} |
std::array< bool, MAX_STATES > | enabled = {} |
std::array< std::string, MAX_STATES > | stateNames = {} |
std::array< UpdateInfo, MAX_STATES > | updateInfos |
std::array< StateSpec, MAX_STATES > | stateSpecs |
std::atomic< int > | lastInsertedState = 0 |
std::atomic< int > | nextState = STATES_BUFFER_SIZE |
std::atomic< int > | pendingStates = 0 |
int64_t | lastFlushedToRemote = 0 |
int64_t | lastMetrics = 0 |
std::function< void(int64_t &base, int64_t &offset)> | getRealtimeBase |
std::function< int64_t(int64_t base, int64_t offset)> | getTimestamp |
int64_t | realTimeBase = 0 |
int64_t | initialTimeOffset = 0 |
std::atomic< int64_t > | generation = 0 |
std::atomic< int64_t > | updatedMetricsLapse = 0 |
int64_t | updatedMetricsTotal = 0 |
int64_t | pushedMetricsTotal = 0 |
int64_t | pushedMetricsLapse = 0 |
int64_t | publishedMetricsTotal = 0 |
int64_t | publishedMetricsLapse = 0 |
int64_t | publishingInvokedTotal = 0 |
int64_t | publishingDoneTotal = 0 |
std::vector< short > | registeredStates = {} |
Static Public Attributes | |
static constexpr ServiceKind | service_kind = ServiceKind::Global |
static constexpr int | STATES_BUFFER_SIZE = 1 << 18 |
static constexpr int | MAX_STATES = 2048 |
Helper struct to hold state of the data processing while it is running. This is meant to then be used to report the state of the data processing to the driver. This is similar to the DataProcessingStats, however it can only track the fact that a given substate (registered as a metric) has changed. No other operations are supported.
Definition at line 49 of file DataProcessingStates.h.
o2::framework::DataProcessingStates::DataProcessingStates | ( | std::function< void(int64_t &base, int64_t &offset)> | getRealtimeBase, |
std::function< int64_t(int64_t base, int64_t offset)> | getTimestamp | ||
) |
Definition at line 27 of file DataProcessingStates.cxx.
|
inline |
Definition at line 53 of file DataProcessingStates.h.
void o2::framework::DataProcessingStates::flushChangedStates | ( | std::function< void(std::string const &, int64_t, std::string_view)> const & | callback | ) |
Definition at line 161 of file DataProcessingStates.cxx.
|
inline |
Definition at line 192 of file DataProcessingStates.h.
void o2::framework::DataProcessingStates::processCommandQueue | ( | ) |
Definition at line 35 of file DataProcessingStates.cxx.
Definition at line 230 of file DataProcessingStates.cxx.
void o2::framework::DataProcessingStates::repack | ( | ) |
Definition at line 203 of file DataProcessingStates.cxx.
|
inline |
Definition at line 136 of file DataProcessingStates.h.
void o2::framework::DataProcessingStates::updateState | ( | CommandSpec | state | ) |
Definition at line 94 of file DataProcessingStates.cxx.
std::array<bool, MAX_STATES> o2::framework::DataProcessingStates::enabled = {} |
Definition at line 164 of file DataProcessingStates.h.
std::atomic<int64_t> o2::framework::DataProcessingStates::generation = 0 |
Definition at line 207 of file DataProcessingStates.h.
std::function<void(int64_t& base, int64_t& offset)> o2::framework::DataProcessingStates::getRealtimeBase |
Definition at line 183 of file DataProcessingStates.h.
std::function<int64_t(int64_t base, int64_t offset)> o2::framework::DataProcessingStates::getTimestamp |
Definition at line 185 of file DataProcessingStates.h.
int64_t o2::framework::DataProcessingStates::initialTimeOffset = 0 |
Definition at line 189 of file DataProcessingStates.h.
int64_t o2::framework::DataProcessingStates::lastFlushedToRemote = 0 |
Definition at line 179 of file DataProcessingStates.h.
std::atomic<int> o2::framework::DataProcessingStates::lastInsertedState = 0 |
Definition at line 172 of file DataProcessingStates.h.
int64_t o2::framework::DataProcessingStates::lastMetrics = 0 |
Definition at line 180 of file DataProcessingStates.h.
|
staticconstexpr |
Definition at line 75 of file DataProcessingStates.h.
std::atomic<int> o2::framework::DataProcessingStates::nextState = STATES_BUFFER_SIZE |
Definition at line 176 of file DataProcessingStates.h.
std::atomic<int> o2::framework::DataProcessingStates::pendingStates = 0 |
Definition at line 178 of file DataProcessingStates.h.
int64_t o2::framework::DataProcessingStates::publishedMetricsLapse = 0 |
Definition at line 214 of file DataProcessingStates.h.
int64_t o2::framework::DataProcessingStates::publishedMetricsTotal = 0 |
Definition at line 213 of file DataProcessingStates.h.
int64_t o2::framework::DataProcessingStates::publishingDoneTotal = 0 |
Definition at line 216 of file DataProcessingStates.h.
int64_t o2::framework::DataProcessingStates::publishingInvokedTotal = 0 |
Definition at line 215 of file DataProcessingStates.h.
int64_t o2::framework::DataProcessingStates::pushedMetricsLapse = 0 |
Definition at line 212 of file DataProcessingStates.h.
int64_t o2::framework::DataProcessingStates::pushedMetricsTotal = 0 |
Definition at line 211 of file DataProcessingStates.h.
int64_t o2::framework::DataProcessingStates::realTimeBase = 0 |
Definition at line 187 of file DataProcessingStates.h.
std::vector<short> o2::framework::DataProcessingStates::registeredStates = {} |
Definition at line 217 of file DataProcessingStates.h.
|
staticconstexpr |
Definition at line 73 of file DataProcessingStates.h.
std::array<std::string, MAX_STATES> o2::framework::DataProcessingStates::stateNames = {} |
Definition at line 165 of file DataProcessingStates.h.
|
staticconstexpr |
Definition at line 74 of file DataProcessingStates.h.
std::vector<char> o2::framework::DataProcessingStates::statesBuffer |
The buffer were we store the state before flushing it.
Definition at line 158 of file DataProcessingStates.h.
std::array<StateSpec, MAX_STATES> o2::framework::DataProcessingStates::stateSpecs |
Definition at line 167 of file DataProcessingStates.h.
std::atomic<size_t> o2::framework::DataProcessingStates::statesSize |
Definition at line 152 of file DataProcessingStates.h.
std::array<StateView, MAX_STATES> o2::framework::DataProcessingStates::statesViews = {} |
The views for all the states, indexed by the state id.
Definition at line 160 of file DataProcessingStates.h.
std::array<char, STATES_BUFFER_SIZE> o2::framework::DataProcessingStates::store = {} |
The temporary buffer where we store state update commands before they are processed.
Definition at line 156 of file DataProcessingStates.h.
std::array<bool, MAX_STATES> o2::framework::DataProcessingStates::updated = {} |
Definition at line 162 of file DataProcessingStates.h.
std::atomic<int64_t> o2::framework::DataProcessingStates::updatedMetricsLapse = 0 |
Definition at line 209 of file DataProcessingStates.h.
int64_t o2::framework::DataProcessingStates::updatedMetricsTotal = 0 |
Definition at line 210 of file DataProcessingStates.h.
std::array<UpdateInfo, MAX_STATES> o2::framework::DataProcessingStates::updateInfos |
Definition at line 166 of file DataProcessingStates.h.