11#ifndef O2_FRAMEWORK_INPUTRECORD_H_
12#define O2_FRAMEWORK_INPUTRECORD_H_
38#include <fairmq/FwdDecls.h>
129 template <
typename T>
139 using base = std::default_delete<T>;
154 mProperty =
other.mProperty;
155 }
else if (mProperty !=
other.mProperty) {
169 base::operator=(
other);
171 mProperty =
other.mProperty;
172 }
else if (mProperty !=
other.mProperty) {
184 base::operator()(
ptr);
196 [[nodiscard]]
int getPos(
const std::string&
name)
const;
210 auto msg = describeAvailableInputs();
211 throw runtime_error_f(
"InputRecord::get: no input with binding %s found. %s", bindingName,
msg.c_str());
216 template <
typename R>
217 requires std::is_convertible_v<R, char const*>
223 template <
typename R>
224 requires requires(
R r) {
r.c_str(); }
230 template <
typename R>
231 requires std::is_convertible_v<R, DataRef>
248 template <
typename T = DataRef,
typename R>
253 using PointerLessValueT = std::remove_pointer_t<T>;
255 if constexpr (std::is_same_v<std::decay_t<T>,
DataRef>) {
257 }
else if constexpr (std::is_same<T, std::string>::value) {
266 }
else if constexpr (std::is_same<T, char const*>::value) {
273 return reinterpret_cast<char const*
>(
ref.payload);
276 }
else if constexpr (std::is_same<T, TableConsumer>::value) {
280 auto data =
reinterpret_cast<uint8_t const*
>(
ref.payload);
287 static_assert(is_messageable<typename T::value_type>::value,
"span can only be created for messageable types");
288 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
291 throw runtime_error(
"Inconsistent serialization method for extracting span");
293 using ValueT =
typename T::value_type;
295 if (payloadSize %
sizeof(ValueT)) {
301 return gsl::span<ValueT const>(
reinterpret_cast<ValueT const*
>(
ref.payload), payloadSize /
sizeof(ValueT));
306 if constexpr (is_specialization_v<std::remove_const_t<T>, std::vector>) {
307 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
309 auto method = header->payloadSerializationMethod;
313 auto*
start =
reinterpret_cast<typename T::value_type const*
>(
ref.payload);
314 auto*
end =
start + payloadSize /
sizeof(
typename T::value_type);
323 using NonConstT =
typename std::remove_const<T>::type;
327 auto object = DataRefUtils::as<NonConstT>(
ref);
331 std::swap(
const_cast<NonConstT&
>(container), *
object);
334 throw runtime_error(
"No supported conversion function for ROOT serialized message");
337 throw runtime_error(
"Attempt to extract object from message with unsupported serialization type");
340 static_assert(always_static_assert_v<T>,
"unsupported code path");
344 }
else if constexpr (is_messageable<T>::value) {
350 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
351 auto method = header->payloadSerializationMethod;
355 throw runtime_error(
"Can not extract a plain object from serialized message");
357 return *
reinterpret_cast<T const*
>(
ref.payload);
360 }
else if constexpr (std::is_pointer_v<T> &&
361 (is_messageable<PointerLessValueT>::value ||
367 using ValueT = PointerLessValueT;
369 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
371 auto method = header->payloadSerializationMethod;
373 if constexpr (is_messageable<ValueT>::value) {
374 auto const*
ptr =
reinterpret_cast<ValueT const*
>(
ref.payload);
381 auto*
start =
reinterpret_cast<typename ValueT::value_type const*
>(
ref.payload);
382 auto*
end =
start + payloadSize /
sizeof(
typename ValueT::value_type);
383 auto container = std::make_unique<ValueT>(
start,
end);
406 ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification};
413 auto cacheEntry = cache.matcherToId.find(
path);
414 if (cacheEntry == cache.matcherToId.end()) {
415 cache.matcherToId.insert(std::make_pair(
path,
id));
417 void* obj = (
void*)
result.get();
420 LOGP(info,
"Caching in {} ptr to {} ({})",
id.
value,
path, obj);
423 auto& oldId = cacheEntry->second;
425 if (oldId.value ==
id.value) {
426 std::unique_ptr<ValueT const, Deleter<ValueT const>>
result((ValueT
const*)cache.idToObject[
id],
false);
432 delete reinterpret_cast<ValueT*
>(cache.idToObject[oldId]);
434 void* obj = (
void*)
result.get();
437 LOGP(info,
"Replacing cached entry {} with {} for {} ({})", oldId.value,
id.value,
path, obj);
438 oldId.value =
id.value;
441 throw runtime_error(
"Attempt to extract object from message with unsupported serialization type");
443 }
else if constexpr (std::is_pointer_v<T>) {
449 static_assert(always_static_assert_v<T>,
"ROOT objects need to be retrieved by pointer");
453 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
454 auto method = header->payloadSerializationMethod;
458 "Type mismatch: attempt to extract a non-messagable object "
459 "from message with unserialized data");
467 throw runtime_error(
"Attempt to extract object from message with unsupported serialization type");
472 template <
typename T = DataRef,
typename R>
473 std::map<std::string, std::string>&
get(
R binding,
int part = 0) const
477 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
479 auto method = header->payloadSerializationMethod;
481 throw runtime_error(
"Attempt to extract metadata from a non-CCDB serialised message");
488 ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification};
496 cache.matcherToMetadataId.insert(std::make_pair(
path,
id));
498 LOGP(info,
"Caching CCDB metadata {}: {}",
id.
value,
path);
499 return cache.idToMetadata[
id];
501 auto& oldId = cacheEntry->second;
503 if (oldId.value ==
id.value) {
504 LOGP(
debug,
"Returning cached CCDB metatada {}: {}",
id.
value,
path);
505 return cache.idToMetadata[
id];
509 LOGP(info,
"Replacing cached entry {} with {} for {}", oldId.value,
id.value,
path);
511 oldId.value =
id.value;
512 return cache.idToMetadata[
id];
515 template <
typename T>
516 requires(std::same_as<T, DataRef>)
521 auto msg = describeAvailableInputs();
527 template <
typename T>
528 requires(std::same_as<T, TableConsumer>)
531 auto ref = get<DataRef>(matcher, part);
532 auto data =
reinterpret_cast<uint8_t const*
>(
ref.payload);
537 [[nodiscard]]
bool isValid(std::string
const& s)
const
543 bool isValid(
char const* s)
const;
549 [[nodiscard]]
size_t size()
const;
556 template <
typename ParentT,
typename T>
574 if (mPosition < mSize) {
575 if (mParent->isValid(mPosition)) {
576 mElement = mParent->getByPos(mPosition);
588 while (mPosition < mSize && ++mPosition < mSize) {
589 if (!mParent->isValid(mPosition)) {
592 mElement = mParent->getByPos(mPosition);
595 if (mPosition >= mSize) {
616 return mPosition == rh.mPosition;
621 return mPosition != rh.mPosition;
626 if (mPosition >= mSize || mElement.header ==
nullptr) {
631 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(mElement);
632 return *dh == matcher;
637 if (mPosition >= mSize || mElement.header ==
nullptr) {
642 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(mElement);
671 template <
typename T>
705 [[nodiscard]]
size_t size()
const
726 return {
this, 0,
size()};
731 return {
this,
size()};
741 [[nodiscard]] std::string describeAvailableInputs()
const;
744 std::vector<InputRoute>
const& mInputsSchema;
header::DataOrigin origin
header::DataDescription description
std::vector< OutputRoute > routes
GLuint const GLchar * name
GLsizei const GLfloat * value
GLsizei const GLchar *const * path
constexpr o2::header::DataDescription gDataDescriptionInvalid
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
RuntimeErrorRef runtime_error_f(const char *,...)
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static std::map< std::string, std::string > extractCCDBHeaders(DataRef const &ref)
static auto as(DataRef const &ref)
static std::string describe(InputSpec const &spec)
static Id fromRef(DataRef &ref)
std::unordered_map< Id, void *, Id::hash_fn > idToObject
std::unordered_map< std::string, Id > matcherToMetadataId
VectorOfTObjectPtrs other
uint64_t const void const *restrict const msg