21#include <fairmq/Device.h>
34 for (
size_t i = 0;
i <
other.mServicesBooked.size(); ++
i) {
46 for (
size_t i = 0;
i <
other.mServicesBooked.size(); ++
i) {
70 LOGP(detail,
"Registering service {} with hash {} in salt {} of kind {}",
75 throw runtime_error_f(
"Cannot register a stream service %s without a valid spec index",
name ?
name :
"<unknown>");
85 if (oldService ==
nullptr) {
95 std::memory_order_seq_cst)) {
99 std::atomic_thread_fence(std::memory_order_release);
103 throw runtime_error_f(
"Unable to find a spot in the registry for service %d. Make sure you use const / non-const correctly.", typeHash.
hash);
119 throw runtime_error_f(
"Service %s is a stream service, but does not have a uniqueId method.", spec.
name.c_str());
122 throw runtime_error_f(
"Declaring a stream service %s in a non-global context is not allowed.", spec.
name.c_str());
131 throw runtime_error_f(
"Late binding of stream services needs a stream context");
133 for (
auto& spec :
mSpecs) {
147 handle.callback(*
this);
153 throw runtime_error_f(
"Unable to find service of kind %s (%d) in stream %d and dataprocessor %d. Make sure you use const / non-const correctly.",
name, hash, streamId, dataProcessorId);
190 if (isStream && salt.
streamId <= 0) {
201 std::atomic_thread_fence(std::memory_order_acquire);
213 throw runtime_error_f(
"Service %s found in registry at %d rather than where expected by getPos",
name,
i);
216 throw runtime_error_f(
"Found service %s with hash %d but with salt %d of service kind %d",
220 throw runtime_error_f(
"Unable to find requested service %s with hash %d using salt %d for service kind %d",
229 LOGP(detail,
"Caching global service {} for stream {}",
name ?
name :
"", salt.
streamId);
231 std::atomic_thread_fence(std::memory_order_acquire);
236 std::atomic_thread_fence(std::memory_order_acquire);
240 LOGP(detail,
"Global service {} for stream {} is nullptr",
name ?
name :
"", salt.
streamId);
243 LOGP(detail,
"Registering a stream service {} with hash {} and salt {}",
name ?
name :
"", typeHash.
hash,
valueFromSalt(salt));
246 throw runtime_error_f(
"Stream service %s with hash %d using salt %d for service kind %d was not declared upfront.",
254 ServiceHandle handle = spec.init({registry, salt}, deviceState, *rawDeviceService.device()->fConfig);
261 LOGP(error,
"Unable to find requested service {} with hash {} using salt {} for service kind {}",
name ?
name :
"", typeHash.
hash,
valueFromSalt(salt), (
int)kind);
268 handle.callback(
ref);
299static std::map<std::thread::id, int> locks;
301#define LOCKING_DEBUG debug
305 if (mMutex.try_lock()) {
306 locks[std::this_thread::get_id()]++;
307 LOG(
LOCKING_DEBUG) <<
"ServiceRegistry locked for salt stream " << salt.streamId <<
" dataprocessor " << salt.dataProcessorId
308 <<
" (" << lockCounter.load() <<
", " << std::this_thread::get_id() <<
"). " << locks[std::this_thread::get_id()];
311 << salt.dataProcessorId <<
" (" << std::this_thread::get_id() <<
") is attempting to lock mutex";
313 locks[std::this_thread::get_id()]++;
314 LOG(
LOCKING_DEBUG) <<
"ServiceRegistry locked for stream " << salt.streamId <<
" dataprocessor " << salt.dataProcessorId <<
" (" << std::this_thread::get_id() <<
"). " << locks[std::this_thread::get_id()];
320 LOG(
LOCKING_DEBUG) <<
"ServiceRegistry unlocked by salt stream " << salt.streamId <<
" dataprocessor " << salt.dataProcessorId <<
" (" << std::this_thread::get_id() <<
"). " << locks[std::this_thread::get_id()];
322 locks[std::this_thread::get_id()]--;
#define O2_BUILTIN_UNREACHABLE
#define O2_DPL_ACQUIRE(...)
#define O2_DPL_RELEASE(...)
#define O2_LOCKABLE_NAMED(T, V, N)
GLuint const GLchar * name
Defining PrimaryVertex explicitly as messageable.
ServiceKind
The kind of service we are asking for.
@ DeviceStream
A Service which is specific to a given thread in a thread pool.
RuntimeErrorRef runtime_error_f(const char *,...)
static void bindProcessorService(DataProcessorContext &dpContext, ServiceSpec const &spec, void *service)
static void bindStreamService(DataProcessorContext &dpContext, StreamContext &stream, ServiceSpec const &spec, void *service)
Running state information of a given device.
ServiceKind kind
Kind of service.
void * instance
Type erased pointer to a service.
unsigned int hash
Unique hash associated to the type of service.
std::string name
Mnemonic name to use for the service.
void postRenderGUICallbacks()
std::array< std::atomic< Key >, MAX_SERVICES+MAX_DISTANCE > mServicesKey
void unlock(Salt salt) const O2_DPL_RELEASE(mMutex)
static constexpr Salt GLOBAL_CONTEXT_SALT
std::vector< ServiceSpec > mSpecs
std::array< void *, MAX_SERVICES+MAX_DISTANCE > mServicesValue
static constexpr int32_t MAX_DISTANCE
The maximum distance an entry can be from the optimal slot.
static constexpr int32_t valueFromSalt(Salt salt)
void lateBindStreamServices(DeviceState &state, fair::mq::ProgOptions &options, ServiceRegistry::Salt salt)
void declareService(ServiceSpec const &spec, DeviceState &state, fair::mq::ProgOptions &options, ServiceRegistry::Salt salt=ServiceRegistry::globalDeviceSalt())
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, Salt salt, char const *name=nullptr, ServiceRegistry::SpecIndex specIndex=SpecIndex{-1}) const
int getPos(ServiceTypeHash typeHash, Salt salt) const
ServiceRegistry & operator=(ServiceRegistry const &other)
constexpr Index indexFromInstance(InstanceId id) const
static constexpr uint32_t MAX_SERVICES
The number of slots in the hashmap.
void bindService(ServiceRegistry::Salt salt, ServiceSpec const &spec, void *service) const
static constexpr uint64_t valueFromKey(Key key)
void throwError(const char *name, int64_t hash, int64_t streamId, int64_t dataprocessorId) const
To hide exception throwing from QC.
std::array< Meta, MAX_SERVICES+MAX_DISTANCE > mServicesMeta
static Salt globalDeviceSalt()
void * get(ServiceTypeHash typeHash, Salt salt, ServiceKind kind, char const *name=nullptr) const
std::vector< ServicePostRenderGUIHandle > mPostRenderGUIHandles
Callbacks to be executed after the main GUI has been drawn.
void lock(Salt salt) const O2_DPL_ACQUIRE(mMutex)
std::array< std::atomic< bool >, MAX_SERVICES+MAX_DISTANCE > mServicesBooked
static Salt dataProcessorSalt(short dataProcessorId)
constexpr InstanceId instanceFromTypeSalt(ServiceTypeHash type, Salt salt) const
ServiceInit init
Callback to initialise the service.
std::string name
Name of the service.
ServicePostRenderGUI postRenderGUI
Callback invoked after the main GUI has been drawn.
ServiceId uniqueId
Callback to get the unique id for the Service.
ServiceKind kind
Kind of service being specified.
std::map< std::string, ID > expected
VectorOfTObjectPtrs other
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"