11#ifndef O2_FRAMEWORK_INPUTRECORD_H_
12#define O2_FRAMEWORK_INPUTRECORD_H_
38#include <fairmq/FwdDecls.h>
129 template <
typename T>
139 using base = std::default_delete<T>;
154 mProperty =
other.mProperty;
155 }
else if (mProperty !=
other.mProperty) {
169 base::operator=(
other);
171 mProperty =
other.mProperty;
172 }
else if (mProperty !=
other.mProperty) {
184 base::operator()(
ptr);
195 [[nodiscard]]
int getPos(
const std::string&
name)
const;
209 auto msg = describeAvailableInputs();
210 throw runtime_error_f(
"InputRecord::get: no input with binding %s found. %s", bindingName,
msg.c_str());
215 template <
typename R>
216 requires std::is_convertible_v<R, char const*>
222 template <
typename R>
223 requires requires(
R r) {
r.c_str(); }
229 template <
typename R>
230 requires std::is_convertible_v<R, DataRef>
247 template <
typename T = DataRef,
typename R>
248 decltype(
auto)
get(
R binding,
int part = 0)
const
252 using PointerLessValueT = std::remove_pointer_t<T>;
254 if constexpr (std::is_same_v<std::decay_t<T>,
DataRef>) {
256 }
else if constexpr (std::is_same<T, std::string>::value) {
265 }
else if constexpr (std::is_same<T, char const*>::value) {
272 return reinterpret_cast<char const*
>(
ref.payload);
275 }
else if constexpr (std::is_same<T, TableConsumer>::value) {
279 auto data =
reinterpret_cast<uint8_t const*
>(
ref.payload);
287 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
290 throw runtime_error(
"Inconsistent serialization method for extracting span");
292 using ValueT =
typename T::value_type;
294 if (payloadSize %
sizeof(ValueT)) {
300 return gsl::span<ValueT const>(
reinterpret_cast<ValueT const*
>(
ref.payload), payloadSize /
sizeof(ValueT));
305 if constexpr (is_specialization_v<std::remove_const_t<T>, std::vector>) {
306 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
308 auto method = header->payloadSerializationMethod;
312 auto*
start =
reinterpret_cast<typename T::value_type const*
>(
ref.payload);
313 auto*
end =
start + payloadSize /
sizeof(
typename T::value_type);
322 using NonConstT =
typename std::remove_const<T>::type;
326 auto object = DataRefUtils::as<NonConstT>(
ref);
330 std::swap(
const_cast<NonConstT&
>(container), *
object);
333 throw runtime_error(
"No supported conversion function for ROOT serialized message");
336 throw runtime_error(
"Attempt to extract object from message with unsupported serialization type");
339 static_assert(always_static_assert_v<T>,
"unsupported code path");
349 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
350 auto method = header->payloadSerializationMethod;
354 throw runtime_error(
"Can not extract a plain object from serialized message");
356 return *
reinterpret_cast<T const*
>(
ref.payload);
359 }
else if constexpr (std::is_pointer_v<T> &&
366 using ValueT = PointerLessValueT;
368 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
370 auto method = header->payloadSerializationMethod;
373 auto const*
ptr =
reinterpret_cast<ValueT const*
>(
ref.payload);
380 auto*
start =
reinterpret_cast<typename ValueT::value_type const*
>(
ref.payload);
381 auto*
end =
start + payloadSize /
sizeof(
typename ValueT::value_type);
382 auto container = std::make_unique<ValueT>(
start,
end);
405 ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification};
412 auto cacheEntry = cache.matcherToId.find(
path);
413 if (cacheEntry == cache.matcherToId.end()) {
414 cache.matcherToId.insert(std::make_pair(
path,
id));
416 void* obj = (
void*)
result.get();
419 LOGP(info,
"Caching in {} ptr to {} ({})",
id.
value,
path, obj);
422 auto& oldId = cacheEntry->second;
424 if (oldId.value ==
id.value) {
425 std::unique_ptr<ValueT const, Deleter<ValueT const>>
result((ValueT
const*)cache.idToObject[
id],
false);
431 delete reinterpret_cast<ValueT*
>(cache.idToObject[oldId]);
433 void* obj = (
void*)
result.get();
436 LOGP(info,
"Replacing cached entry {} with {} for {} ({})", oldId.value,
id.value,
path, obj);
437 oldId.value =
id.value;
440 throw runtime_error(
"Attempt to extract object from message with unsupported serialization type");
442 }
else if constexpr (std::is_pointer_v<T>) {
448 static_assert(always_static_assert_v<T>,
"ROOT objects need to be retrieved by pointer");
452 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
453 auto method = header->payloadSerializationMethod;
457 "Type mismatch: attempt to extract a non-messagable object "
458 "from message with unserialized data");
466 throw runtime_error(
"Attempt to extract object from message with unsupported serialization type");
471 template <
typename T = DataRef,
typename R>
472 std::map<std::string, std::string>&
get(
R binding,
int part = 0) const
476 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
478 auto method = header->payloadSerializationMethod;
480 throw runtime_error(
"Attempt to extract metadata from a non-CCDB serialised message");
487 ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification};
495 cache.matcherToMetadataId.insert(std::make_pair(
path,
id));
497 LOGP(info,
"Caching CCDB metadata {}: {}",
id.
value,
path);
498 return cache.idToMetadata[
id];
500 auto& oldId = cacheEntry->second;
502 if (oldId.value ==
id.value) {
503 LOGP(
debug,
"Returning cached CCDB metatada {}: {}",
id.
value,
path);
504 return cache.idToMetadata[
id];
508 LOGP(info,
"Replacing cached entry {} with {} for {}", oldId.value,
id.value,
path);
510 oldId.value =
id.value;
511 return cache.idToMetadata[
id];
515 [[nodiscard]]
bool isValid(std::string
const& s)
const
521 bool isValid(
char const* s)
const;
527 [[nodiscard]]
size_t size()
const;
534 template <
typename ParentT,
typename T>
552 if (mPosition < mSize) {
553 if (mParent->isValid(mPosition)) {
554 mElement = mParent->getByPos(mPosition);
566 while (mPosition < mSize && ++mPosition < mSize) {
567 if (!mParent->isValid(mPosition)) {
570 mElement = mParent->getByPos(mPosition);
573 if (mPosition >= mSize) {
594 return mPosition == rh.mPosition;
599 return mPosition != rh.mPosition;
604 if (mPosition >= mSize || mElement.header ==
nullptr) {
609 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(mElement);
610 return *dh == matcher;
615 if (mPosition >= mSize || mElement.header ==
nullptr) {
620 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(mElement);
649 template <
typename T>
683 [[nodiscard]]
size_t size()
const
704 return {
this, 0,
size()};
709 return {
this,
size()};
719 [[nodiscard]] std::string describeAvailableInputs()
const;
722 std::vector<InputRoute>
const& mInputsSchema;
GLuint const GLchar * name
GLsizei const GLfloat * value
GLsizei const GLchar *const * path
constexpr o2::header::DataDescription gDataDescriptionInvalid
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
RuntimeErrorRef runtime_error_f(const char *,...)
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static std::map< std::string, std::string > extractCCDBHeaders(DataRef const &ref)
static auto as(DataRef const &ref)
static std::string describe(InputSpec const &spec)
static Id fromRef(DataRef &ref)
std::unordered_map< Id, void *, Id::hash_fn > idToObject
std::unordered_map< std::string, Id > matcherToMetadataId
VectorOfTObjectPtrs other
uint64_t const void const *restrict const msg