16#include <fairmq/Message.h>
20#pragma GCC diagnostic push
21#pragma GCC diagnostic ignored "-Wshadow"
23#include <arrow/builder.h>
24#include <arrow/memory_pool.h>
25#include <arrow/record_batch.h>
26#include <arrow/table.h>
27#include <arrow/type_traits.h>
28#include <arrow/status.h>
30#pragma GCC diagnostic pop
39 : mRegistry{registry},
40 mInputsSchema{inputsSchema},
48 for (
size_t i = 0;
i < mInputsSchema.size(); ++
i) {
49 auto& route = mInputsSchema[
i];
50 if (route.timeslice != 0) {
53 if (route.matcher.binding == binding) {
63 size_t inputIndex = 0;
64 for (
const auto& route : schema) {
65 if (route.timeslice != 0) {
78 return this->
getPos(binding.c_str());
94 if (
pos >= (
int)schema.size()) {
100 for (
size_t i = 0;
i < schema.size(); ++
i) {
102 auto& route = schema[
i];
103 if (route.timeslice != 0) {
106 if (inputIndex ==
pos) {
111 ref.spec = &schema[schemaIndex].matcher;
117 for (
size_t i = 0;
i <
size();
i++) {
119 if (
ref.header !=
nullptr) {
120 ref.spec = &mInputsSchema[
i].matcher;
124 if (throwOnFailure) {
132 if (pos < 0 || pos >= mSpan.
size()) {
145 if (
ref.header ==
nullptr) {
157 if (
ref.header ==
nullptr) {
166 for (
auto&& _ : *
this) {
172[[nodiscard]] std::string InputRecord::describeAvailableInputs()
const
174 std::stringstream ss;
175 ss <<
"Available inputs: ";
177 for (
auto const& route : mInputsSchema) {
181 ss << route.matcher.binding;
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error_f(const char *,...)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)