15#include <arrow/compute/api_aggregate.h>
16#include <arrow/compute/kernel.h>
17#include <arrow/table.h>
24std::shared_ptr<arrow::ChunkedArray> GetColumnByNameCI(std::shared_ptr<arrow::Table>
const& table, std::string
const&
key)
26 auto const&
fields = table->schema()->fields();
27 auto target = std::find_if(
fields.begin(),
fields.end(), [&
key](std::shared_ptr<arrow::Field>
const& field) {
28 return [](std::string_view const& s1, std::string_view const& s2) {
29 return std::ranges::equal(
31 [](char c1, char c2) {
32 return std::tolower(static_cast<unsigned char>(c1)) == std::tolower(static_cast<unsigned char>(c2));
34 }(field->name(),
key);
36 return table->column(std::distance(
fields.begin(),
target));
42 auto locate = std::find_if(
list.begin(),
list.end(), [&
binding, &
key](
auto const&
entry) { return (entry.binding == binding) && (entry.key == key); });
43 if (locate ==
list.end()) {
45 }
else if (!locate->enabled &&
enabled) {
46 locate->enabled =
true;
50std::pair<int64_t, int64_t> SliceInfoPtr::getSliceFor(
int value)
const
59std::span<const int64_t> SliceInfoUnsortedPtr::getSliceFor(
int value)
const
68 return {(*groups)[
value].data(), (*groups)[
value].size()};
71void ArrowTableSlicingCacheDef::setCaches(
Cache&& bsks)
76void ArrowTableSlicingCacheDef::setCachesUnsorted(
Cache&& bsks)
78 bindingsKeysUnsorted = bsks;
81ArrowTableSlicingCache::ArrowTableSlicingCache(
Cache&& bsks,
Cache&& bsksUnsorted)
83 bindingsKeysUnsorted{bsksUnsorted}
110 if (table->num_rows() == 0) {
111 return arrow::Status::OK();
115 throw runtime_error_f(
"Disabled cache %s/%s update requested",
b.c_str(), k.c_str());
120 auto column = GetColumnByNameCI(table, k);
123 for (
auto iChunk = column->num_chunks() - 1; iChunk >= 0; --iChunk) {
124 auto chunk =
static_cast<arrow::NumericArray<arrow::Int32Type>
>(column->chunk(iChunk)->data());
125 for (
auto iElement = chunk.length() - 1; iElement >= 0; --iElement) {
126 auto value = chunk.Value(iElement);
143 int lastValue = std::numeric_limits<int>::max();
145 for (
auto iChunk = 0; iChunk < column->num_chunks(); ++iChunk) {
146 auto chunk =
static_cast<arrow::NumericArray<arrow::Int32Type>
>(column->chunk(iChunk)->data());
147 for (
auto iElement = 0; iElement < chunk.length(); ++iElement) {
148 auto v = chunk.Value(iElement);
150 if (
v == lastValue) {
162 return arrow::Status::OK();
169 if (table->num_rows() == 0) {
170 return arrow::Status::OK();
174 throw runtime_error_f(
"Disabled unsorted cache %s/%s update requested",
b.c_str(), k.c_str());
176 auto column = GetColumnByNameCI(table, k);
178 for (
auto iChunk = 0; iChunk < column->num_chunks(); ++iChunk) {
179 auto chunk =
static_cast<arrow::NumericArray<arrow::Int32Type>
>(column->chunk(iChunk)->data());
180 for (
auto iElement = 0; iElement < chunk.length(); ++iElement) {
181 auto v = chunk.Value(iElement);
195 return arrow::Status::OK();
208 throw runtime_error_f(
"%s/%s not found neither in sorted or unsorted cache", bindingKey.
binding.c_str(), bindingKey.
key.c_str());
213 auto locate = std::find_if(
bindingsKeys.begin(),
bindingsKeys.end(), [&](
Entry const& bk) { return (bindingKey.binding == bk.binding) && (bindingKey.key == bk.key); });
273 auto column = o2::framework::GetColumnByNameCI(input,
key);
274 auto array0 =
static_cast<arrow::NumericArray<arrow::Int32Type>
>(column->chunk(0)->data());
276 int32_t cur = array0.Value(0);
277 int32_t lastNeg = cur < 0 ? cur : 0;
278 int32_t lastPos = cur < 0 ? -1 : cur;
279 for (
auto i = 0;
i < column->num_chunks(); ++
i) {
280 auto array =
static_cast<arrow::NumericArray<arrow::Int32Type>
>(column->chunk(
i)->data());
281 for (
auto e = 0; e <
array.length(); ++e) {
288 cur =
array.Value(e);
291 throw runtime_error_f(
"Table %s index %s is not sorted: next value %d < previous value %d!",
target.c_str(),
key.c_str(), cur, lastPos);
293 if (lastPos == cur && prev < 0) {
294 throw runtime_error_f(
"Table %s index %s has a group with index %d that is split by %d",
target.c_str(),
key.c_str(), cur, prev);
298 throw runtime_error_f(
"Table %s index %s is not sorted: next negative value %d > previous negative value %d!",
target.c_str(),
key.c_str(), cur, lastNeg);
300 if (lastNeg == cur && prev >= 0) {
301 throw runtime_error_f(
"Table %s index %s has a group with index %d that is split by %d",
target.c_str(),
key.c_str(), cur, prev);
std::vector< std::shared_ptr< arrow::Field > > fields
GLuint GLsizei const GLuint const GLintptr * offsets
GLuint GLsizei const GLuint const GLintptr const GLsizeiptr * sizes
GLenum GLenum GLsizei const GLuint GLboolean enabled
GLboolean GLboolean GLboolean b
GLsizei const GLfloat * value
GLenum GLsizei GLsizei GLint * values
Defining PrimaryVertex explicitly as messageable.
std::vector< Entry > Cache
void updatePairList(Cache &list, std::string const &binding, std::string const &key, bool enabled)
RuntimeErrorRef runtime_error_f(const char *,...)
SliceInfoPtr getCacheForPos(int pos) const
SliceInfoUnsortedPtr getCacheUnsortedFor(Entry const &bindingKey) const
arrow::Status updateCacheEntryUnsorted(int pos, std::shared_ptr< arrow::Table > const &table)
int getCachePosSortedFor(Entry const &bindingKey) const
arrow::Status updateCacheEntry(int pos, std::shared_ptr< arrow::Table > const &table)
std::pair< int, bool > getCachePos(Entry const &bindingKey) const
SliceInfoPtr getCacheFor(Entry const &bindingKey) const
void setCaches(Cache &&bsks, Cache &&bsksUnsorted={})
Cache bindingsKeysUnsorted
SliceInfoUnsortedPtr getCacheUnsortedForPos(int pos) const
int getCachePosUnsortedFor(Entry const &bindingKey) const
std::vector< std::vector< int > > valuesUnsorted
static void validateOrder(Entry const &bindingKey, std::shared_ptr< arrow::Table > const &input)