Project
Loading...
Searching...
No Matches
InputRecord.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
12#include "Framework/InputSpan.h"
13#include "Framework/InputSpec.h"
16#include <fairmq/Message.h>
17#include <cassert>
18
19#if defined(__GNUC__)
20#pragma GCC diagnostic push
21#pragma GCC diagnostic ignored "-Wshadow"
22#endif
23#include <arrow/builder.h>
24#include <arrow/memory_pool.h>
25#include <arrow/record_batch.h>
26#include <arrow/table.h>
27#include <arrow/type_traits.h>
28#include <arrow/status.h>
29#if defined(__GNUC__)
30#pragma GCC diagnostic pop
31#endif
32
33namespace o2::framework
34{
35
36InputRecord::InputRecord(std::vector<InputRoute> const& inputsSchema,
37 InputSpan& span,
38 ServiceRegistryRef registry)
39 : mRegistry{registry},
40 mInputsSchema{inputsSchema},
41 mSpan{span}
42{
43}
44
45int InputRecord::getPos(const char* binding) const
46{
47 auto inputIndex = 0;
48 for (size_t i = 0; i < mInputsSchema.size(); ++i) {
49 auto& route = mInputsSchema[i];
50 if (route.timeslice != 0) {
51 continue;
52 }
53 if (route.matcher.binding == binding) {
54 return inputIndex;
55 }
56 ++inputIndex;
57 }
58 return -1;
59}
60
// NOTE(review): the signature line of this definition (listing line 61) is
// not present in this extract. The body forwards `matcher` together with
// mInputsSchema to a getPos overload and returns the `index` member of the
// result, so this is presumably the matcher-based InputRecord::getPos
// overload -- confirm against the header.
62{
63 return getPos(mInputsSchema, matcher).index;
64}
65
// NOTE(review): the signature line of this definition (listing line 66) is
// not present in this extract; the body uses the names `schema` and
// `concrete`, presumably its parameters -- confirm against the header.
//
// Walks `schema`, ignoring routes whose timeslice is not 0, and returns the
// running index of the first route for which
// DataSpecUtils::match(route.matcher, concrete) holds.
67{
68 size_t inputIndex = 0;
69 for (const auto& route : schema) {
70 if (route.timeslice != 0) {
71 continue;
72 }
73 if (DataSpecUtils::match(route.matcher, concrete)) {
74 return {inputIndex};
75 }
// Only routes with timeslice == 0 advance the index.
76 ++inputIndex;
77 }
// NOTE(review): listing line 78 is missing from this extract, so the value
// returned when no route matches is not visible here.
79}
80
81int InputRecord::getPos(std::string const& binding) const
82{
83 return this->getPos(binding.c_str());
84}
85
86DataRef InputRecord::getByPos(int pos, int part) const
87{
88 return InputRecord::getByPos(mInputsSchema, mSpan, pos, part);
89}
90
91DataRef InputRecord::getByPos(std::vector<InputRoute> const& schema, InputSpan const& span, int pos, int part)
92{
93 if (pos >= (int)span.size() || pos < 0) {
94 throw runtime_error_f("Unknown message requested at position %d", pos);
95 }
96 if (part > 0 && part >= (int)span.getNofParts(pos)) {
97 throw runtime_error_f("Invalid message part index at %d:%d", pos, part);
98 }
99 if (pos >= (int)schema.size()) {
100 throw runtime_error_f("Unknown schema at position %d", pos);
101 }
102 auto ref = span.get(pos, part);
103 auto inputIndex = 0;
104 auto schemaIndex = 0;
105 for (size_t i = 0; i < schema.size(); ++i) {
106 schemaIndex = i;
107 auto& route = schema[i];
108 if (route.timeslice != 0) {
109 continue;
110 }
111 if (inputIndex == pos) {
112 break;
113 }
114 ++inputIndex;
115 }
116 ref.spec = &schema[schemaIndex].matcher;
117 return ref;
118}
119
120DataRef InputRecord::getFirstValid(bool throwOnFailure) const
121{
122 for (size_t i = 0; i < size(); i++) {
123 auto ref = mSpan.get(i);
124 if (ref.header != nullptr) {
125 ref.spec = &mInputsSchema[i].matcher;
126 return ref;
127 }
128 }
129 if (throwOnFailure) {
130 throw runtime_error_f("No valid input found out of total ", size());
131 }
132 return {};
133}
134
// NOTE(review): the signature line of this definition (listing line 135) is
// not present in this extract; the symbol index at the end of the listing
// suggests `size_t InputRecord::getNofParts(int pos) const` -- confirm.
//
// Returns the number of parts mSpan reports for `pos`, or 0 when `pos` is
// negative or not smaller than mSpan.size().
136{
137 if (pos < 0 || pos >= mSpan.size()) {
138 return 0;
139 }
140 return mSpan.getNofParts(pos);
141}
142
// NOTE(review): the signature line of this definition (listing line 143) is
// not present in this extract; the symbol index at the end of the listing
// suggests `DataRef InputRecord::getAtIndices(int pos, DataRefIndices indices) const`
// -- confirm.
//
// Asks mSpan for the part described by `indices` in slot `pos` and, when
// `pos` is a valid index into mInputsSchema, attaches that entry's matcher
// as the spec of the returned reference.
144{
145 auto ref = mSpan.getAtIndices(pos, indices);
// The spec is left untouched when pos falls outside the schema.
146 if (pos >= 0 && pos < (int)mInputsSchema.size()) {
147 ref.spec = &mInputsSchema[pos].matcher;
148 }
149 return ref;
150}
151
152size_t InputRecord::size() const
153{
154 return mSpan.size();
155}
156
157bool InputRecord::isValid(char const* s) const
158{
159 DataRef ref = get(s);
160 if (ref.header == nullptr) {
161 return false;
162 }
163 return true;
164}
165
166bool InputRecord::isValid(int s) const
167{
168 if (s >= size()) {
169 return false;
170 }
171 DataRef ref = getByPos(s);
172 if (ref.header == nullptr) {
173 return false;
174 }
175 return true;
176}
177
// NOTE(review): the signature line of this definition (listing line 178) is
// not present in this extract; the symbol index at the end of the listing
// suggests `size_t InputRecord::countValidInputs() const` -- confirm.
//
// Counts the elements visited when iterating over *this. Which elements the
// iterator yields is decided by the iterator itself, defined outside this
// extract.
179{
180 size_t count = 0;
// The loop variable is intentionally unused: only the number of steps matters.
181 for (auto&& _ : *this) {
182 ++count;
183 }
184 return count;
185}
186
187[[nodiscard]] std::string InputRecord::describeAvailableInputs() const
188{
189 std::stringstream ss;
190 ss << "Available inputs: ";
191 bool first = true;
192 for (auto const& route : mInputsSchema) {
193 if (!first) {
194 ss << ", ";
195 }
196 ss << route.matcher.binding;
197 first = false;
198 }
199 return ss.str();
200}
201
202} // namespace o2::framework
std::string binding
std::shared_ptr< arrow::Schema > schema
int32_t i
uint16_t pos
Definition RawData.h:3
int getPos(const char *name) const
bool isValid(std::string const &s) const
Helper method to be used to check if a given part of the InputRecord is present.
decltype(auto) get(R binding, int part=0) const
InputRecord(std::vector< InputRoute > const &inputs, InputSpan &span, ServiceRegistryRef)
size_t countValidInputs() const
static DataRef getByPos(std::vector< InputRoute > const &routes, InputSpan const &span, int pos, int part=0)
DataRef getAtIndices(int pos, DataRefIndices indices) const
O(1) access to the part described by indices in slot pos.
size_t getNofParts(int pos) const
DataRef getFirstValid(bool throwOnFailure=false) const
Get the ref of the first valid input. If requested, throw an error if none is found.
DataRef getAtIndices(size_t slotIdx, DataRefIndices indices) const
Return the DataRef for the part described by indices in slot slotIdx in O(1).
Definition InputSpan.h:54
size_t size() const
Number of elements in the InputSpan.
Definition InputSpan.h:93
size_t getNofParts(size_t i) const
number of parts in the i-th element of the InputSpan
Definition InputSpan.h:72
DataRef get(size_t i, size_t partidx=0) const
i-th element of the InputSpan (O(partidx) sequential scan via indices protocol)
Definition InputSpan.h:44
GLint GLsizei count
Definition glcorearb.h:399
GLsizei GLenum const void * indices
Definition glcorearb.h:400
GLint ref
Definition glcorearb.h:291
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
RuntimeErrorRef runtime_error_f(const char *,...)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
static constexpr size_t INVALID