Project
Loading...
Searching...
No Matches
InputRecord.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
12#include "Framework/InputSpan.h"
13#include "Framework/InputSpec.h"
16#include <fairmq/Message.h>
17#include <cassert>
18
19#if defined(__GNUC__)
20#pragma GCC diagnostic push
21#pragma GCC diagnostic ignored "-Wshadow"
22#endif
23#include <arrow/builder.h>
24#include <arrow/memory_pool.h>
25#include <arrow/record_batch.h>
26#include <arrow/table.h>
27#include <arrow/type_traits.h>
28#include <arrow/status.h>
29#if defined(__GNUC__)
30#pragma GCC diagnostic pop
31#endif
32
33namespace o2::framework
34{
35
36InputRecord::InputRecord(std::vector<InputRoute> const& inputsSchema,
37 InputSpan& span,
38 ServiceRegistryRef registry)
39 : mRegistry{registry},
40 mInputsSchema{inputsSchema},
41 mSpan{span}
42{
43}
44
45int InputRecord::getPos(const char* binding) const
46{
47 auto inputIndex = 0;
48 for (size_t i = 0; i < mInputsSchema.size(); ++i) {
49 auto& route = mInputsSchema[i];
50 if (route.timeslice != 0) {
51 continue;
52 }
53 if (route.matcher.binding == binding) {
54 return inputIndex;
55 }
56 ++inputIndex;
57 }
58 return -1;
59}
60
61InputRecord::InputPos InputRecord::getPos(std::vector<InputRoute> const& schema, ConcreteDataMatcher concrete)
62{
63 size_t inputIndex = 0;
64 for (const auto& route : schema) {
65 if (route.timeslice != 0) {
66 continue;
67 }
68 if (DataSpecUtils::match(route.matcher, concrete)) {
69 return {inputIndex};
70 }
71 ++inputIndex;
72 }
74}
75
76int InputRecord::getPos(std::string const& binding) const
77{
78 return this->getPos(binding.c_str());
79}
80
81DataRef InputRecord::getByPos(int pos, int part) const
82{
83 return InputRecord::getByPos(mInputsSchema, mSpan, pos, part);
84}
85
86DataRef InputRecord::getByPos(std::vector<InputRoute> const& schema, InputSpan const& span, int pos, int part)
87{
88 if (pos >= (int)span.size() || pos < 0) {
89 throw runtime_error_f("Unknown message requested at position %d", pos);
90 }
91 if (part > 0 && part >= (int)span.getNofParts(pos)) {
92 throw runtime_error_f("Invalid message part index at %d:%d", pos, part);
93 }
94 if (pos >= (int)schema.size()) {
95 throw runtime_error_f("Unknown schema at position %d", pos);
96 }
97 auto ref = span.get(pos, part);
98 auto inputIndex = 0;
99 auto schemaIndex = 0;
100 for (size_t i = 0; i < schema.size(); ++i) {
101 schemaIndex = i;
102 auto& route = schema[i];
103 if (route.timeslice != 0) {
104 continue;
105 }
106 if (inputIndex == pos) {
107 break;
108 }
109 ++inputIndex;
110 }
111 ref.spec = &schema[schemaIndex].matcher;
112 return ref;
113}
114
115DataRef InputRecord::getFirstValid(bool throwOnFailure) const
116{
117 for (size_t i = 0; i < size(); i++) {
118 auto ref = mSpan.get(i);
119 if (ref.header != nullptr) {
120 ref.spec = &mInputsSchema[i].matcher;
121 return ref;
122 }
123 }
124 if (throwOnFailure) {
125 throw runtime_error_f("No valid input found out of total ", size());
126 }
127 return {};
128}
129
131{
132 if (pos < 0 || pos >= mSpan.size()) {
133 return 0;
134 }
135 return mSpan.getNofParts(pos);
136}
137size_t InputRecord::size() const
138{
139 return mSpan.size();
140}
141
142bool InputRecord::isValid(char const* s) const
143{
144 DataRef ref = get(s);
145 if (ref.header == nullptr) {
146 return false;
147 }
148 return true;
149}
150
151bool InputRecord::isValid(int s) const
152{
153 if (s >= size()) {
154 return false;
155 }
156 DataRef ref = getByPos(s);
157 if (ref.header == nullptr) {
158 return false;
159 }
160 return true;
161}
162
164{
165 size_t count = 0;
166 for (auto&& _ : *this) {
167 ++count;
168 }
169 return count;
170}
171
172[[nodiscard]] std::string InputRecord::describeAvailableInputs() const
173{
174 std::stringstream ss;
175 ss << "Available inputs: ";
176 bool first = true;
177 for (auto const& route : mInputsSchema) {
178 if (!first) {
179 ss << ", ";
180 }
181 ss << route.matcher.binding;
182 first = false;
183 }
184 return ss.str();
185}
186
187} // namespace o2::framework
int32_t i
uint16_t pos
Definition RawData.h:3
int getPos(const char *name) const
bool isValid(std::string const &s) const
Helper method to be used to check if a given part of the InputRecord is present.
decltype(auto) get(R binding, int part=0) const
InputRecord(std::vector< InputRoute > const &inputs, InputSpan &span, ServiceRegistryRef)
size_t countValidInputs() const
static DataRef getByPos(std::vector< InputRoute > const &routes, InputSpan const &span, int pos, int part=0)
size_t getNofParts(int pos) const
DataRef getFirstValid(bool throwOnFailure=false) const
Get the ref of the first valid input. If requested, throw an error if none is found.
size_t size() const
Number of elements in the InputSpan.
Definition InputSpan.h:70
size_t getNofParts(size_t i) const
number of parts in the i-th element of the InputSpan
Definition InputSpan.h:58
DataRef get(size_t i, size_t partidx=0) const
i-th element of the InputSpan
Definition InputSpan.h:52
GLint GLsizei count
Definition glcorearb.h:399
GLint ref
Definition glcorearb.h:291
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error_f(const char *,...)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
static constexpr size_t INVALID