11#ifndef O2_FRAMEWORK_INPUTRECORD_H_
12#define O2_FRAMEWORK_INPUTRECORD_H_
40#include <fairmq/FwdDecls.h>
137 template <
typename T>
147 using base = std::default_delete<T>;
162 mProperty =
other.mProperty;
163 }
else if (mProperty !=
other.mProperty) {
177 base::operator=(
other);
179 mProperty =
other.mProperty;
180 }
else if (mProperty !=
other.mProperty) {
192 base::operator()(
ptr);
204 [[nodiscard]]
int getPos(
const std::string&
name)
const;
227 auto msg = describeAvailableInputs();
228 throw runtime_error_f(
"InputRecord::get: no input with binding %s found. %s", bindingName,
msg.c_str());
233 template <
typename R>
234 requires std::is_convertible_v<R, char const*>
240 template <
typename R>
241 requires requires(
R r) {
r.c_str(); }
247 template <
typename R>
248 requires std::is_convertible_v<R, DataRef>
265 template <
typename T = DataRef,
typename R>
270 using PointerLessValueT = std::remove_pointer_t<T>;
272 if constexpr (std::is_same_v<std::decay_t<T>,
DataRef>) {
274 }
else if constexpr (std::is_same<T, std::string>::value) {
283 }
else if constexpr (std::is_same<T, char const*>::value) {
290 return reinterpret_cast<char const*
>(
ref.payload);
293 }
else if constexpr (std::is_same<T, TableConsumer>::value) {
297 auto data =
reinterpret_cast<uint8_t const*
>(
ref.payload);
304 static_assert(is_messageable<typename T::value_type>::value,
"span can only be created for messageable types");
305 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
308 throw runtime_error(
"Inconsistent serialization method for extracting span");
310 using ValueT =
typename T::value_type;
312 if (payloadSize %
sizeof(ValueT)) {
318 return gsl::span<ValueT const>(
reinterpret_cast<ValueT const*
>(
ref.payload), payloadSize /
sizeof(ValueT));
323 if constexpr (is_specialization_v<std::remove_const_t<T>, std::vector>) {
324 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
326 auto method = header->payloadSerializationMethod;
330 auto*
start =
reinterpret_cast<typename T::value_type const*
>(
ref.payload);
331 auto*
end =
start + payloadSize /
sizeof(
typename T::value_type);
340 using NonConstT =
typename std::remove_const<T>::type;
344 auto object = DataRefUtils::as<NonConstT>(
ref);
348 std::swap(
const_cast<NonConstT&
>(container), *
object);
351 throw runtime_error(
"No supported conversion function for ROOT serialized message");
354 throw runtime_error(
"Attempt to extract object from message with unsupported serialization type");
357 static_assert(always_static_assert_v<T>,
"unsupported code path");
361 }
else if constexpr (is_messageable<T>::value) {
367 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
368 auto method = header->payloadSerializationMethod;
372 throw runtime_error(
"Can not extract a plain object from serialized message");
374 return *
reinterpret_cast<T const*
>(
ref.payload);
377 }
else if constexpr (std::is_pointer_v<T> &&
378 (is_messageable<PointerLessValueT>::value ||
384 using ValueT = PointerLessValueT;
386 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
388 auto method = header->payloadSerializationMethod;
390 if constexpr (is_messageable<ValueT>::value) {
391 auto const*
ptr =
reinterpret_cast<ValueT const*
>(
ref.payload);
398 auto*
start =
reinterpret_cast<typename ValueT::value_type const*
>(
ref.payload);
399 auto*
end =
start + payloadSize /
sizeof(
typename ValueT::value_type);
400 auto container = std::make_unique<ValueT>(
start,
end);
423 ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification};
430 auto cacheEntry = cache.matcherToId.find(
path);
431 if (cacheEntry == cache.matcherToId.end()) {
432 cache.matcherToId.insert(std::make_pair(
path,
id));
434 void* obj = (
void*)
result.get();
437 LOGP(info,
"Caching in {} ptr to {} ({})",
id.
value,
path, obj);
440 auto& oldId = cacheEntry->second;
442 if (oldId.value ==
id.value) {
443 std::unique_ptr<ValueT const, Deleter<ValueT const>>
result((ValueT
const*)cache.idToObject[
id],
false);
449 delete reinterpret_cast<ValueT*
>(cache.idToObject[oldId]);
451 void* obj = (
void*)
result.get();
454 LOGP(info,
"Replacing cached entry {} with {} for {} ({})", oldId.value,
id.value,
path, obj);
455 oldId.value =
id.value;
458 throw runtime_error(
"Attempt to extract object from message with unsupported serialization type");
460 }
else if constexpr (std::is_pointer_v<T>) {
466 static_assert(always_static_assert_v<T>,
"ROOT objects need to be retrieved by pointer");
470 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
471 auto method = header->payloadSerializationMethod;
475 "Type mismatch: attempt to extract a non-messagable object "
476 "from message with unserialized data");
484 throw runtime_error(
"Attempt to extract object from message with unsupported serialization type");
489 template <
typename T = DataRef,
typename R>
490 std::map<std::string, std::string>&
get(
R binding,
int part = 0) const
494 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
496 auto method = header->payloadSerializationMethod;
498 throw runtime_error(
"Attempt to extract metadata from a non-CCDB serialised message");
505 ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification};
513 cache.matcherToMetadataId.insert(std::make_pair(
path,
id));
515 LOGP(info,
"Caching CCDB metadata {}: {}",
id.
value,
path);
516 return cache.idToMetadata[
id];
518 auto& oldId = cacheEntry->second;
520 if (oldId.value ==
id.value) {
521 LOGP(
debug,
"Returning cached CCDB metatada {}: {}",
id.
value,
path);
522 return cache.idToMetadata[
id];
526 LOGP(info,
"Replacing cached entry {} with {} for {}", oldId.value,
id.value,
path);
528 oldId.value =
id.value;
529 return cache.idToMetadata[
id];
532 template <
typename T = DataRef,
typename R>
537 auto header = DataRefUtils::getHeader<header::DataHeader*>(
ref);
539 throw runtime_error(
"Attempt to extract CCDBBlob from a non-CCDB-serialized message");
544 template <
typename T>
545 requires(std::same_as<T, DataRef>)
550 auto msg = describeAvailableInputs();
556 template <
typename T>
557 requires(std::same_as<T, TableConsumer>)
560 auto ref = get<DataRef>(matcher, part);
561 auto data =
reinterpret_cast<uint8_t const*
>(
ref.payload);
566 [[nodiscard]]
bool isValid(std::string
const& s)
const
572 bool isValid(
char const* s)
const;
578 [[nodiscard]]
size_t size()
const;
585 template <
typename ParentT,
typename T>
603 if (mPosition < mSize) {
604 if (mParent->isValid(mPosition)) {
605 mElement = mParent->getByPos(mPosition);
617 while (mPosition < mSize && ++mPosition < mSize) {
618 if (!mParent->isValid(mPosition)) {
621 mElement = mParent->getByPos(mPosition);
624 if (mPosition >= mSize) {
645 return mPosition == rh.mPosition;
650 return mPosition != rh.mPosition;
655 if (mPosition >= mSize || mElement.header ==
nullptr) {
660 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(mElement);
661 return *dh == matcher;
666 if (mPosition >= mSize || mElement.header ==
nullptr) {
671 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(mElement);
692 return mParent->parts(mPosition);
705 template <
typename T>
736 return {
this,
false};
769 [[nodiscard]] std::string describeAvailableInputs()
const;
772 std::vector<InputRoute>
const& mInputsSchema;
header::DataOrigin origin
header::DataDescription description
std::vector< OutputRoute > routes
GLuint const GLchar * name
GLsizei const GLfloat * value
GLsizei GLenum const void * indices
GLsizei const GLchar *const * path
constexpr o2::header::DataDescription gDataDescriptionInvalid
Defining ITS Vertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
RuntimeErrorRef runtime_error_f(const char *,...)
std::string to_string(gsl::span< T, Size > span)
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static std::map< std::string, std::string > extractCCDBHeaders(DataRef const &ref)
static std::span< const char > getCCDBPayloadBlob(DataRef const &ref)
static auto as(DataRef const &ref)
static std::string describe(InputSpec const &spec)
static Id fromRef(DataRef &ref)
std::unordered_map< Id, void *, Id::hash_fn > idToObject
std::unordered_map< std::string, Id > matcherToMetadataId
VectorOfTObjectPtrs other
uint64_t const void const *restrict const msg