Project
Loading...
Searching...
No Matches
ArrowTableSlicingCache.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14
#include <algorithm>
#include <cctype>
#include <cstdint>
#include <limits>
#include <string_view>
#include <utility>

#include <arrow/compute/api_aggregate.h>
#include <arrow/compute/kernel.h>
#include <arrow/table.h>
18
19namespace o2::framework
20{
21
22namespace
23{
24std::shared_ptr<arrow::ChunkedArray> GetColumnByNameCI(std::shared_ptr<arrow::Table> const& table, std::string const& key)
25{
26 auto const& fields = table->schema()->fields();
27 auto target = std::find_if(fields.begin(), fields.end(), [&key](std::shared_ptr<arrow::Field> const& field) {
28 return [](std::string_view const& s1, std::string_view const& s2) {
29 return std::ranges::equal(
30 s1, s2,
31 [](char c1, char c2) {
32 return std::tolower(static_cast<unsigned char>(c1)) == std::tolower(static_cast<unsigned char>(c2));
33 });
34 }(field->name(), key);
35 });
36 return table->column(std::distance(fields.begin(), target));
37}
38} // namespace
39
40void updatePairList(Cache& list, std::string const& binding, std::string const& key, bool enabled = true)
41{
42 auto locate = std::find_if(list.begin(), list.end(), [&binding, &key](auto const& entry) { return (entry.binding == binding) && (entry.key == key); });
43 if (locate == list.end()) {
44 list.emplace_back(binding, key, enabled);
45 } else if (!locate->enabled && enabled) {
46 locate->enabled = true;
47 }
48}
49
50std::pair<int64_t, int64_t> SliceInfoPtr::getSliceFor(int value) const
51{
52 if ((size_t)value >= offsets.size()) {
53 return {0, 0};
54 }
55
56 return {offsets[value], sizes[value]};
57}
58
59std::span<const int64_t> SliceInfoUnsortedPtr::getSliceFor(int value) const
60{
61 if (values.empty()) {
62 return {};
63 }
64 if (value > values[values.size() - 1]) {
65 return {};
66 }
67
68 return {(*groups)[value].data(), (*groups)[value].size()};
69}
70
71void ArrowTableSlicingCacheDef::setCaches(Cache&& bsks)
72{
73 bindingsKeys = bsks;
74}
75
76void ArrowTableSlicingCacheDef::setCachesUnsorted(Cache&& bsks)
77{
78 bindingsKeysUnsorted = bsks;
79}
80
81ArrowTableSlicingCache::ArrowTableSlicingCache(Cache&& bsks, Cache&& bsksUnsorted)
82 : bindingsKeys{bsks},
83 bindingsKeysUnsorted{bsksUnsorted}
84{
85 offsets.resize(bindingsKeys.size());
86 sizes.resize(bindingsKeys.size());
87
89 groups.resize(bindingsKeysUnsorted.size());
90}
91
92void ArrowTableSlicingCache::setCaches(Cache&& bsks, Cache&& bsksUnsorted)
93{
94 bindingsKeys = bsks;
95 bindingsKeysUnsorted = bsksUnsorted;
96 offsets.clear();
97 offsets.resize(bindingsKeys.size());
98 sizes.clear();
99 sizes.resize(bindingsKeys.size());
100 valuesUnsorted.clear();
102 groups.clear();
103 groups.resize(bindingsKeysUnsorted.size());
104}
105
106arrow::Status ArrowTableSlicingCache::updateCacheEntry(int pos, std::shared_ptr<arrow::Table> const& table)
107{
108 offsets[pos].clear();
109 sizes[pos].clear();
110 if (table->num_rows() == 0) {
111 return arrow::Status::OK();
112 }
113 auto& [b, k, e] = bindingsKeys[pos];
114 if (!e) {
115 throw runtime_error_f("Disabled cache %s/%s update requested", b.c_str(), k.c_str());
116 }
118
119 int maxValue = -1;
120 auto column = GetColumnByNameCI(table, k);
121
122 // starting from the end, find the first positive value, in a sorted column it is the largest index
123 for (auto iChunk = column->num_chunks() - 1; iChunk >= 0; --iChunk) {
124 auto chunk = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(iChunk)->data());
125 for (auto iElement = chunk.length() - 1; iElement >= 0; --iElement) {
126 auto value = chunk.Value(iElement);
127 if (value < 0) {
128 continue;
129 } else {
130 maxValue = value;
131 break;
132 }
133 }
134 if (maxValue >= 0) {
135 break;
136 }
137 }
138
139 offsets[pos].resize(maxValue + 1);
140 sizes[pos].resize(maxValue + 1);
141
142 // loop over the index and collect size/offset
143 int lastValue = std::numeric_limits<int>::max();
144 int globalRow = 0;
145 for (auto iChunk = 0; iChunk < column->num_chunks(); ++iChunk) {
146 auto chunk = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(iChunk)->data());
147 for (auto iElement = 0; iElement < chunk.length(); ++iElement) {
148 auto v = chunk.Value(iElement);
149 if (v >= 0) {
150 if (v == lastValue) {
151 ++sizes[pos][v];
152 } else {
153 lastValue = v;
154 ++sizes[pos][v];
155 offsets[pos][v] = globalRow;
156 }
157 }
158 ++globalRow;
159 }
160 }
161
162 return arrow::Status::OK();
163}
164
165arrow::Status ArrowTableSlicingCache::updateCacheEntryUnsorted(int pos, const std::shared_ptr<arrow::Table>& table)
166{
167 valuesUnsorted[pos].clear();
168 groups[pos].clear();
169 if (table->num_rows() == 0) {
170 return arrow::Status::OK();
171 }
172 auto& [b, k, e] = bindingsKeysUnsorted[pos];
173 if (!e) {
174 throw runtime_error_f("Disabled unsorted cache %s/%s update requested", b.c_str(), k.c_str());
175 }
176 auto column = GetColumnByNameCI(table, k);
177 auto row = 0;
178 for (auto iChunk = 0; iChunk < column->num_chunks(); ++iChunk) {
179 auto chunk = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(iChunk)->data());
180 for (auto iElement = 0; iElement < chunk.length(); ++iElement) {
181 auto v = chunk.Value(iElement);
182 if (v >= 0) {
183 if (std::find(valuesUnsorted[pos].begin(), valuesUnsorted[pos].end(), v) == valuesUnsorted[pos].end()) {
184 valuesUnsorted[pos].push_back(v);
185 }
186 if ((int)groups[pos].size() <= v) {
187 groups[pos].resize(v + 1);
188 }
189 (groups[pos])[v].push_back(row);
190 }
191 ++row;
192 }
193 }
194 std::sort(valuesUnsorted[pos].begin(), valuesUnsorted[pos].end());
195 return arrow::Status::OK();
196}
197
198std::pair<int, bool> ArrowTableSlicingCache::getCachePos(const Entry& bindingKey) const
199{
200 auto pos = getCachePosSortedFor(bindingKey);
201 if (pos != -1) {
202 return {pos, true};
203 }
204 pos = getCachePosUnsortedFor(bindingKey);
205 if (pos != -1) {
206 return {pos, false};
207 }
208 throw runtime_error_f("%s/%s not found neither in sorted or unsorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str());
209}
210
212{
213 auto locate = std::find_if(bindingsKeys.begin(), bindingsKeys.end(), [&](Entry const& bk) { return (bindingKey.binding == bk.binding) && (bindingKey.key == bk.key); });
214 if (locate != bindingsKeys.end()) {
215 return std::distance(bindingsKeys.begin(), locate);
216 }
217 return -1;
218}
219
221{
222 auto locate_unsorted = std::find_if(bindingsKeysUnsorted.begin(), bindingsKeysUnsorted.end(), [&](Entry const& bk) { return (bindingKey.binding == bk.binding) && (bindingKey.key == bk.key); });
223 if (locate_unsorted != bindingsKeysUnsorted.end()) {
224 return std::distance(bindingsKeysUnsorted.begin(), locate_unsorted);
225 }
226 return -1;
227}
229{
230 auto [p, s] = getCachePos(bindingKey);
231 if (!s) {
232 throw runtime_error_f("%s/%s is found in unsorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str());
233 }
234 if (!bindingsKeys[p].enabled) {
235 throw runtime_error_f("Disabled cache %s/%s is requested", bindingKey.binding.c_str(), bindingKey.key.c_str());
236 }
237
238 return getCacheForPos(p);
239}
240
242{
243 auto [p, s] = getCachePos(bindingKey);
244 if (s) {
245 throw runtime_error_f("%s/%s is found in sorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str());
246 }
248 throw runtime_error_f("Disabled unsorted cache %s/%s is requested", bindingKey.binding.c_str(), bindingKey.key.c_str());
249 }
250
251 return getCacheUnsortedForPos(p);
252}
253
255{
256 return {
257 gsl::span{offsets[pos].data(), offsets[pos].size()}, //
258 gsl::span(sizes[pos].data(), sizes[pos].size()) //
259 };
260}
261
263{
264 return {
265 {reinterpret_cast<int const*>(valuesUnsorted[pos].data()), valuesUnsorted[pos].size()},
266 &(groups[pos]) //
267 };
268}
269
270void ArrowTableSlicingCache::validateOrder(Entry const& bindingKey, const std::shared_ptr<arrow::Table>& input)
271{
272 auto const& [target, key, enabled] = bindingKey;
273 auto column = o2::framework::GetColumnByNameCI(input, key);
274 auto array0 = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(0)->data());
275 int32_t prev = 0;
276 int32_t cur = array0.Value(0);
277 int32_t lastNeg = cur < 0 ? cur : 0;
278 int32_t lastPos = cur < 0 ? -1 : cur;
279 for (auto i = 0; i < column->num_chunks(); ++i) {
280 auto array = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(i)->data());
281 for (auto e = 0; e < array.length(); ++e) {
282 prev = cur;
283 if (prev >= 0) {
284 lastPos = prev;
285 } else {
286 lastNeg = prev;
287 }
288 cur = array.Value(e);
289 if (cur >= 0) {
290 if (lastPos > cur) {
291 throw runtime_error_f("Table %s index %s is not sorted: next value %d < previous value %d!", target.c_str(), key.c_str(), cur, lastPos);
292 }
293 if (lastPos == cur && prev < 0) {
294 throw runtime_error_f("Table %s index %s has a group with index %d that is split by %d", target.c_str(), key.c_str(), cur, prev);
295 }
296 } else {
297 if (lastNeg < cur) {
298 throw runtime_error_f("Table %s index %s is not sorted: next negative value %d > previous negative value %d!", target.c_str(), key.c_str(), cur, lastNeg);
299 }
300 if (lastNeg == cur && prev >= 0) {
301 throw runtime_error_f("Table %s index %s has a group with index %d that is split by %d", target.c_str(), key.c_str(), cur, prev);
302 }
303 }
304 }
305 }
306}
307} // namespace o2::framework
std::string binding
std::vector< std::shared_ptr< arrow::Field > > fields
int32_t i
uint16_t pos
Definition RawData.h:3
StringRef key
GLsizei GLuint * groups
Definition glcorearb.h:3984
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLsizei const GLuint const GLintptr * offsets
Definition glcorearb.h:2595
GLuint GLuint end
Definition glcorearb.h:469
const GLdouble * v
Definition glcorearb.h:832
GLuint GLsizei const GLuint const GLintptr const GLsizeiptr * sizes
Definition glcorearb.h:2595
GLenum array
Definition glcorearb.h:4274
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLenum GLsizei GLsizei GLint * values
Definition glcorearb.h:1576
GLboolean * data
Definition glcorearb.h:298
Defining PrimaryVertex explicitly as messageable.
std::vector< Entry > Cache
void updatePairList(Cache &list, std::string const &binding, std::string const &key, bool enabled)
RuntimeErrorRef runtime_error_f(const char *,...)
Definition list.h:40
SliceInfoUnsortedPtr getCacheUnsortedFor(Entry const &bindingKey) const
arrow::Status updateCacheEntryUnsorted(int pos, std::shared_ptr< arrow::Table > const &table)
int getCachePosSortedFor(Entry const &bindingKey) const
arrow::Status updateCacheEntry(int pos, std::shared_ptr< arrow::Table > const &table)
std::pair< int, bool > getCachePos(Entry const &bindingKey) const
SliceInfoPtr getCacheFor(Entry const &bindingKey) const
void setCaches(Cache &&bsks, Cache &&bsksUnsorted={})
SliceInfoUnsortedPtr getCacheUnsortedForPos(int pos) const
int getCachePosUnsortedFor(Entry const &bindingKey) const
std::vector< std::vector< int > > valuesUnsorted
static void validateOrder(Entry const &bindingKey, std::shared_ptr< arrow::Table > const &input)
std::vector< int > row