15#include <arrow/compute/api_aggregate.h>
16#include <arrow/compute/kernel.h>
17#include <arrow/table.h>
24 if (std::find_if(
list.begin(),
list.end(), [&binding, &
key](
auto const&
entry) { return (entry.first == binding) && (entry.second == key); }) ==
list.end()) {
25 list.emplace_back(binding,
key);
35 int64_t p =
static_cast<int64_t
>(
values.size()) - 1;
47 for (
auto i = 0U;
i <
values.size(); ++
i) {
65 return {(*groups)[
value].data(), (*groups)[
value].size()};
80 bindingsKeysUnsorted{bsksUnsorted}
105 if (table->num_rows() == 0) {
108 return arrow::Status::OK();
111 arrow::Datum value_counts;
112 auto options = arrow::compute::ScalarAggregateOptions::Defaults();
113 ARROW_ASSIGN_OR_RAISE(value_counts,
114 arrow::compute::CallFunction(
"value_counts", {table->GetColumnByName(
bindingsKeys[
pos].second)},
116 auto pair =
static_cast<arrow::StructArray
>(value_counts.array());
119 values[
pos] = std::make_shared<arrow::NumericArray<arrow::Int32Type>>(pair.field(0)->data());
120 counts[
pos] = std::make_shared<arrow::NumericArray<arrow::Int64Type>>(pair.field(1)->data());
121 return arrow::Status::OK();
128 if (table->num_rows() == 0) {
129 return arrow::Status::OK();
132 auto column = table->GetColumnByName(k);
134 for (
auto iChunk = 0; iChunk < column->num_chunks(); ++iChunk) {
135 auto chunk =
static_cast<arrow::NumericArray<arrow::Int32Type>
>(column->chunk(iChunk)->data());
136 for (
auto iElement = 0; iElement < chunk.length(); ++iElement) {
137 auto v = chunk.Value(iElement);
151 return arrow::Status::OK();
164 throw runtime_error_f(
"%s/%s not found neither in sorted or unsorted cache", bindingKey.first.c_str(), bindingKey.second.c_str());
169 auto locate = std::find_if(
bindingsKeys.begin(),
bindingsKeys.end(), [&](
StringPair const& bk) { return (bindingKey.first == bk.first) && (bindingKey.second == bk.second); });
188 throw runtime_error_f(
"%s/%s is found in unsorted cache", bindingKey.first.c_str(), bindingKey.second.c_str());
198 throw runtime_error_f(
"%s/%s is found in sorted cache", bindingKey.first.c_str(), bindingKey.second.c_str());
230 auto column = input->GetColumnByName(
key);
231 auto array0 =
static_cast<arrow::NumericArray<arrow::Int32Type>
>(column->chunk(0)->data());
233 int32_t cur = array0.Value(0);
234 int32_t lastNeg = cur < 0 ? cur : 0;
235 int32_t lastPos = cur < 0 ? -1 : cur;
236 for (
auto i = 0;
i < column->num_chunks(); ++
i) {
237 auto array =
static_cast<arrow::NumericArray<arrow::Int32Type>
>(column->chunk(
i)->data());
238 for (
auto e = 0; e <
array.length(); ++e) {
245 cur =
array.Value(e);
248 throw runtime_error_f(
"Table %s index %s is not sorted: next value %d < previous value %d!",
target.c_str(),
key.c_str(), cur, lastPos);
250 if (lastPos == cur && prev < 0) {
251 throw runtime_error_f(
"Table %s index %s has a group with index %d that is split by %d",
target.c_str(),
key.c_str(), cur, prev);
255 throw runtime_error_f(
"Table %s index %s is not sorted: next negative value %d > previous negative value %d!",
target.c_str(),
key.c_str(), cur, lastNeg);
257 if (lastNeg == cur && prev >= 0) {
258 throw runtime_error_f(
"Table %s index %s has a group with index %d that is split by %d",
target.c_str(),
key.c_str(), cur, prev);
GLboolean GLboolean GLboolean b
GLsizei const GLfloat * value
GLenum GLsizei GLsizei GLint * values
GLuint GLsizei GLsizei * length
Defining PrimaryVertex explicitly as messageable.
void updatePairList(std::vector< StringPair > &list, std::string const &binding, std::string const &key)
std::pair< std::string, std::string > StringPair
RuntimeErrorRef runtime_error_f(const char *,...)
std::vector< StringPair > bindingsKeysUnsorted
std::vector< StringPair > bindingsKeys
void setCaches(std::vector< StringPair > &&bsks)
void setCachesUnsorted(std::vector< StringPair > &&bsks)
std::pair< int, bool > getCachePos(StringPair const &bindingKey) const
std::vector< StringPair > bindingsKeys
SliceInfoPtr getCacheForPos(int pos) const
arrow::Status updateCacheEntryUnsorted(int pos, std::shared_ptr< arrow::Table > const &table)
std::vector< StringPair > bindingsKeysUnsorted
void setCaches(std::vector< StringPair > &&bsks, std::vector< StringPair > &&bsksUnsorted={})
arrow::Status updateCacheEntry(int pos, std::shared_ptr< arrow::Table > const &table)
int getCachePosSortedFor(StringPair const &bindingKey) const
int getCachePosUnsortedFor(StringPair const &bindingKey) const
SliceInfoPtr getCacheFor(StringPair const &bindingKey) const
ArrowTableSlicingCache(std::vector< StringPair > &&bsks, std::vector< StringPair > &&bsksUnsorted={})
SliceInfoUnsortedPtr getCacheUnsortedFor(StringPair const &bindingKey) const
SliceInfoUnsortedPtr getCacheUnsortedForPos(int pos) const
static void validateOrder(StringPair const &bindingKey, std::shared_ptr< arrow::Table > const &input)
std::vector< std::vector< int > > valuesUnsorted
std::vector< std::shared_ptr< arrow::NumericArray< arrow::Int64Type > > > counts
gsl::span< int64_t const > counts
std::pair< int64_t, int64_t > getSliceFor(int value) const
gsl::span< int64_t const > getSliceFor(int value) const