Project
Loading...
Searching...
No Matches
InputRecord.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef O2_FRAMEWORK_INPUTRECORD_H_
12#define O2_FRAMEWORK_INPUTRECORD_H_
13
14#include "Framework/DataRef.h"
16#include "Framework/InputSpan.h"
20#include "Framework/Traits.h"
22#include "Framework/Logger.h"
25
26#include "Headers/DataHeader.h"
27
28#include <gsl/gsl>
29
30#include <iterator>
31#include <string>
32#include <vector>
33#include <cstring>
34#include <cassert>
35#include <memory>
36#include <type_traits>
37#include <concepts>
38#include <span>
39
40#include <fairmq/FwdDecls.h>
41
42namespace o2::framework
43{
44
45// Wrapper class to get CCDB metadata
48
52struct CCDBBlob {
53};
54
55struct InputSpec;
56class InputSpan;
57class CallbackService;
58
110{
111 public:
113
114 // Typesafe position inside a record of an input.
115 // Multiple routes by which the input gets in this
116 // position are multiplexed.
117 struct InputPos {
118 size_t index;
119 constexpr static size_t INVALID = -1LL;
120 };
121
122 InputRecord(std::vector<InputRoute> const& inputs,
125
137 template <typename T>
138 class Deleter : public std::default_delete<T>
139 {
140 public:
141 enum struct OwnershipProperty : short {
142 Unknown = -1,
143 NotOwning = 0,
144 Owning = 1
145 };
146
147 using base = std::default_delete<T>;
149 // using pointer = typename base::pointer;
150
151 constexpr Deleter() = default;
152 constexpr Deleter(bool isOwning)
153 : base::default_delete(), mProperty(isOwning ? OwnershipProperty::Owning : OwnershipProperty::NotOwning)
154 {
155 }
156
157 // copy constructor is needed in the setup of unique_ptr
158 // check that assignments happen only to uninitialized instances
159 constexpr Deleter(const self_type& other) : base::default_delete(other), mProperty{OwnershipProperty::Unknown}
160 {
161 if (mProperty == OwnershipProperty::Unknown) {
162 mProperty = other.mProperty;
163 } else if (mProperty != other.mProperty) {
164 throw runtime_error("Attemp to change resource control");
165 }
166 }
167
168 // copy constructor for the default delete which simply sets the
169 // resource ownership control to 'Owning'
170 constexpr Deleter(const base& other) : base::default_delete(other), mProperty{OwnershipProperty::Owning} {}
171
172 // allow assignment operator only for pristine or matching resource control property
174 {
175 // the default_deleter does not have any state, so this could be skipped, but keep the call to
176 // the base for completeness, and the (small) chance for changing the base
177 base::operator=(other);
178 if (mProperty == OwnershipProperty::Unknown) {
179 mProperty = other.mProperty;
180 } else if (mProperty != other.mProperty) {
181 throw runtime_error("Attemp to change resource control");
182 }
183 return *this;
184 }
185
186 void operator()(T* ptr) const
187 {
188 if (mProperty == OwnershipProperty::NotOwning) {
189 // nothing done if resource is not owned
190 return;
191 }
192 base::operator()(ptr);
193 }
194
195 private:
197 };
198
199 int getPos(const char* name) const;
200 int getPos(ConcreteDataMatcher matcher) const;
201 [[nodiscard]] static InputPos getPos(std::vector<InputRoute> const& routes, ConcreteDataMatcher matcher);
202 [[nodiscard]] static DataRef getByPos(std::vector<InputRoute> const& routes, InputSpan const& span, int pos, int part = 0);
203
204 [[nodiscard]] int getPos(const std::string& name) const;
205
206 [[nodiscard]] DataRef getByPos(int pos, int part = 0) const;
207
209 [[nodiscard]] DataRef getFirstValid(bool throwOnFailure = false) const;
210
211 [[nodiscard]] size_t getNofParts(int pos) const;
212
214 [[nodiscard]] DataRef getAtIndices(int pos, DataRefIndices indices) const;
215
217 [[nodiscard]] DataRefIndices nextIndices(int pos, DataRefIndices current) const
218 {
219 return mSpan.nextIndices(pos, current);
220 }
221
222 // Given a binding by string, return the associated DataRef
223 DataRef getDataRefByString(const char* bindingName, int part = 0) const
224 {
225 int pos = getPos(bindingName);
226 if (pos < 0) {
227 auto msg = describeAvailableInputs();
228 throw runtime_error_f("InputRecord::get: no input with binding %s found. %s", bindingName, msg.c_str());
229 }
230 return this->getByPos(pos, part);
231 }
232
233 template <typename R>
234 requires std::is_convertible_v<R, char const*>
235 DataRef getRef(R binding, int part = 0) const
236 {
237 return getDataRefByString(binding, part);
238 }
239
240 template <typename R>
241 requires requires(R r) { r.c_str(); }
242 DataRef getRef(R binding, int part = 0) const
243 {
244 return getDataRefByString(binding.c_str(), part);
245 }
246
247 template <typename R>
248 requires std::is_convertible_v<R, DataRef>
249 DataRef getRef(R ref, int part = 0) const
250 {
251 return ref;
252 }
253
265 template <typename T = DataRef, typename R>
266 decltype(auto) get(R binding, int part = 0) const
267 {
268 DataRef ref = getRef(binding, part);
269
270 using PointerLessValueT = std::remove_pointer_t<T>;
271
272 if constexpr (std::is_same_v<std::decay_t<T>, DataRef>) {
273 return ref;
274 } else if constexpr (std::is_same<T, std::string>::value) {
275 // substitution for std::string
276 // If we ask for a string, we need to duplicate it because we do not want
277 // the buffer to be deleted when it goes out of scope. The string is built
278 // from the data and its lengh, null-termination is not necessary.
279 // return std::string object
280 return std::string(ref.payload, DataRefUtils::getPayloadSize(ref));
281
282 // implementation (c)
283 } else if constexpr (std::is_same<T, char const*>::value) {
284 // substitution for const char*
285 // If we ask for a char const *, we simply point to the payload. Notice this
286 // is meant for C-style strings which are expected to be null terminated.
287 // If you want to actually get hold of the buffer, use gsl::span<char> as that will
288 // give you the size as well.
289 // return pointer to payload content
290 return reinterpret_cast<char const*>(ref.payload);
291
292 // implementation (d)
293 } else if constexpr (std::is_same<T, TableConsumer>::value) {
294 // substitution for TableConsumer
295 // For the moment this is dummy, as it requires proper support to
296 // create the RDataSource from the arrow buffer.
297 auto data = reinterpret_cast<uint8_t const*>(ref.payload);
298 return std::make_unique<TableConsumer>(data, DataRefUtils::getPayloadSize(ref));
299
300 // implementation (f)
301 } else if constexpr (is_span<T>::value) {
302 // substitution for span of messageable objects
303 // FIXME: there will be std::span in C++20
304 static_assert(is_messageable<typename T::value_type>::value, "span can only be created for messageable types");
305 auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
306 assert(header);
307 if (sizeof(typename T::value_type) > 1 && header->payloadSerializationMethod != o2::header::gSerializationMethodNone) {
308 throw runtime_error("Inconsistent serialization method for extracting span");
309 }
310 using ValueT = typename T::value_type;
311 auto payloadSize = DataRefUtils::getPayloadSize(ref);
312 if (payloadSize % sizeof(ValueT)) {
313 throw runtime_error(("Inconsistent type and payload size at " + std::string(ref.spec->binding) + "(" + DataSpecUtils::describe(*ref.spec) + ")" +
314 ": type size " + std::to_string(sizeof(ValueT)) +
315 " payload size " + std::to_string(payloadSize))
316 .c_str());
317 }
318 return gsl::span<ValueT const>(reinterpret_cast<ValueT const*>(ref.payload), payloadSize / sizeof(ValueT));
319
320 // implementation (g)
321 } else if constexpr (is_container<T>::value) {
322 // currently implemented only for vectors
323 if constexpr (is_specialization_v<std::remove_const_t<T>, std::vector>) {
324 auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
325 auto payloadSize = DataRefUtils::getPayloadSize(ref);
326 auto method = header->payloadSerializationMethod;
328 // TODO: construct a vector spectator
329 // this is a quick solution now which makes a copy of the plain vector data
330 auto* start = reinterpret_cast<typename T::value_type const*>(ref.payload);
331 auto* end = start + payloadSize / sizeof(typename T::value_type);
332 T result(start, end);
333 return result;
334 } else if (method == o2::header::gSerializationMethodROOT) {
340 using NonConstT = typename std::remove_const<T>::type;
341 if constexpr (is_specialization_v<T, ROOTSerialized> == true || has_root_dictionary<T>::value == true) {
342 // we expect the unique_ptr to hold an object, exception should have been thrown
343 // otherwise
344 auto object = DataRefUtils::as<NonConstT>(ref);
345 // need to swap the content of the deserialized container to a local variable to force return
346 // value optimization
347 T container;
348 std::swap(const_cast<NonConstT&>(container), *object);
349 return container;
350 } else {
351 throw runtime_error("No supported conversion function for ROOT serialized message");
352 }
353 } else {
354 throw runtime_error("Attempt to extract object from message with unsupported serialization type");
355 }
356 } else {
357 static_assert(always_static_assert_v<T>, "unsupported code path");
358 }
359
360 // implementation (h)
361 } else if constexpr (is_messageable<T>::value) {
362 // extract a messageable type by reference
363 // Cast content of payload bound by @a binding to known type.
364 // we need to check the serialization type, the cast makes only sense for
365 // unserialized objects
366
367 auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
368 auto method = header->payloadSerializationMethod;
370 // FIXME: we could in principle support serialized content here as well if we
371 // store all extracted objects internally and provide cleanup
372 throw runtime_error("Can not extract a plain object from serialized message");
373 }
374 return *reinterpret_cast<T const*>(ref.payload);
375
376 // implementation (i)
377 } else if constexpr (std::is_pointer_v<T> &&
378 (is_messageable<PointerLessValueT>::value ||
380 (is_specialization_v<PointerLessValueT, std::vector> && has_messageable_value_type<PointerLessValueT>::value) ||
382 // extract a messageable type or object with ROOT dictionary by pointer
383 // return unique_ptr to message content with custom deleter
384 using ValueT = PointerLessValueT;
385
386 auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
387 auto payloadSize = DataRefUtils::getPayloadSize(ref);
388 auto method = header->payloadSerializationMethod;
390 if constexpr (is_messageable<ValueT>::value) {
391 auto const* ptr = reinterpret_cast<ValueT const*>(ref.payload);
392 // return type with non-owning Deleter instance
393 std::unique_ptr<ValueT const, Deleter<ValueT const>> result(ptr, Deleter<ValueT const>(false));
394 return result;
395 } else if constexpr (is_specialization_v<ValueT, std::vector> && has_messageable_value_type<ValueT>::value) {
396 // TODO: construct a vector spectator
397 // this is a quick solution now which makes a copy of the plain vector data
398 auto* start = reinterpret_cast<typename ValueT::value_type const*>(ref.payload);
399 auto* end = start + payloadSize / sizeof(typename ValueT::value_type);
400 auto container = std::make_unique<ValueT>(start, end);
401 std::unique_ptr<ValueT const, Deleter<ValueT const>> result(container.release(), Deleter<ValueT const>(true));
402 return result;
403 }
404 throw runtime_error("unsupported code path");
405 } else if (method == o2::header::gSerializationMethodROOT) {
406 // This supports the common case of retrieving a root object and getting pointer.
407 // Notice that this will return a copy of the actual contents of the buffer, because
408 // the buffer is actually serialised, for this reason we return a unique_ptr<T>.
409 // FIXME: does it make more sense to keep ownership of all the deserialised
410 // objects in a single place so that we can avoid duplicate deserializations?
411 // explicitely specify serialization method to ROOT-serialized because type T
412 // is messageable and a different method would be deduced in DataRefUtils
413 // return type with owning Deleter instance, forwarding to default_deleter
414 std::unique_ptr<ValueT const, Deleter<ValueT const>> result(DataRefUtils::as<ROOTSerialized<ValueT>>(ref).release());
415 return result;
416 } else if (method == o2::header::gSerializationMethodCCDB) {
417 // This is to support deserialising objects from CCDB. Contrary to what happens for
418 // other objects, those objects are most likely long lived, so we
419 // keep around an instance of the associated object and deserialise it only when
420 // it's updated.
421 // FIXME: add ability to apply callbacks to deserialised objects.
422 auto id = ObjectCache::Id::fromRef(ref);
423 ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification};
424 // If the matcher does not have an entry in the cache, deserialise it
425 // and cache the deserialised object at the given id.
426 auto path = fmt::format("{}", DataSpecUtils::describe(matcher));
427 LOGP(debug, "{}", path);
428 auto& cache = mRegistry.get<ObjectCache>();
429 auto& callbacks = mRegistry.get<CallbackService>();
430 auto cacheEntry = cache.matcherToId.find(path);
431 if (cacheEntry == cache.matcherToId.end()) {
432 cache.matcherToId.insert(std::make_pair(path, id));
433 std::unique_ptr<ValueT const, Deleter<ValueT const>> result(DataRefUtils::as<CCDBSerialized<ValueT>>(ref).release(), false);
434 void* obj = (void*)result.get();
435 callbacks.call<CallbackService::Id::CCDBDeserialised>((ConcreteDataMatcher&)matcher, (void*)obj);
436 cache.idToObject[id] = obj;
437 LOGP(info, "Caching in {} ptr to {} ({})", id.value, path, obj);
438 return result;
439 }
440 auto& oldId = cacheEntry->second;
441 // The id in the cache is the same, let's simply return it.
442 if (oldId.value == id.value) {
443 std::unique_ptr<ValueT const, Deleter<ValueT const>> result((ValueT const*)cache.idToObject[id], false);
444 LOGP(debug, "Returning cached entry {} for {} ({})", id.value, path, (void*)result.get());
445 return result;
446 }
447 // The id in the cache is different. Let's destroy the old cached entry
448 // and create a new one.
449 delete reinterpret_cast<ValueT*>(cache.idToObject[oldId]);
450 std::unique_ptr<ValueT const, Deleter<ValueT const>> result(DataRefUtils::as<CCDBSerialized<ValueT>>(ref).release(), false);
451 void* obj = (void*)result.get();
452 callbacks.call<CallbackService::Id::CCDBDeserialised>((ConcreteDataMatcher&)matcher, (void*)obj);
453 cache.idToObject[id] = obj;
454 LOGP(info, "Replacing cached entry {} with {} for {} ({})", oldId.value, id.value, path, obj);
455 oldId.value = id.value;
456 return result;
457 } else {
458 throw runtime_error("Attempt to extract object from message with unsupported serialization type");
459 }
460 } else if constexpr (std::is_pointer_v<T>) {
461 static_assert(always_static_assert<T>::value, "T is not a supported type");
462 } else if constexpr (has_root_dictionary<T>::value) {
463 // retrieving ROOT objects follows the pointer approach, i.e. T* has to be specified
464 // as template parameter and a unique_ptr will be returned, std vectors of ROOT serializable
465 // objects can be retrieved by move, this is handled above in the "container" code branch
466 static_assert(always_static_assert_v<T>, "ROOT objects need to be retrieved by pointer");
467 } else {
468 // non-messageable objects for which serialization method can not be derived by type,
469 // the operation depends on the transmitted serialization method
470 auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
471 auto method = header->payloadSerializationMethod;
473 // this code path is only selected if the type is non-messageable
474 throw runtime_error(
475 "Type mismatch: attempt to extract a non-messagable object "
476 "from message with unserialized data");
477 } else if (method == o2::header::gSerializationMethodROOT) {
478 // explicitely specify serialization method to ROOT-serialized because type T
479 // is messageable and a different method would be deduced in DataRefUtils
480 // return type with owning Deleter instance, forwarding to default_deleter
481 std::unique_ptr<T const, Deleter<T const>> result(DataRefUtils::as<ROOTSerialized<T>>(ref).release());
482 return result;
483 } else {
484 throw runtime_error("Attempt to extract object from message with unsupported serialization type");
485 }
486 }
487 }
488
489 template <typename T = DataRef, typename R>
490 std::map<std::string, std::string>& get(R binding, int part = 0) const
491 requires std::same_as<T, CCDBMetadataExtractor>
492 {
493 auto ref = getRef(binding, part);
494 auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
495 auto payloadSize = DataRefUtils::getPayloadSize(ref);
496 auto method = header->payloadSerializationMethod;
497 if (method != header::gSerializationMethodCCDB) {
498 throw runtime_error("Attempt to extract metadata from a non-CCDB serialised message");
499 }
500 // This is to support deserialising objects from CCDB. Contrary to what happens for
501 // other objects, those objects are most likely long lived, so we
502 // keep around an instance of the associated object and deserialise it only when
503 // it's updated.
504 auto id = ObjectCache::Id::fromRef(ref);
505 ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification};
506 // If the matcher does not have an entry in the cache, deserialise it
507 // and cache the deserialised object at the given id.
508 auto path = fmt::format("{}", DataSpecUtils::describe(matcher));
509 LOGP(debug, "{}", path);
510 auto& cache = mRegistry.get<ObjectCache>();
511 auto cacheEntry = cache.matcherToMetadataId.find(path);
512 if (cacheEntry == cache.matcherToMetadataId.end()) {
513 cache.matcherToMetadataId.insert(std::make_pair(path, id));
514 cache.idToMetadata[id] = DataRefUtils::extractCCDBHeaders(ref);
515 LOGP(info, "Caching CCDB metadata {}: {}", id.value, path);
516 return cache.idToMetadata[id];
517 }
518 auto& oldId = cacheEntry->second;
519 // The id in the cache is the same, let's simply return it.
520 if (oldId.value == id.value) {
521 LOGP(debug, "Returning cached CCDB metatada {}: {}", id.value, path);
522 return cache.idToMetadata[id];
523 }
524 // The id in the cache is different. Let's destroy the old cached entry
525 // and create a new one.
526 LOGP(info, "Replacing cached entry {} with {} for {}", oldId.value, id.value, path);
527 cache.idToMetadata[id] = DataRefUtils::extractCCDBHeaders(ref);
528 oldId.value = id.value;
529 return cache.idToMetadata[id];
530 }
531
532 template <typename T = DataRef, typename R>
533 std::span<const char> get(R binding, int part = 0) const
534 requires std::same_as<T, CCDBBlob>
535 {
536 auto ref = getRef(binding, part);
537 auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
538 if (header->payloadSerializationMethod != header::gSerializationMethodCCDB) {
539 throw runtime_error("Attempt to extract CCDBBlob from a non-CCDB-serialized message");
540 }
542 }
543
544 template <typename T>
545 requires(std::same_as<T, DataRef>)
546 decltype(auto) get(ConcreteDataMatcher matcher, int part = 0)
547 {
548 auto pos = getPos(matcher);
549 if (pos < 0) {
550 auto msg = describeAvailableInputs();
551 throw runtime_error_f("InputRecord::get: no input %s found. %s", DataSpecUtils::describe(matcher).c_str(), msg.c_str());
552 }
553 return getByPos(pos, part);
554 }
555
556 template <typename T>
557 requires(std::same_as<T, TableConsumer>)
558 decltype(auto) get(ConcreteDataMatcher matcher, int part = 0)
559 {
560 auto ref = get<DataRef>(matcher, part);
561 auto data = reinterpret_cast<uint8_t const*>(ref.payload);
562 return std::make_unique<TableConsumer>(data, DataRefUtils::getPayloadSize(ref));
563 }
564
566 [[nodiscard]] bool isValid(std::string const& s) const
567 {
568 return isValid(s.c_str());
569 }
570
572 bool isValid(char const* s) const;
573 [[nodiscard]] bool isValid(int pos) const;
574
578 [[nodiscard]] size_t size() const;
579
583 [[nodiscard]] size_t countValidInputs() const;
584
585 template <typename ParentT, typename T>
587 {
588 public:
589 using ParentType = ParentT;
591 using iterator_category = std::forward_iterator_tag;
592 using value_type = T;
593 using reference = T&;
594 using pointer = T*;
595 using difference_type = std::ptrdiff_t;
596 using ElementType = typename std::remove_const<value_type>::type;
597
598 Iterator() = delete;
599
600 Iterator(ParentType const* parent, bool isEnd = false)
601 : mPosition(isEnd ? parent->size() : 0), mSize(parent->size()), mParent(parent), mElement{nullptr, nullptr, nullptr}
602 {
603 if (mPosition < mSize) {
604 if (mParent->isValid(mPosition)) {
605 mElement = mParent->getByPos(mPosition);
606 } else {
607 ++(*this);
608 }
609 }
610 }
611
612 ~Iterator() = default;
613
614 // prefix increment
616 {
617 while (mPosition < mSize && ++mPosition < mSize) {
618 if (!mParent->isValid(mPosition)) {
619 continue;
620 }
621 mElement = mParent->getByPos(mPosition);
622 break;
623 }
624 if (mPosition >= mSize) {
625 // reset the element to the default value of the type
626 mElement = ElementType{};
627 }
628 return *this;
629 }
630 // postfix increment
631 SelfType operator++(int /*unused*/)
632 {
633 SelfType copy(*this);
634 operator++();
635 return copy;
636 }
637 // return reference
639 {
640 return mElement;
641 }
642 // comparison
643 bool operator==(const SelfType& rh) const
644 {
645 return mPosition == rh.mPosition;
646 }
647 // comparison
648 bool operator!=(const SelfType& rh) const
649 {
650 return mPosition != rh.mPosition;
651 }
652
653 [[nodiscard]] bool matches(o2::header::DataHeader matcher) const
654 {
655 if (mPosition >= mSize || mElement.header == nullptr) {
656 return false;
657 }
658 // at this point there must be a DataHeader, this has been checked by the DPL
659 // input cache
660 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(mElement);
661 return *dh == matcher;
662 }
663
665 {
666 if (mPosition >= mSize || mElement.header == nullptr) {
667 return false;
668 }
669 // at this point there must be a DataHeader, this has been checked by the DPL
670 // input cache
671 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(mElement);
672 return dh->dataOrigin == origin && (description == o2::header::gDataDescriptionInvalid || dh->dataDescription == description);
673 }
674
679
680 [[nodiscard]] ParentType const* parent() const
681 {
682 return mParent;
683 }
684
685 [[nodiscard]] size_t position() const
686 {
687 return mPosition;
688 }
689
690 [[nodiscard]] auto parts() const
691 {
692 return mParent->parts(mPosition);
693 }
694
695 private:
696 size_t mPosition;
697 size_t mSize;
698 ParentType const* mParent;
699 ElementType mElement;
700 };
701
705 template <typename T>
706 class InputRecordIterator : public Iterator<InputRecord, T>
707 {
708 public:
713 using pointer = typename BaseType::pointer;
714 using ElementType = typename std::remove_const<value_type>::type;
715
716 InputRecordIterator(InputRecord const* parent, bool isEnd = false)
717 : BaseType(parent, isEnd)
718 {
719 }
720
722 [[nodiscard]] bool isValid(size_t = 0) const
723 {
724 if (this->position() < this->parent()->size()) {
725 return this->parent()->isValid(this->position());
726 }
727 return false;
728 }
729 };
730
733
734 [[nodiscard]] const_iterator begin() const
735 {
736 return {this, false};
737 }
738
739 [[nodiscard]] const_iterator end() const
740 {
741 return {this, true};
742 }
743
745 struct PartRange {
747 size_t slot;
748
749 [[nodiscard]] DataRefIndices initialIndices() const { return {0, 1}; }
750 [[nodiscard]] DataRefIndices endIndices() const { return {size_t(-1), size_t(-1)}; }
751 [[nodiscard]] DataRef getAtIndices(DataRefIndices idx) const { return record->getAtIndices((int)slot, idx); }
752 [[nodiscard]] DataRefIndices nextIndices(DataRefIndices idx) const { return record->nextIndices((int)slot, idx); }
753 [[nodiscard]] size_t size() const { return record->getNofParts((int)slot); }
754
755 [[nodiscard]] InputSpan::Iterator<PartRange, const DataRef> begin() const { return {this, size() == 0}; }
756 [[nodiscard]] InputSpan::Iterator<PartRange, const DataRef> end() const { return {this, true}; }
757 };
758
760 [[nodiscard]] PartRange parts(size_t pos) const { return {this, pos}; }
761
763 {
764 return mSpan;
765 }
766
767 private:
768 // Produce a string describing the available inputs.
769 [[nodiscard]] std::string describeAvailableInputs() const;
770
771 ServiceRegistryRef mRegistry;
772 std::vector<InputRoute> const& mInputsSchema;
773 InputSpan& mSpan;
774};
775
776} // namespace o2::framework
777
778#endif // O2_FRAMEWORK_INPUTREGISTRY_H_
header::DataOrigin origin
header::DataDescription description
std::string binding
std::vector< OutputRoute > routes
std::ostringstream debug
uint16_t pos
Definition RawData.h:3
TBranch * ptr
std::default_delete< T > base
constexpr Deleter(const self_type &other)
@ Owning
don't delete the underlying buffer
constexpr Deleter(bool isOwning)
self_type & operator=(const self_type &other)
constexpr Deleter(const base &other)
typename std::remove_const< value_type >::type ElementType
InputRecordIterator(InputRecord const *parent, bool isEnd=false)
typename BaseType::value_type value_type
bool isValid(size_t=0) const
Check if slot is valid.
typename std::remove_const< value_type >::type ElementType
bool operator!=(const SelfType &rh) const
bool matches(o2::header::DataOrigin origin, o2::header::DataDescription description=o2::header::gDataDescriptionInvalid) const
Iterator(ParentType const *parent, bool isEnd=false)
std::forward_iterator_tag iterator_category
bool matches(o2::header::DataOrigin origin, o2::header::DataDescription description, o2::header::DataHeader::SubSpecificationType subspec) const
bool operator==(const SelfType &rh) const
ParentType const * parent() const
bool matches(o2::header::DataHeader matcher) const
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
int getPos(const char *name) const
decltype(auto) get(ConcreteDataMatcher matcher, int part=0)
DataRef getRef(R binding, int part=0) const
const_iterator begin() const
bool isValid(std::string const &s) const
Helper method to be used to check if a given part of the InputRecord is present.
decltype(auto) get(R binding, int part=0) const
const_iterator end() const
std::span< const char > get(R binding, int part=0) const
DataRef getRef(R binding, int part=0) const
DataRef getDataRefByString(const char *bindingName, int part=0) const
size_t countValidInputs() const
static DataRef getByPos(std::vector< InputRoute > const &routes, InputSpan const &span, int pos, int part=0)
PartRange parts(size_t pos) const
Return an iterable range over all parts in slot pos (DataRef objects have spec set).
DataRef getAtIndices(int pos, DataRefIndices indices) const
O(1) access to the part described by indices in slot pos.
decltype(auto) get(ConcreteDataMatcher matcher, int part=0)
std::map< std::string, std::string > & get(R binding, int part=0) const
size_t getNofParts(int pos) const
DataRef getFirstValid(bool throwOnFailure=false) const
Get the ref of the first valid input. If requested, throw an error if none is found.
DataRef getRef(R ref, int part=0) const
DataRefIndices nextIndices(int pos, DataRefIndices current) const
O(1) advance from current to the next part's indices in slot pos.
DataRefIndices nextIndices(size_t slotIdx, DataRefIndices current) const
Advance from current to the indices of the next part in slot slotIdx in O(1).
Definition InputSpan.h:60
GLuint64EXT * result
Definition glcorearb.h:5662
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLboolean * data
Definition glcorearb.h:298
GLsizei GLenum const void * indices
Definition glcorearb.h:400
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLboolean r
Definition glcorearb.h:1233
GLuint start
Definition glcorearb.h:469
GLint ref
Definition glcorearb.h:291
GLuint id
Definition glcorearb.h:650
constexpr o2::header::DataDescription gDataDescriptionInvalid
Definition DataHeader.h:597
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
RuntimeErrorRef runtime_error(const char *)
RuntimeErrorRef runtime_error_f(const char *,...)
constexpr o2::header::SerializationMethod gSerializationMethodROOT
Definition DataHeader.h:328
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
constexpr o2::header::SerializationMethod gSerializationMethodCCDB
Definition DataHeader.h:329
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static std::map< std::string, std::string > extractCCDBHeaders(DataRef const &ref)
static std::span< const char > getCCDBPayloadBlob(DataRef const &ref)
static auto as(DataRef const &ref)
static std::string describe(InputSpec const &spec)
static constexpr size_t INVALID
A range over the parts of a single slot that sets ref.spec on each DataRef.
InputSpan::Iterator< PartRange, const DataRef > end() const
DataRefIndices endIndices() const
DataRef getAtIndices(DataRefIndices idx) const
InputSpan::Iterator< PartRange, const DataRef > begin() const
DataRefIndices initialIndices() const
DataRefIndices nextIndices(DataRefIndices idx) const
static Id fromRef(DataRef &ref)
Definition ObjectCache.h:26
std::unordered_map< Id, void *, Id::hash_fn > idToObject
Definition ObjectCache.h:49
std::unordered_map< std::string, Id > matcherToMetadataId
Definition ObjectCache.h:54
the main header struct
Definition DataHeader.h:620
uint32_t SubSpecificationType
Definition DataHeader.h:622
VectorOfTObjectPtrs other
uint64_t const void const *restrict const msg
Definition x9.h:153