Project
Loading...
Searching...
No Matches
ServiceRegistry.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
14#include "Framework/Tracing.h"
15#include "Framework/Logger.h"
20#include "ContextHelpers.h"
21#include <fairmq/Device.h>
22#include <iostream>
23
24namespace o2::framework
25{
26
// NOTE(review): the signature (source line 27) is missing from this listing. The body
// only reads from `other`, so this is presumably the ServiceRegistry copy constructor — confirm.
// Copies keys, values, metadata and booking flags from `other` into this registry.
28{
// The keys are atomics, so they are copied one by one through load()/store().
// NOTE(review): only the first MAX_SERVICES keys are copied, while the booking flags
// below are copied for the full array size; confirm whether the trailing MAX_DISTANCE
// key slots are intentionally skipped.
29 for (size_t i = 0; i < MAX_SERVICES; ++i) {
30 mServicesKey[i].store(other.mServicesKey[i].load());
31 }
32 mServicesValue = other.mServicesValue;
33 mServicesMeta = other.mServicesMeta;
// Booking flags are atomics as well, hence the element-wise copy.
34 for (size_t i = 0; i < other.mServicesBooked.size(); ++i) {
35 this->mServicesBooked[i] = other.mServicesBooked[i].load();
36 }
37}
38
// NOTE(review): the signature (source line 39) is missing from this listing. The body
// reads from `other` and returns *this, so this is presumably the copy assignment operator — confirm.
// Copies keys, metadata and booking flags from `other` and returns this registry.
40{
// The keys are atomics, so they are copied one by one through load()/store().
41 for (size_t i = 0; i < MAX_SERVICES; ++i) {
42 mServicesKey[i].store(other.mServicesKey[i].load());
43 }
// NOTE(review): source line 44 is absent from this listing, so no copy of mServicesValue
// is visible here; verify against the real file that the values are copied too.
45 mServicesMeta = other.mServicesMeta;
// Booking flags are atomics as well, hence the element-wise copy.
46 for (size_t i = 0; i < other.mServicesBooked.size(); ++i) {
47 this->mServicesBooked[i] = other.mServicesBooked[i].load();
48 }
49 return *this;
50}
51
// NOTE(review): the signature (source line 52) is missing from this listing; the body
// resets every table, so this is presumably the default constructor — confirm.
// Puts the registry in its empty state: zeroed keys, null service pointers, no slot booked.
53{
// NOTE(review): only the first MAX_SERVICES keys are reset here, whereas the booking
// flags below are reset for the whole array; confirm the remaining keys start out zeroed.
54 for (size_t i = 0; i < MAX_SERVICES; ++i) {
55 mServicesKey[i].store({0L, 0L});
56 }
57
58 mServicesValue.fill(nullptr);
// Atomic flags cannot be bulk-assigned, so they are cleared one at a time.
59 for (size_t i = 0; i < mServicesBooked.size(); ++i) {
60 mServicesBooked[i] = false;
61 }
62}
63
// Stores `service` in the registry under the key (typeHash, salt).
//
// The slot is searched among MAX_DISTANCE consecutive entries starting at the index
// derived from the type hash and the salt; the first entry whose booking flag can be
// flipped from false to true is used.
//
// Throws if a Stream service is registered with a null instance and no spec index
// (specIndex.index == -1), or if none of the MAX_DISTANCE candidate slots is free.
//
// NOTE(review): source lines 78 and 83 are missing from this listing; they presumably
// define `index` and `dataProcessorSalt`, which are used below — confirm in the real file.
68void ServiceRegistry::registerService(ServiceTypeHash typeHash, void* service, ServiceKind kind, Salt salt, const char* name, SpecIndex specIndex) const
69{
70 LOGP(detail, "Registering service {} with hash {} in salt {} of kind {}",
71 (name ? name : "<unknown>"),
72 typeHash.hash,
73 valueFromSalt(salt), (int)kind);
74 if (specIndex.index == -1 && kind == ServiceKind::Stream && service == nullptr) {
75 throw runtime_error_f("Cannot register a stream service %s without a valid spec index", name ? name : "<unknown>");
76 }
77 InstanceId id = instanceFromTypeSalt(typeHash, salt);
79 // If kind is not stream, there is only one copy of our service.
80 // So we look if it is already registered and reuse it if it is.
81 // If not, we register it as thread id 0 and as the passed one.
82 if (kind != ServiceKind::Stream && salt.streamId != 0) {
84 void* oldService = this->get(typeHash, dataProcessorSalt, kind);
85 if (oldService == nullptr) {
// NOTE(review): `name` is not forwarded here, so this registration stores a null name.
86 registerService(typeHash, service, kind, dataProcessorSalt);
87 } else {
88 service = oldService;
89 }
90 }
91 for (uint8_t i = 0; i < MAX_DISTANCE; ++i) {
92 // If the service slot was not taken, take it atomically
93 bool expected = false;
94 if (mServicesBooked[i + index.index].compare_exchange_strong(expected, true,
95 std::memory_order_seq_cst)) {
// Value and metadata are written before the key; the release fence follows the key store.
// The name is duplicated with strdup(); where it is released is not visible in this listing.
96 mServicesValue[i + index.index] = service;
97 mServicesMeta[i + index.index] = Meta{kind, name ? strdup(name) : nullptr, specIndex};
98 mServicesKey[i + index.index] = Key{.typeHash = typeHash, .salt = salt};
99 std::atomic_thread_fence(std::memory_order_release);
100 return;
101 }
102 }
103 throw runtime_error_f("Unable to find a spot in the registry for service %d. Make sure you use const / non-const correctly.", typeHash.hash);
104}
105
106void ServiceRegistry::declareService(ServiceSpec const& spec, DeviceState& state, fair::mq::ProgOptions& options, ServiceRegistry::Salt salt)
107{
108 // We save the specs for the late binding
109 mSpecs.push_back(spec);
110 // Services which are not stream must have a single instance created upfront.
111 if (spec.kind != ServiceKind::Stream) {
112 ServiceHandle handle = spec.init({*this}, state, options);
113 this->registerService({handle.hash}, handle.instance, handle.kind, salt, handle.name.c_str());
114 this->bindService(salt, spec, handle.instance);
115 } else if (spec.kind == ServiceKind::Stream) {
116 // We register a nullptr in this case, because we really want to have the ptr to
117 // the service spec only.
118 if (!spec.uniqueId) {
119 throw runtime_error_f("Service %s is a stream service, but does not have a uniqueId method.", spec.name.c_str());
120 }
121 if (salt.streamId != 0) {
122 throw runtime_error_f("Declaring a stream service %s in a non-global context is not allowed.", spec.name.c_str());
123 }
124 this->registerService({spec.uniqueId()}, nullptr, spec.kind, salt, spec.name.c_str(), {static_cast<int>(mSpecs.size() - 1)});
125 }
126}
127
128void ServiceRegistry::lateBindStreamServices(DeviceState& state, fair::mq::ProgOptions& options, ServiceRegistry::Salt salt)
129{
130 if (salt.streamId == 0) {
131 throw runtime_error_f("Late binding of stream services needs a stream context");
132 }
133 for (auto& spec : mSpecs) {
134 if (spec.kind != ServiceKind::Stream) {
135 continue;
136 }
137 ServiceHandle handle = spec.init({*this, salt}, state, options);
138 // Do we need to register it again? Maybe not.
139 this->registerService({handle.hash}, handle.instance, handle.kind, salt, handle.name.c_str());
140 this->bindService(salt, spec, handle.instance);
141 }
142}
143
// NOTE(review): the signature (source line 144) is missing from this listing.
// Invokes the callback of every handle in mPostRenderGUIHandles, passing this registry.
145{
146 for (auto& handle : mPostRenderGUIHandles) {
147 handle.callback(*this);
148 }
149}
150
151void ServiceRegistry::throwError(const char* name, int64_t hash, int64_t streamId, int64_t dataProcessorId) const
152{
153 throw runtime_error_f("Unable to find service of kind %s (%d) in stream %d and dataprocessor %d. Make sure you use const / non-const correctly.", name, hash, streamId, dataProcessorId);
154}
155
// NOTE(review): the signature (source line 156) is missing from this listing; the body
// uses `typeHash` and `salt`, matching the getPos(ServiceTypeHash, Salt) entry of the index.
// Returns the index of the slot whose key matches (typeHash, salt), or -1 when none of
// the MAX_DISTANCE slots starting at the computed index holds that key.
157{
158 InstanceId instanceId = instanceFromTypeSalt(typeHash, salt);
159 Index index = indexFromInstance(instanceId);
// Probe at most MAX_DISTANCE consecutive slots starting from the computed one.
160 for (uint8_t i = 0; i < MAX_DISTANCE; ++i) {
161 if (valueFromKey(mServicesKey[i + index.index].load()) == valueFromKey({typeHash.hash, salt})) {
162 return i + index.index;
163 }
164 }
165 return -1;
166}
167
// Looks up the service identified by (typeHash, salt) and returns its type erased pointer.
//
// Lookup order, as visible below:
//  1. the slot for the exact (typeHash, salt) key;
//  2. for a salt with streamId != 0, the slot of the same data processor with streamId 0;
//     a non-stream service found there is also registered under `salt`;
//  3. for kind == Stream, the service is created on the spot from the spec declared upfront.
// Throws on the inconsistencies reported in the messages below; logs an error and
// returns nullptr when nothing is found for a salt with streamId != 0.
//
// NOTE(review): source lines 171, 183, 187, 188, 192 and 197 are missing from this
// listing. Line 171 presumably opens the `if` closed on line 173 and lines 187-188
// presumably define `isStream` and `isDataProcessor` — confirm in the real file.
168void* ServiceRegistry::get(ServiceTypeHash typeHash, Salt salt, ServiceKind kind, char const* name) const
169{
170 // Cannot find a stream service using a global salt.
172 throw runtime_error_f("Cannot find %s service using a global salt.", name ? name : "a stream");
173 }
174 // Look for the service. If found, return it.
175 // Notice how due to threading issues, we might
176 // find it with getPos, but the value can still
177 // be nullptr.
178 auto pos = getPos(typeHash, salt);
179 // If we are here it means we never registered a
180 // service for the 0 thread (i.e. the main thread).
181 if (pos != -1 && mServicesMeta[pos].kind == ServiceKind::Stream && valueFromSalt(mServicesKey[pos].load().salt) != valueFromSalt(salt)) {
182 throw runtime_error_f("Inconsistent registry for thread %d. Expected %d", salt.streamId, mServicesKey[pos].load().salt.streamId);
184 }
185
186 if (pos != -1) {
189
190 if (isStream && salt.streamId <= 0) {
191 throw runtime_error_f("A stream service (%s) cannot be retrieved from a non stream salt %d", name ? name : "unknown", salt.streamId);
193 }
194
195 if (isDataProcessor && salt.dataProcessorId < 0) {
196 throw runtime_error_f("A data processor service (%s) cannot be retrieved from a non dataprocessor context %d", name ? name : "unknown", salt.dataProcessorId);
198 }
199
// Load the key and issue an acquire fence before reading the value written by registerService.
200 mServicesKey[pos].load();
201 std::atomic_thread_fence(std::memory_order_acquire);
202 void* ptr = mServicesValue[pos];
203 if (ptr) {
204 return ptr;
205 }
206 }
207 // We are looking up a service which is not of
208 // stream kind and was not looked up by this thread
209 // before.
// With streamId 0 there is no other context to fall back to: scan the table only to
// produce a more precise error message, then throw.
// NOTE(review): in this branch `name` is passed unguarded to %s in two of the messages
// and `typeHash` (rather than typeHash.hash) is passed to %d — verify.
210 if (salt.streamId == 0) {
211 for (int i = 0; i < MAX_SERVICES; ++i) {
212 if (mServicesKey[i].load().typeHash.hash == typeHash.hash && valueFromSalt(mServicesKey[i].load().salt) != valueFromSalt(salt)) {
213 throw runtime_error_f("Service %s found in registry at %d rather than where expected by getPos", name, i);
214 }
215 if (mServicesKey[i].load().typeHash.hash == typeHash.hash) {
216 throw runtime_error_f("Found service %s with hash %d but with salt %d of service kind %d",
217 name, typeHash, valueFromSalt(mServicesKey[i].load().salt), (int)mServicesMeta[i].kind);
218 }
219 }
220 throw runtime_error_f("Unable to find requested service %s with hash %d using salt %d for service kind %d",
221 name ? name : "<unknown>", typeHash, valueFromSalt(salt), (int)kind);
222 }
223
224 // Let's look it up in the global context for the data processor.
225 pos = getPos(typeHash, {.streamId = 0, .dataProcessorId = salt.dataProcessorId});
226 if (pos != -1 && kind != ServiceKind::Stream) {
227 // We found a global service. Register it for this stream and return it.
228 // This will prevent ending up here in the future.
229 LOGP(detail, "Caching global service {} for stream {}", name ? name : "", salt.streamId);
230 mServicesKey[pos].load();
231 std::atomic_thread_fence(std::memory_order_acquire);
232 registerService(typeHash, mServicesValue[pos], kind, salt, name);
233 }
// Return the value found in the global slot, unless it is still a nullptr.
234 if (pos != -1) {
235 mServicesKey[pos].load();
236 std::atomic_thread_fence(std::memory_order_acquire);
237 if (mServicesValue[pos]) {
238 return mServicesValue[pos];
239 }
240 LOGP(detail, "Global service {} for stream {} is nullptr", name ? name : "", salt.streamId);
241 }
// A stream service is created here from the spec stored when it was declared.
242 if (kind == ServiceKind::Stream) {
243 LOGP(detail, "Registering a stream service {} with hash {} and salt {}", name ? name : "", typeHash.hash, valueFromSalt(salt));
// Note: this `pos` shadows the one declared at the top of the function.
244 auto pos = getPos(typeHash, {.streamId = GLOBAL_CONTEXT_SALT.streamId, .dataProcessorId = salt.dataProcessorId});
245 if (pos == -1) {
// NOTE(review): `name` is passed unguarded to %s and `typeHash` to %d here as well.
246 throw runtime_error_f("Stream service %s with hash %d using salt %d for service kind %d was not declared upfront.",
247 name, typeHash, valueFromSalt(salt), (int)kind);
248 }
249 auto& spec = mSpecs[mServicesMeta[pos].specIndex.index];
250 auto& deviceState = this->get<DeviceState>(globalDeviceSalt());
251 auto& rawDeviceService = this->get<RawDeviceService>(globalDeviceSalt());
252 auto& registry = const_cast<ServiceRegistry&>(*this);
253 // Call init for the proper ServiceRegistryRef
254 ServiceHandle handle = spec.init({registry, salt}, deviceState, *rawDeviceService.device()->fConfig);
255 this->registerService({handle.hash}, handle.instance, handle.kind, salt, handle.name.c_str());
256 this->bindService(salt, spec, handle.instance);
257
258 return handle.instance;
259 }
260
261 LOGP(error, "Unable to find requested service {} with hash {} using salt {} for service kind {}", name ? name : "", typeHash.hash, valueFromSalt(salt), (int)kind);
262 return nullptr;
263}
264
// NOTE(review): the signature (source line 265) is missing from this listing; `ref` is
// presumably its parameter — confirm.
// Invokes the callback of every handle in mPostRenderGUIHandles, passing `ref`.
266{
267 for (auto& handle : mPostRenderGUIHandles) {
268 handle.callback(ref);
269 }
270}
271
272void ServiceRegistry::bindService(ServiceRegistry::Salt salt, ServiceSpec const& spec, void* service) const
273{
274 static O2_LOCKABLE_NAMED(std::mutex, bindMutex, "bind mutex");
275 // Stream services need to store their callbacks in the stream context.
276 // This is to make sure we invoke the correct callback only once per
277 // stream, since they could bind multiple times.
278 // On the other hand, they should not be allowed to have any
279 // other callback, because we would not know which one to invoke.
280 if (spec.kind == ServiceKind::Stream) {
281 ServiceRegistryRef ref{const_cast<ServiceRegistry&>(*this), salt};
282 auto& streamContext = ref.get<StreamContext>();
283 std::scoped_lock<O2_LOCKABLE(std::mutex)> lock(bindMutex);
284 auto& dataProcessorContext = ref.get<DataProcessorContext>();
285 ContextHelpers::bindStreamService(dataProcessorContext, streamContext, spec, service);
286 } else {
287 ServiceRegistryRef ref{const_cast<ServiceRegistry&>(*this), salt};
288 std::scoped_lock<O2_LOCKABLE(std::mutex)> lock(bindMutex);
289 if (ref.active<DataProcessorContext>()) {
290 auto& dataProcessorContext = ref.get<DataProcessorContext>();
291 ContextHelpers::bindProcessorService(dataProcessorContext, spec, service);
292 }
293 if (spec.postRenderGUI) {
294 mPostRenderGUIHandles.push_back(ServicePostRenderGUIHandle{spec, spec.postRenderGUI, service});
295 }
296 }
297}
298
// Per-thread counter used only for the log messages of the two locking routines
// below: it is incremented once the mutex has been acquired and decremented right
// before it is released.
299static std::map<std::thread::id, int> locks;
300
// Severity passed to LOG() by the locking routines below.
301#define LOCKING_DEBUG debug
// NOTE(review): the signature (source line 302) is missing from this listing; the body
// uses `salt` and acquires mMutex, matching the lock(Salt) entry of the index.
// Acquires mMutex, logging who got it. A try_lock() is attempted first so that the
// contended case can be logged before blocking on lock().
303{
304 // Should probably use compare and exchange here.
305 if (mMutex.try_lock()) {
306 locks[std::this_thread::get_id()]++;
307 LOG(LOCKING_DEBUG) << "ServiceRegistry locked for salt stream " << salt.streamId << " dataprocessor " << salt.dataProcessorId
308 << " (" << lockCounter.load() << ", " << std::this_thread::get_id() << "). " << locks[std::this_thread::get_id()];
309 } else {
310 LOG(LOCKING_DEBUG) << "Stream " << salt.streamId << " dataprocessor "
311 << salt.dataProcessorId << " (" << std::this_thread::get_id() << ") is attempting to lock mutex";
// Blocks until the current holder releases the mutex.
312 mMutex.lock();
313 locks[std::this_thread::get_id()]++;
314 LOG(LOCKING_DEBUG) << "ServiceRegistry locked for stream " << salt.streamId << " dataprocessor " << salt.dataProcessorId << " (" << std::this_thread::get_id() << "). " << locks[std::this_thread::get_id()];
315 }
316}
317
// NOTE(review): the signature (source line 318) is missing from this listing; the body
// uses `salt` and releases mMutex, matching the unlock(Salt) entry of the index.
// Logs the release, decrements the per-thread counter and unlocks mMutex.
319{
320 LOG(LOCKING_DEBUG) << "ServiceRegistry unlocked by salt stream " << salt.streamId << " dataprocessor " << salt.dataProcessorId << " (" << std::this_thread::get_id() << "). " << locks[std::this_thread::get_id()];
321
// The counter is updated before the mutex is released.
322 locks[std::this_thread::get_id()]--;
323 mMutex.unlock();
324}
325
326} // namespace o2::framework
benchmark::State & state
#define O2_BUILTIN_UNREACHABLE
int32_t i
uint16_t pos
Definition RawData.h:3
#define LOCKING_DEBUG
TBranch * ptr
#define O2_DPL_ACQUIRE(...)
#define O2_DPL_RELEASE(...)
#define O2_LOCKABLE(T)
Definition Tracing.h:20
#define O2_LOCKABLE_NAMED(T, V, N)
Definition Tracing.h:19
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
ServiceKind
The kind of service we are asking for.
@ DeviceStream
A Service which is specific to a given thread in a thread pool.
RuntimeErrorRef runtime_error_f(const char *,...)
static void bindProcessorService(DataProcessorContext &dpContext, ServiceSpec const &spec, void *service)
static void bindStreamService(DataProcessorContext &dpContext, StreamContext &stream, ServiceSpec const &spec, void *service)
Running state information of a given device.
Definition DeviceState.h:34
ServiceKind kind
Kind of service.
void * instance
Type erased pointer to a service.
unsigned int hash
Unique hash associated to the type of service.
std::string name
Mnemonic name to use for the service.
std::array< std::atomic< Key >, MAX_SERVICES+MAX_DISTANCE > mServicesKey
void unlock(Salt salt) const O2_DPL_RELEASE(mMutex)
static constexpr Salt GLOBAL_CONTEXT_SALT
std::vector< ServiceSpec > mSpecs
std::array< void *, MAX_SERVICES+MAX_DISTANCE > mServicesValue
static constexpr int32_t MAX_DISTANCE
The maximum distance a entry can be from the optimal slot.
static constexpr int32_t valueFromSalt(Salt salt)
void lateBindStreamServices(DeviceState &state, fair::mq::ProgOptions &options, ServiceRegistry::Salt salt)
void declareService(ServiceSpec const &spec, DeviceState &state, fair::mq::ProgOptions &options, ServiceRegistry::Salt salt=ServiceRegistry::globalDeviceSalt())
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, Salt salt, char const *name=nullptr, ServiceRegistry::SpecIndex specIndex=SpecIndex{-1}) const
int getPos(ServiceTypeHash typeHash, Salt salt) const
ServiceRegistry & operator=(ServiceRegistry const &other)
constexpr Index indexFromInstance(InstanceId id) const
static constexpr uint32_t MAX_SERVICES
The number of slots in the hashmap.
void bindService(ServiceRegistry::Salt salt, ServiceSpec const &spec, void *service) const
static constexpr uint64_t valueFromKey(Key key)
void throwError(const char *name, int64_t hash, int64_t streamId, int64_t dataprocessorId) const
To hide exception throwing from QC.
std::array< Meta, MAX_SERVICES+MAX_DISTANCE > mServicesMeta
void * get(ServiceTypeHash typeHash, Salt salt, ServiceKind kind, char const *name=nullptr) const
std::vector< ServicePostRenderGUIHandle > mPostRenderGUIHandles
Callbacks to be executed after the main GUI has been drawn.
void lock(Salt salt) const O2_DPL_ACQUIRE(mMutex)
std::array< std::atomic< bool >, MAX_SERVICES+MAX_DISTANCE > mServicesBooked
static Salt dataProcessorSalt(short dataProcessorId)
constexpr InstanceId instanceFromTypeSalt(ServiceTypeHash type, Salt salt) const
ServiceInit init
Callback to initialise the service.
std::string name
Name of the service.
ServicePostRenderGUI postRenderGUI
Callback invoked after the main GUI has been drawn.
ServiceId uniqueId
Callback to get the unique id for the Service.
ServiceKind kind
Kind of service being specified.
std::map< std::string, ID > expected
VectorOfTObjectPtrs other
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"