Project
Loading...
Searching...
No Matches
ArrowTableSlicingCache.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15
#include <arrow/compute/api_aggregate.h>
#include <arrow/compute/kernel.h>
#include <arrow/table.h>

#include <algorithm>
#include <utility>
19
20namespace o2::framework
21{
22
23namespace
24{
25std::shared_ptr<arrow::ChunkedArray> GetColumnByNameCI(std::shared_ptr<arrow::Table> const& table, std::string const& key)
26{
27 auto const& fields = table->schema()->fields();
28 auto target = std::find_if(fields.begin(), fields.end(), [&key](std::shared_ptr<arrow::Field> const& field) {
29 return [](std::string_view const& s1, std::string_view const& s2) {
30 return std::ranges::equal(
31 s1, s2,
32 [](char c1, char c2) {
33 return std::tolower(static_cast<unsigned char>(c1)) == std::tolower(static_cast<unsigned char>(c2));
34 });
35 }(field->name(), key);
36 });
37 return table->column(std::distance(fields.begin(), target));
38}
39} // namespace
40
42{
43 auto locate = std::find(list.begin(), list.end(), entry);
44 if (locate == list.end()) {
45 list.emplace_back(entry);
46 } else if (!locate->enabled && entry.enabled) {
47 locate->enabled = true;
48 }
49}
50
51std::pair<int64_t, int64_t> SliceInfoPtr::getSliceFor(int value) const
52{
53 if ((size_t)value >= offsets.size()) {
54 return {0, 0};
55 }
56
57 return {offsets[value], sizes[value]};
58}
59
60std::span<const int64_t> SliceInfoUnsortedPtr::getSliceFor(int value) const
61{
62 if (values.empty()) {
63 return {};
64 }
65 if (value > values[values.size() - 1]) {
66 return {};
67 }
68
69 return {(*groups)[value].data(), (*groups)[value].size()};
70}
71
72void ArrowTableSlicingCacheDef::setCaches(Cache&& bsks)
73{
74 bindingsKeys = bsks;
75}
76
77void ArrowTableSlicingCacheDef::setCachesUnsorted(Cache&& bsks)
78{
79 bindingsKeysUnsorted = bsks;
80}
81
82ArrowTableSlicingCache::ArrowTableSlicingCache(Cache&& bsks, Cache&& bsksUnsorted, header::DataOrigin newOrigin_)
83 : bindingsKeys{bsks},
84 bindingsKeysUnsorted{bsksUnsorted},
85 newOrigin{newOrigin_}
86{
87 offsets.resize(bindingsKeys.size());
88 sizes.resize(bindingsKeys.size());
89
91 groups.resize(bindingsKeysUnsorted.size());
92}
93
94void ArrowTableSlicingCache::setCaches(Cache&& bsks, Cache&& bsksUnsorted)
95{
96 bindingsKeys = bsks;
97 bindingsKeysUnsorted = bsksUnsorted;
98 offsets.clear();
99 offsets.resize(bindingsKeys.size());
100 sizes.clear();
101 sizes.resize(bindingsKeys.size());
102 valuesUnsorted.clear();
104 groups.clear();
105 groups.resize(bindingsKeysUnsorted.size());
106}
107
108arrow::Status ArrowTableSlicingCache::updateCacheEntry(int pos, std::shared_ptr<arrow::Table> const& table)
109{
110 offsets[pos].clear();
111 sizes[pos].clear();
112 if (table->num_rows() == 0) {
113 return arrow::Status::OK();
114 }
115 auto& [b, m, k, e] = bindingsKeys[pos];
116 if (!e) {
117 throw runtime_error_f("Disabled cache (%s) %s/%s update requested", DataSpecUtils::describe(m).c_str(), b.c_str(), k.c_str());
118 }
120
121 int maxValue = -1;
122 auto column = GetColumnByNameCI(table, k);
123
124 // starting from the end, find the first positive value, in a sorted column it is the largest index
125 for (auto iChunk = column->num_chunks() - 1; iChunk >= 0; --iChunk) {
126 auto chunk = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(iChunk)->data());
127 for (auto iElement = chunk.length() - 1; iElement >= 0; --iElement) {
128 auto value = chunk.Value(iElement);
129 if (value < 0) {
130 continue;
131 } else {
132 maxValue = value;
133 break;
134 }
135 }
136 if (maxValue >= 0) {
137 break;
138 }
139 }
140
141 offsets[pos].resize(maxValue + 1);
142 sizes[pos].resize(maxValue + 1);
143
144 // loop over the index and collect size/offset
145 int lastValue = std::numeric_limits<int>::max();
146 int globalRow = 0;
147 for (auto iChunk = 0; iChunk < column->num_chunks(); ++iChunk) {
148 auto chunk = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(iChunk)->data());
149 for (auto iElement = 0; iElement < chunk.length(); ++iElement) {
150 auto v = chunk.Value(iElement);
151 if (v >= 0) {
152 if (v == lastValue) {
153 ++sizes[pos][v];
154 } else {
155 lastValue = v;
156 ++sizes[pos][v];
157 offsets[pos][v] = globalRow;
158 }
159 }
160 ++globalRow;
161 }
162 }
163
164 return arrow::Status::OK();
165}
166
167arrow::Status ArrowTableSlicingCache::updateCacheEntryUnsorted(int pos, const std::shared_ptr<arrow::Table>& table)
168{
169 valuesUnsorted[pos].clear();
170 groups[pos].clear();
171 if (table->num_rows() == 0) {
172 return arrow::Status::OK();
173 }
174 auto& [b, m, k, e] = bindingsKeysUnsorted[pos];
175 if (!e) {
176 throw runtime_error_f("Disabled unsorted cache %s/%s update requested", b.c_str(), k.c_str());
177 }
178 auto column = GetColumnByNameCI(table, k);
179 auto row = 0;
180 for (auto iChunk = 0; iChunk < column->num_chunks(); ++iChunk) {
181 auto chunk = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(iChunk)->data());
182 for (auto iElement = 0; iElement < chunk.length(); ++iElement) {
183 auto v = chunk.Value(iElement);
184 if (v >= 0) {
185 if (std::find(valuesUnsorted[pos].begin(), valuesUnsorted[pos].end(), v) == valuesUnsorted[pos].end()) {
186 valuesUnsorted[pos].push_back(v);
187 }
188 if ((int)groups[pos].size() <= v) {
189 groups[pos].resize(v + 1);
190 }
191 (groups[pos])[v].push_back(row);
192 }
193 ++row;
194 }
195 }
196 std::sort(valuesUnsorted[pos].begin(), valuesUnsorted[pos].end());
197 return arrow::Status::OK();
198}
199
200std::pair<int, bool> ArrowTableSlicingCache::getCachePos(const Entry& bindingKey) const
201{
202 auto pos = getCachePosSortedFor(bindingKey);
203 if (pos != -1) {
204 return {pos, true};
205 }
206 pos = getCachePosUnsortedFor(bindingKey);
207 if (pos != -1) {
208 return {pos, false};
209 }
210 throw runtime_error_f("(%s) %s/%s not found neither in sorted or unsorted cache", DataSpecUtils::describe(bindingKey.matcher).c_str(), bindingKey.binding.c_str(), bindingKey.key.c_str());
211}
212
214{
215 auto locate = std::ranges::find(bindingsKeys, bindingKey);
216 if (locate != bindingsKeys.end()) {
217 return std::distance(bindingsKeys.begin(), locate);
218 }
219 return -1;
220}
221
223{
224 auto locate_unsorted = std::ranges::find(bindingsKeysUnsorted, bindingKey);
225 if (locate_unsorted != bindingsKeysUnsorted.end()) {
226 return std::distance(bindingsKeysUnsorted.begin(), locate_unsorted);
227 }
228 return -1;
229}
231{
232 auto [p, s] = getCachePos(bindingKey);
233 if (!s) {
234 throw runtime_error_f("%s/%s is found in unsorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str());
235 }
236 if (!bindingsKeys[p].enabled) {
237 throw runtime_error_f("Disabled cache %s/%s is requested", bindingKey.binding.c_str(), bindingKey.key.c_str());
238 }
239
240 return getCacheForPos(p);
241}
242
244{
245 auto [p, s] = getCachePos(bindingKey);
246 if (s) {
247 throw runtime_error_f("(%s) %s/%s is found in sorted cache", DataSpecUtils::describe(bindingKey.matcher).c_str(), bindingKey.binding.c_str(), bindingKey.key.c_str());
248 }
250 throw runtime_error_f("Disabled unsorted cache (%s) %s/%s is requested", DataSpecUtils::describe(bindingKey.matcher).c_str(), bindingKey.binding.c_str(), bindingKey.key.c_str());
251 }
252
253 return getCacheUnsortedForPos(p);
254}
255
257{
258 return {
259 gsl::span{offsets[pos].data(), offsets[pos].size()}, //
260 gsl::span(sizes[pos].data(), sizes[pos].size()) //
261 };
262}
263
265{
266 return {
267 {reinterpret_cast<int const*>(valuesUnsorted[pos].data()), valuesUnsorted[pos].size()},
268 &(groups[pos]) //
269 };
270}
271
272void ArrowTableSlicingCache::validateOrder(Entry const& bindingKey, const std::shared_ptr<arrow::Table>& input)
273{
274 auto const& [target, matcher, key, enabled] = bindingKey;
275 if (!enabled) {
276 return;
277 }
278 auto column = o2::framework::GetColumnByNameCI(input, key);
279 auto array0 = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(0)->data());
280 int32_t prev;
281 int32_t cur = array0.Value(0);
282 int32_t lastNeg = cur < 0 ? cur : 0;
283 int32_t lastPos = cur < 0 ? -1 : cur;
284 for (auto i = 0; i < column->num_chunks(); ++i) {
285 auto array = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(i)->data());
286 for (auto e = 0; e < array.length(); ++e) {
287 prev = cur;
288 if (prev >= 0) {
289 lastPos = prev;
290 } else {
291 lastNeg = prev;
292 }
293 cur = array.Value(e);
294 if (cur >= 0) {
295 if (lastPos > cur) {
296 throw runtime_error_f("Table %s index %s is not sorted: next value %d < previous value %d!", target.c_str(), key.c_str(), cur, lastPos);
297 }
298 if (lastPos == cur && prev < 0) {
299 throw runtime_error_f("Table %s index %s has a group with index %d that is split by %d", target.c_str(), key.c_str(), cur, prev);
300 }
301 } else {
302 if (lastNeg < cur) {
303 throw runtime_error_f("Table %s index %s is not sorted: next negative value %d > previous negative value %d!", target.c_str(), key.c_str(), cur, lastNeg);
304 }
305 if (lastNeg == cur && prev >= 0) {
306 throw runtime_error_f("Table %s index %s has a group with index %d that is split by %d", target.c_str(), key.c_str(), cur, prev);
307 }
308 }
309 }
310 }
311}
312} // namespace o2::framework
std::vector< std::shared_ptr< arrow::Field > > fields
int32_t i
uint16_t pos
Definition RawData.h:3
StringRef key
const GLfloat * m
Definition glcorearb.h:4066
GLsizei GLuint * groups
Definition glcorearb.h:3984
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLsizei const GLuint const GLintptr * offsets
Definition glcorearb.h:2595
GLuint GLuint end
Definition glcorearb.h:469
const GLdouble * v
Definition glcorearb.h:832
GLuint GLsizei const GLuint const GLintptr const GLsizeiptr * sizes
Definition glcorearb.h:2595
GLenum array
Definition glcorearb.h:4274
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLenum GLsizei GLsizei GLint * values
Definition glcorearb.h:1576
GLboolean * data
Definition glcorearb.h:298
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
void updatePairList(Cache &list, Entry &entry)
std::vector< Entry > Cache
RuntimeErrorRef runtime_error_f(const char *,...)
SliceInfoUnsortedPtr getCacheUnsortedFor(Entry const &bindingKey) const
arrow::Status updateCacheEntryUnsorted(int pos, std::shared_ptr< arrow::Table > const &table)
int getCachePosSortedFor(Entry const &bindingKey) const
arrow::Status updateCacheEntry(int pos, std::shared_ptr< arrow::Table > const &table)
std::pair< int, bool > getCachePos(Entry const &bindingKey) const
SliceInfoPtr getCacheFor(Entry const &bindingKey) const
void setCaches(Cache &&bsks, Cache &&bsksUnsorted={})
SliceInfoUnsortedPtr getCacheUnsortedForPos(int pos) const
int getCachePosUnsortedFor(Entry const &bindingKey) const
std::vector< std::vector< int > > valuesUnsorted
static void validateOrder(Entry const &bindingKey, std::shared_ptr< arrow::Table > const &input)
static std::string describe(InputSpec const &spec)
ConcreteDataMatcher matcher
std::vector< int > row