11#ifndef O2_FRAMEWORK_INPUTRECORD_H_
12#define O2_FRAMEWORK_INPUTRECORD_H_
39#include <fairmq/FwdDecls.h>
130 template <
typename T>
140 using base = std::default_delete<T>;
155 mProperty =
other.mProperty;
156 }
else if (mProperty !=
other.mProperty) {
170 base::operator=(
other);
172 mProperty =
other.mProperty;
173 }
else if (mProperty !=
other.mProperty) {
185 base::operator()(
ptr);
197 [[nodiscard]]
int getPos(
const std::string&
name)
const;
220 auto msg = describeAvailableInputs();
221 throw runtime_error_f(
"InputRecord::get: no input with binding %s found. %s", bindingName,
msg.c_str());
226 template <
typename R>
227 requires std::is_convertible_v<R, char const*>
233 template <
typename R>
234 requires requires(
R r) {
r.c_str(); }
240 template <
typename R>
241 requires std::is_convertible_v<R, DataRef>
258 template <
typename T = DataRef,
typename R>
263 using PointerLessValueT = std::remove_pointer_t<T>;
265 if constexpr (std::is_same_v<std::decay_t<T>,
DataRef>) {
267 }
else if constexpr (std::is_same<T, std::string>::value) {
276 }
else if constexpr (std::is_same<T, char const*>::value) {
283 return reinterpret_cast<char const*
>(
ref.payload);
286 }
else if constexpr (std::is_same<T, TableConsumer>::value) {
290 auto data =
reinterpret_cast<uint8_t const*
>(
ref.payload);
297 static_assert(is_messageable<typename T::value_type>::value,
"span can only be created for messageable types");
298 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
301 throw runtime_error(
"Inconsistent serialization method for extracting span");
303 using ValueT =
typename T::value_type;
305 if (payloadSize %
sizeof(ValueT)) {
311 return gsl::span<ValueT const>(
reinterpret_cast<ValueT const*
>(
ref.payload), payloadSize /
sizeof(ValueT));
316 if constexpr (is_specialization_v<std::remove_const_t<T>, std::vector>) {
317 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
319 auto method = header->payloadSerializationMethod;
323 auto*
start =
reinterpret_cast<typename T::value_type const*
>(
ref.payload);
324 auto*
end =
start + payloadSize /
sizeof(
typename T::value_type);
333 using NonConstT =
typename std::remove_const<T>::type;
337 auto object = DataRefUtils::as<NonConstT>(
ref);
341 std::swap(
const_cast<NonConstT&
>(container), *
object);
344 throw runtime_error(
"No supported conversion function for ROOT serialized message");
347 throw runtime_error(
"Attempt to extract object from message with unsupported serialization type");
350 static_assert(always_static_assert_v<T>,
"unsupported code path");
354 }
else if constexpr (is_messageable<T>::value) {
360 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
361 auto method = header->payloadSerializationMethod;
365 throw runtime_error(
"Can not extract a plain object from serialized message");
367 return *
reinterpret_cast<T const*
>(
ref.payload);
370 }
else if constexpr (std::is_pointer_v<T> &&
371 (is_messageable<PointerLessValueT>::value ||
377 using ValueT = PointerLessValueT;
379 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
381 auto method = header->payloadSerializationMethod;
383 if constexpr (is_messageable<ValueT>::value) {
384 auto const*
ptr =
reinterpret_cast<ValueT const*
>(
ref.payload);
391 auto*
start =
reinterpret_cast<typename ValueT::value_type const*
>(
ref.payload);
392 auto*
end =
start + payloadSize /
sizeof(
typename ValueT::value_type);
393 auto container = std::make_unique<ValueT>(
start,
end);
416 ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification};
423 auto cacheEntry = cache.matcherToId.find(
path);
424 if (cacheEntry == cache.matcherToId.end()) {
425 cache.matcherToId.insert(std::make_pair(
path,
id));
427 void* obj = (
void*)
result.get();
430 LOGP(info,
"Caching in {} ptr to {} ({})",
id.
value,
path, obj);
433 auto& oldId = cacheEntry->second;
435 if (oldId.value ==
id.value) {
436 std::unique_ptr<ValueT const, Deleter<ValueT const>>
result((ValueT
const*)cache.idToObject[
id],
false);
442 delete reinterpret_cast<ValueT*
>(cache.idToObject[oldId]);
444 void* obj = (
void*)
result.get();
447 LOGP(info,
"Replacing cached entry {} with {} for {} ({})", oldId.value,
id.value,
path, obj);
448 oldId.value =
id.value;
451 throw runtime_error(
"Attempt to extract object from message with unsupported serialization type");
453 }
else if constexpr (std::is_pointer_v<T>) {
459 static_assert(always_static_assert_v<T>,
"ROOT objects need to be retrieved by pointer");
463 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
464 auto method = header->payloadSerializationMethod;
468 "Type mismatch: attempt to extract a non-messagable object "
469 "from message with unserialized data");
477 throw runtime_error(
"Attempt to extract object from message with unsupported serialization type");
482 template <
typename T = DataRef,
typename R>
483 std::map<std::string, std::string>&
get(
R binding,
int part = 0) const
487 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
489 auto method = header->payloadSerializationMethod;
491 throw runtime_error(
"Attempt to extract metadata from a non-CCDB serialised message");
498 ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification};
506 cache.matcherToMetadataId.insert(std::make_pair(
path,
id));
508 LOGP(info,
"Caching CCDB metadata {}: {}",
id.
value,
path);
509 return cache.idToMetadata[
id];
511 auto& oldId = cacheEntry->second;
513 if (oldId.value ==
id.value) {
514 LOGP(
debug,
"Returning cached CCDB metatada {}: {}",
id.
value,
path);
515 return cache.idToMetadata[
id];
519 LOGP(info,
"Replacing cached entry {} with {} for {}", oldId.value,
id.value,
path);
521 oldId.value =
id.value;
522 return cache.idToMetadata[
id];
525 template <
typename T>
526 requires(std::same_as<T, DataRef>)
531 auto msg = describeAvailableInputs();
537 template <
typename T>
538 requires(std::same_as<T, TableConsumer>)
541 auto ref = get<DataRef>(matcher, part);
542 auto data =
reinterpret_cast<uint8_t const*
>(
ref.payload);
547 [[nodiscard]]
bool isValid(std::string
const& s)
const
553 bool isValid(
char const* s)
const;
559 [[nodiscard]]
size_t size()
const;
566 template <
typename ParentT,
typename T>
584 if (mPosition < mSize) {
585 if (mParent->isValid(mPosition)) {
586 mElement = mParent->getByPos(mPosition);
598 while (mPosition < mSize && ++mPosition < mSize) {
599 if (!mParent->isValid(mPosition)) {
602 mElement = mParent->getByPos(mPosition);
605 if (mPosition >= mSize) {
626 return mPosition == rh.mPosition;
631 return mPosition != rh.mPosition;
636 if (mPosition >= mSize || mElement.header ==
nullptr) {
641 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(mElement);
642 return *dh == matcher;
647 if (mPosition >= mSize || mElement.header ==
nullptr) {
652 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(mElement);
681 template <
typename T>
726 [[nodiscard]]
size_t size()
const
747 return {
this,
false};
762 [[nodiscard]] std::string describeAvailableInputs()
const;
765 std::vector<InputRoute>
const& mInputsSchema;
header::DataOrigin origin
header::DataDescription description
std::vector< OutputRoute > routes
GLuint const GLchar * name
GLsizei const GLfloat * value
GLsizei GLenum const void * indices
GLsizei const GLchar *const * path
constexpr o2::header::DataDescription gDataDescriptionInvalid
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
RuntimeErrorRef runtime_error_f(const char *,...)
std::string to_string(gsl::span< T, Size > span)
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static std::map< std::string, std::string > extractCCDBHeaders(DataRef const &ref)
static auto as(DataRef const &ref)
static std::string describe(InputSpec const &spec)
static Id fromRef(DataRef &ref)
std::unordered_map< Id, void *, Id::hash_fn > idToObject
std::unordered_map< std::string, Id > matcherToMetadataId
VectorOfTObjectPtrs other
uint64_t const void const *restrict const msg