Project
Loading...
Searching...
No Matches
DataProcessingStates.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16#include "Framework/Logger.h"
17#include <uv.h>
18#include <iostream>
19#include <atomic>
20#include <utility>
21#include <thread>
22#include <string_view>
23
24namespace o2::framework
25{
26
// Builds the state tracker from two caller-supplied callbacks. Both are taken
// by value and copied into the members of the same name (without the trailing
// underscore).
//
// getRealtimeBase_: callable receiving references to a base and an offset.
// getTimestamp_:    callable turning a (base, offset) pair into an int64_t
//                   timestamp; the other routines in this file invoke it as
//                   getTimestamp(realTimeBase, initialTimeOffset).
//
// NOTE(review): the listing jumps from line 31 to line 33, so the body may
// hold a statement that is not visible here -- confirm against the real file.
27DataProcessingStates::DataProcessingStates(std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase_,
28 std::function<int64_t(int64_t base, int64_t offset)> getTimestamp_)
29 : getTimestamp(getTimestamp_),
30 getRealtimeBase(getRealtimeBase_)
31{
33}
34
// Body of the routine that consumes the commands queued in `store` and applies
// them to the per-state views kept in `statesBuffer`.
//
// Each queued command is a CommandHeader (id, size, timestamp) immediately
// followed by `size` payload bytes. Walking starts at lastInsertedState and
// advances by header + payload until the loop ends; afterwards both queue
// cursors are reset to STATES_BUFFER_SIZE and `generation` is incremented.
//
// NOTE(review): the signature line (35) and lines 39-40 are missing from this
// listing; they presumably open the loop closed at line 86 and declare
// `header`. Lines 60, 68, 82 and 87 are missing as well.
36{
37 int position = lastInsertedState.load(std::memory_order_relaxed);
38 // Process the commands in the order we have just computed.
41 // Avoid alignment issues.
// The header is copied out byte-wise instead of being read through a cast
// pointer into `store`.
42 memcpy(&header, &store[position], sizeof(DataProcessingStates::CommandHeader));
43 int id = header.id;
44 int64_t timestamp = header.timestamp;
45 int size = header.size;
46 assert(id < updateInfos.size());
47 auto& update = updateInfos[id];
48 // We need to update only once per invoked callback,
49 // because the metrics are stored in reverse insertion order.
// Once update.generation has been set to the current generation, any further
// command for the same id met during this pass fails the test below and is
// skipped.
50 if (generation > update.generation) {
51 // If we have enough capacity, we reuse the buffer.
52 // If the size is the same, we mark it as updated only if different.
53 if (statesViews[id].size == size) {
54 int diff = memcmp(statesBuffer.data() + statesViews[id].first, &store[position + sizeof(DataProcessingStates::CommandHeader)], size);
// NOTE(review): when the payload is identical (diff == 0) update.generation is
// left untouched, so a later command in the queue for the same id would still
// be applied in this pass -- confirm this is intended.
55 if (diff) {
56 memcpy(statesBuffer.data() + statesViews[id].first, &store[position + sizeof(DataProcessingStates::CommandHeader)], size);
57 updated[id] = true;
58 update.timestamp = timestamp;
59 update.generation = generation;
61 }
// Different size, but the slot already reserved for this state is large enough:
// overwrite in place and record the new size.
62 } else if (statesViews[id].capacity >= size) {
63 memcpy(statesBuffer.data() + statesViews[id].first, &store[position + sizeof(DataProcessingStates::CommandHeader)], size);
64 statesViews[id].size = size;
65 updated[id] = true;
66 update.timestamp = timestamp;
67 update.generation = generation;
// This condition is the exact complement of the previous one, so it always
// holds when reached.
69 } else if (statesViews[id].capacity < size) {
70 // Otherwise we need to reallocate.
// A fresh slot of at least 64 bytes is appended at the end of statesBuffer and
// the view is pointed at it; the old slot is not reclaimed in this routine.
71 int newCapacity = std::max(size, 64);
72 int first = statesBuffer.size();
73 statesBuffer.resize(statesBuffer.size() + newCapacity);
74 assert(first < statesBuffer.size());
75 memcpy(statesBuffer.data() + first, &store[position + sizeof(DataProcessingStates::CommandHeader)], size);
76 statesViews[id].first = first;
77 statesViews[id].size = size;
78 statesViews[id].capacity = newCapacity;
79 updated[id] = true;
80 update.timestamp = timestamp;
81 update.generation = generation;
83 }
84 }
// Step over this command (header plus payload) to reach the next one.
85 position += sizeof(DataProcessingStates::CommandHeader) + header.size;
86 }
88 // We reset the queue. Once again, the queue is filled in reverse order.
89 nextState.store(STATES_BUFFER_SIZE, std::memory_order_relaxed);
90 lastInsertedState.store(STATES_BUFFER_SIZE, std::memory_order_relaxed);
91 generation++;
92}
93
// Body of updateState (name taken from the recursive call at line 140; the
// signature line 94 is missing from this listing). Queues the state update
// `cmd` (id, size, data) into `store` as a CommandHeader followed by the
// payload bytes.
//
// Throws via runtime_error_f when the state has no registered name, when
// cmd.id is out of range for stateNames / updateInfos, or when header plus
// payload exceeds 16384 bytes. Returns without queuing anything when the state
// is disabled.
//
// NOTE(review): lines 110, 121, 129, 132, 156 and 158 are missing from this
// listing, so some steps (e.g. whatever empties the queue when it is full) are
// not visible here.
95{
96 LOGP(debug, "Updating state {} with {}", cmd.id, std::string_view(cmd.data, cmd.size));
// NOTE(review): stateSpecs[cmd.id] is indexed here before the range checks on
// cmd.id that follow -- confirm the id cannot be out of bounds at this point.
97 if (stateSpecs[cmd.id].name.empty()) {
98 throw runtime_error_f("StateID %d was not registered", (int)cmd.id);
99 }
100 if (cmd.id >= stateNames.size()) {
101 throw runtime_error_f("StateID %d is out of range", (int)cmd.id);
102 }
103 if (cmd.id >= updateInfos.size()) {
104 throw runtime_error_f("MetricID %d is out of range", (int)cmd.id);
105 }
106 // Do not update currently disabled states
107 if (enabled[cmd.id] == false) {
108 return;
109 }
111 // Add a static mutex to protect the queue
112 // Get the next available operation in an atomic way.
// `size` is the full footprint of the command in the queue: header + payload.
113 int size = sizeof(CommandHeader) + cmd.size;
114 if (size > 16384) {
115 throw runtime_error_f("State size is %d for state %s. States larger than 16384 bytes not supported for now.",
116 size, stateSpecs[cmd.id].name.c_str());
117 }
// fetch_sub returns the cursor value from before the subtraction, so the bytes
// reserved for this command are [idx - size, idx).
118 int idx = nextState.fetch_sub(size, std::memory_order_relaxed);
// Not enough room left below the cursor for this command.
119 if (idx - size < 0) {
120 // We abort this command
122 while (pendingStates.load(std::memory_order_relaxed) > 0) {
123 // We need to wait for all the pending commands to be processed.
124 // This is needed because we are going to flush the queue.
125 // We cannot flush the queue while there are pending commands
126 // as we might end up with a command being processed twice.
127 std::this_thread::sleep_for(std::chrono::milliseconds(1));
128 }
// Both cursors go back to the end of the buffer and the reservation is retried.
130 lastInsertedState.store(STATES_BUFFER_SIZE, std::memory_order_relaxed);
131 nextState.store(STATES_BUFFER_SIZE, std::memory_order_relaxed);
133 idx = nextState.fetch_sub(size, std::memory_order_relaxed);
// NOTE(review): this branch is only evaluated when idx - size >= 0; with a
// non-negative cmd.size that implies idx > 0, so it looks unreachable as
// written -- confirm.
134 } else if (idx < 0) {
135 while (idx < 0) {
136 // We need to wait for the flushing of the queue
137 std::this_thread::sleep_for(std::chrono::milliseconds(1));
138 idx = nextState.load(std::memory_order_relaxed);
139 }
140 return updateState(cmd);
141 }
142 // Save the state in the queue
143 assert(idx >= 0);
144 assert(cmd.id < statesViews.size());
145 int64_t timestamp = getTimestamp(realTimeBase, initialTimeOffset);
146 // We also write starting from idx - size, because we know this is
147 // reserved for us.
148 idx -= size;
149 CommandHeader header{(short)cmd.id, cmd.size, timestamp};
150 assert(idx >= 0);
151 assert(idx + sizeof(CommandHeader) + cmd.size <= store.size());
// Header first, payload right after it.
152 memcpy(&store.data()[idx], &header, sizeof(CommandHeader));
153 memcpy(&store.data()[idx + sizeof(CommandHeader)], cmd.data, cmd.size);
154
// Record where this command starts; the queue-draining routine begins reading
// from lastInsertedState.
155 lastInsertedState = idx;
157 // Keep track of the number of commands we have received.
159}
160
// Publishes, through `callback`, every registered state currently flagged as
// updated, subject to the per-state rate limits in its StateSpec.
//
// callback: invoked as callback(name, timestamp, bytes), where `bytes` is a
//           std::string_view over the state's slot inside statesBuffer (no
//           copy of the payload is made here).
//
// NOTE(review): lines 163, 188 and 193 are missing from this listing, so the
// `if (publish)` body and any counter updates are not visible; the statistics
// at the end read publishingInvokedTotal and publishedMetricsLapse, which are
// not modified in the visible code.
161void DataProcessingStates::flushChangedStates(std::function<void(std::string const&, int64_t, std::string_view)> const& callback)
162{
164 bool publish = false;
165 auto currentTimestamp = getTimestamp(realTimeBase, initialTimeOffset);
166 for (auto mi : registeredStates) {
167 auto& update = updateInfos[mi];
168 auto& spec = stateSpecs[mi];
169 auto& view = statesViews[mi];
// A state that has not changed for longer than maxRefreshLatency is flagged as
// updated anyway, with its timestamp moved to now, so it gets re-published.
170 if (updated[mi] == false && (currentTimestamp - update.timestamp) > spec.maxRefreshLatency) {
171 updated[mi] = true;
172 update.timestamp = currentTimestamp;
173 }
174 if (updated[mi] == false) {
175 continue;
176 }
// Rate limit: a state published less than minPublishInterval ago is skipped
// for now; its updated flag stays set, so it is reconsidered on a later call.
177 int64_t delayPublished = (currentTimestamp - update.lastPublished);
178 assert(delayPublished >= 0);
179 if (delayPublished < spec.minPublishInterval) {
180 continue;
181 }
182 publish = true;
183 assert(view.first + view.size <= statesBuffer.size());
184 assert(view.first <= statesBuffer.size());
185 auto msg = std::string_view(statesBuffer.data() + view.first, view.size);
186
// NOTE(review): the callback takes std::string const&; passing
// spec.name.data() presumably builds a temporary std::string per call --
// passing spec.name directly may avoid that, confirm the type of `name`.
187 callback(spec.name.data(), update.timestamp, msg);
189 update.lastPublished = currentTimestamp;
190 updated[mi] = false;
191 }
192 if (publish) {
194 }
// Rates are measured against the time of the first call to this function
// (function-local static, initialised once).
// NOTE(review): on that first call `now - startTime` can be very small or
// zero; if the counters are integers the divisions below are integral and a
// zero elapsed time would divide by zero -- confirm the counter types.
195 static int64_t startTime = uv_hrtime();
196 int64_t now = uv_hrtime();
197 double averageInvocations = (publishingInvokedTotal * 1000000000) / (now - startTime);
198 double averagePublishing = (publishedMetricsLapse * 1000000000) / (now - startTime);
199
200 LOGP(debug, "Publishing invoked {} times / s, {} metrics published / s", (int)averageInvocations, (int)averagePublishing);
201}
202
// Body of the routine that compacts statesBuffer: state views are visited in
// increasing order of their `first` offset and each non-empty one is moved
// down to the lowest free position, with its capacity trimmed to its size.
//
// NOTE(review): the signature line (203) is missing from this listing.
204{
205 // Every time we publish, we repack the states buffer so that we can minimize the
206 // amount of memory we use.
// `order` holds state ids 0..MAX_STATES-1, sorted by where each state's bytes
// currently start inside statesBuffer.
207 std::array<int, MAX_STATES> order;
208 std::iota(order.begin(), order.end(), 0);
209 std::stable_sort(order.begin(), order.begin() + statesViews.size(), [&](int a, int b) {
210 return statesViews[a].first < statesViews[b].first;
211 });
// `position` is the next free offset in the packed layout.
212 int position = 0;
213 for (size_t i = 0; i < order.size(); ++i) {
214 auto& view = statesViews[order[i]];
215 // If we have no size, we do not need to move anything.
216 if (view.size == 0) {
217 continue;
218 }
219 // If we are already in the correct place, do nothing.
// NOTE(review): `position` is not advanced past this view before continuing,
// so the next non-empty view would be copied to the same offset and overwrite
// it -- looks like a missing `position += view.size`; confirm.
220 if (view.first == position) {
221 continue;
222 }
// NOTE(review): source and destination both lie inside statesBuffer and may
// overlap when view.size exceeds view.first - position; memcpy requires
// non-overlapping ranges (memmove does not) -- confirm.
223 memcpy(statesBuffer.data() + position, statesBuffer.data() + view.first, view.size);
224 view.first = position;
225 view.capacity = view.size;
226 position += view.size;
227 }
228}
229
// Body of the routine that records a new state described by `spec`
// (presumably registerState(StateSpec const& spec); the signature line 230 is
// missing from this listing).
//
// Throws via runtime_error_f when the slot spec.stateId already holds a
// different name, or when spec.name is already registered under another id.
// Otherwise stores the spec and its name, initialises the slot's UpdateInfo
// with the current timestamp, sets the updated / enabled flags from
// spec.sendInitialValue / spec.defaultEnabled, and appends the id to
// registeredStates.
//
// NOTE(review): no range check on spec.stateId is visible before it is used as
// an index -- confirm callers guarantee it is below MAX_STATES.
231{
232 if (stateSpecs[spec.stateId].name.size() != 0 && spec.name != stateSpecs[spec.stateId].name) {
233 auto currentName = stateSpecs[spec.stateId].name;
// NOTE(review): the format string has two conversion specifiers but three
// arguments follow it; the trailing spec.name.data() is not printed.
234 throw runtime_error_f("Metric %d already registered with name %s", spec.stateId, currentName.data(), spec.name.data());
235 }
// Reject a name that is already bound to a different state id.
236 auto currentMetric = std::find_if(stateSpecs.begin(), stateSpecs.end(), [&spec](StateSpec const& s) { return s.name == spec.name && s.stateId != spec.stateId; });
237 if (currentMetric != stateSpecs.end()) {
238 throw runtime_error_f("Metric %s already registered with id %d. Cannot reregister with %d.", spec.name.data(), currentMetric->stateId, spec.stateId);
239 }
240 stateSpecs[spec.stateId] = spec;
241 stateNames[spec.stateId] = spec.name;
242 int64_t currentTime = getTimestamp(realTimeBase, initialTimeOffset);
// The first two members of UpdateInfo both start at the registration time.
243 updateInfos[spec.stateId] = UpdateInfo{currentTime, currentTime};
244 updated[spec.stateId] = spec.sendInitialValue;
245 enabled[spec.stateId] = spec.defaultEnabled;
// NOTE(review): registering the same (id, name) pair again passes both checks
// above and appends the id a second time -- confirm duplicates are harmless
// for the loop over registeredStates in flushChangedStates.
246 registeredStates.push_back(spec.stateId);
247}
248
249} // namespace o2::framework
int32_t i
bool publish(std::string const &filename, std::string const &path, std::string CCDBpath)
Definition GRPTool.cxx:198
std::ostringstream debug
GLsizeiptr size
Definition glcorearb.h:659
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLint first
Definition glcorearb.h:399
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLintptr offset
Definition glcorearb.h:660
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLuint id
Definition glcorearb.h:650
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error_f(const char *,...)
std::array< StateView, MAX_STATES > statesViews
The views for all the states, indexed by the state id.
std::array< StateSpec, MAX_STATES > stateSpecs
std::array< std::string, MAX_STATES > stateNames
std::function< int64_t(int64_t base, int64_t offset)> getTimestamp
std::array< char, STATES_BUFFER_SIZE > store
void registerState(StateSpec const &spec)
std::array< bool, MAX_STATES > updated
DataProcessingStates(std::function< void(int64_t &base, int64_t &offset)> getRealtimeBase, std::function< int64_t(int64_t base, int64_t offset)> getTimestamp)
void flushChangedStates(std::function< void(std::string const &, int64_t, std::string_view)> const &callback)
std::function< void(int64_t &base, int64_t &offset)> getRealtimeBase
std::vector< char > statesBuffer
The buffer where we store the state before flushing it.
std::array< UpdateInfo, MAX_STATES > updateInfos
uint64_t const void const *restrict const msg
Definition x9.h:153