Project
Loading...
Searching...
No Matches
InputRecord.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
12#include "Framework/InputSpan.h"
13#include "Framework/InputSpec.h"
16#include <fairmq/Message.h>
17#include <cassert>
18
19#if defined(__GNUC__)
20#pragma GCC diagnostic push
21#pragma GCC diagnostic ignored "-Wshadow"
22#endif
23#include <arrow/builder.h>
24#include <arrow/memory_pool.h>
25#include <arrow/record_batch.h>
26#include <arrow/table.h>
27#include <arrow/type_traits.h>
28#include <arrow/status.h>
29#if defined(__GNUC__)
30#pragma GCC diagnostic pop
31#endif
32
33namespace o2::framework
34{
35
36InputRecord::InputRecord(std::vector<InputRoute> const& inputsSchema,
37 InputSpan& span,
38 ServiceRegistryRef registry)
39 : mRegistry{registry},
40 mInputsSchema{inputsSchema},
41 mSpan{span}
42{
43}
44
45int InputRecord::getPos(const char* binding) const
46{
47 auto inputIndex = 0;
48 for (size_t i = 0; i < mInputsSchema.size(); ++i) {
49 auto& route = mInputsSchema[i];
50 if (route.timeslice != 0) {
51 continue;
52 }
53 if (route.matcher.binding == binding) {
54 return inputIndex;
55 }
56 ++inputIndex;
57 }
58 return -1;
59}
60
62{
63 return getPos(mInputsSchema, matcher).index;
64}
65
67{
68 size_t inputIndex = 0;
69 for (const auto& route : schema) {
70 if (route.timeslice != 0) {
71 continue;
72 }
73 if (DataSpecUtils::match(route.matcher, concrete)) {
74 return {inputIndex};
75 }
76 ++inputIndex;
77 }
79}
80
81int InputRecord::getPos(std::string const& binding) const
82{
83 return this->getPos(binding.c_str());
84}
85
86DataRef InputRecord::getByPos(int pos, int part) const
87{
88 return InputRecord::getByPos(mInputsSchema, mSpan, pos, part);
89}
90
91DataRef InputRecord::getByPos(std::vector<InputRoute> const& schema, InputSpan const& span, int pos, int part)
92{
93 if (pos >= (int)span.size() || pos < 0) {
94 throw runtime_error_f("Unknown message requested at position %d", pos);
95 }
96 if (part > 0 && part >= (int)span.getNofParts(pos)) {
97 throw runtime_error_f("Invalid message part index at %d:%d", pos, part);
98 }
99 if (pos >= (int)schema.size()) {
100 throw runtime_error_f("Unknown schema at position %d", pos);
101 }
102 auto ref = span.get(pos, part);
103 auto inputIndex = 0;
104 auto schemaIndex = 0;
105 for (size_t i = 0; i < schema.size(); ++i) {
106 schemaIndex = i;
107 auto& route = schema[i];
108 if (route.timeslice != 0) {
109 continue;
110 }
111 if (inputIndex == pos) {
112 break;
113 }
114 ++inputIndex;
115 }
116 ref.spec = &schema[schemaIndex].matcher;
117 return ref;
118}
119
120DataRef InputRecord::getFirstValid(bool throwOnFailure) const
121{
122 for (size_t i = 0; i < size(); i++) {
123 auto ref = mSpan.get(i);
124 if (ref.header != nullptr) {
125 ref.spec = &mInputsSchema[i].matcher;
126 return ref;
127 }
128 }
129 if (throwOnFailure) {
130 throw runtime_error_f("No valid input found out of total ", size());
131 }
132 return {};
133}
134
136{
137 if (pos < 0 || pos >= mSpan.size()) {
138 return 0;
139 }
140 return mSpan.getNofParts(pos);
141}
142size_t InputRecord::size() const
143{
144 return mSpan.size();
145}
146
147bool InputRecord::isValid(char const* s) const
148{
149 DataRef ref = get(s);
150 if (ref.header == nullptr) {
151 return false;
152 }
153 return true;
154}
155
156bool InputRecord::isValid(int s) const
157{
158 if (s >= size()) {
159 return false;
160 }
161 DataRef ref = getByPos(s);
162 if (ref.header == nullptr) {
163 return false;
164 }
165 return true;
166}
167
169{
170 size_t count = 0;
171 for (auto&& _ : *this) {
172 ++count;
173 }
174 return count;
175}
176
177[[nodiscard]] std::string InputRecord::describeAvailableInputs() const
178{
179 std::stringstream ss;
180 ss << "Available inputs: ";
181 bool first = true;
182 for (auto const& route : mInputsSchema) {
183 if (!first) {
184 ss << ", ";
185 }
186 ss << route.matcher.binding;
187 first = false;
188 }
189 return ss.str();
190}
191
192} // namespace o2::framework
std::string binding
std::shared_ptr< arrow::Schema > schema
int32_t i
uint16_t pos
Definition RawData.h:3
int getPos(const char *name) const
bool isValid(std::string const &s) const
Helper method to be used to check if a given part of the InputRecord is present.
decltype(auto) get(R binding, int part=0) const
InputRecord(std::vector< InputRoute > const &inputs, InputSpan &span, ServiceRegistryRef)
size_t countValidInputs() const
static DataRef getByPos(std::vector< InputRoute > const &routes, InputSpan const &span, int pos, int part=0)
size_t getNofParts(int pos) const
DataRef getFirstValid(bool throwOnFailure=false) const
Get the ref of the first valid input. If requested, throw an error if none is found.
size_t size() const
Number of elements in the InputSpan.
Definition InputSpan.h:82
size_t getNofParts(size_t i) const
number of parts in the i-th element of the InputSpan
Definition InputSpan.h:58
DataRef get(size_t i, size_t partidx=0) const
i-th element of the InputSpan
Definition InputSpan.h:52
GLint GLsizei count
Definition glcorearb.h:399
GLint ref
Definition glcorearb.h:291
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error_f(const char *,...)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
static constexpr size_t INVALID