Project
Loading...
Searching...
No Matches
ArrowTableSlicingCache.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14
#include <arrow/compute/api_aggregate.h>
#include <arrow/compute/kernel.h>
#include <arrow/table.h>

#include <utility>
18
19namespace o2::framework
20{
21
22void updatePairList(Cache& list, std::string const& binding, std::string const& key, bool enabled = true)
23{
24 auto locate = std::find_if(list.begin(), list.end(), [&binding, &key](auto const& entry) { return (entry.binding == binding) && (entry.key == key); });
25 if (locate == list.end()) {
26 list.emplace_back(binding, key, enabled);
27 } else if (!locate->enabled && enabled) {
28 locate->enabled = true;
29 }
30}
31
32std::pair<int64_t, int64_t> SliceInfoPtr::getSliceFor(int value) const
33{
34 if ((size_t)value >= offsets.size()) {
35 return {0, 0};
36 }
37
38 return {offsets[value], sizes[value]};
39}
40
41std::span<const int64_t> SliceInfoUnsortedPtr::getSliceFor(int value) const
42{
43 if (values.empty()) {
44 return {};
45 }
46 if (value > values[values.size() - 1]) {
47 return {};
48 }
49
50 return {(*groups)[value].data(), (*groups)[value].size()};
51}
52
57
62
64 : bindingsKeys{bsks},
65 bindingsKeysUnsorted{bsksUnsorted}
66{
67 offsets.resize(bindingsKeys.size());
68 sizes.resize(bindingsKeys.size());
69
71 groups.resize(bindingsKeysUnsorted.size());
72}
73
74void ArrowTableSlicingCache::setCaches(Cache&& bsks, Cache&& bsksUnsorted)
75{
76 bindingsKeys = bsks;
77 bindingsKeysUnsorted = bsksUnsorted;
78 offsets.clear();
79 offsets.resize(bindingsKeys.size());
80 sizes.clear();
81 sizes.resize(bindingsKeys.size());
82 valuesUnsorted.clear();
84 groups.clear();
85 groups.resize(bindingsKeysUnsorted.size());
86}
87
88arrow::Status ArrowTableSlicingCache::updateCacheEntry(int pos, std::shared_ptr<arrow::Table> const& table)
89{
90 offsets[pos].clear();
91 sizes[pos].clear();
92 if (table->num_rows() == 0) {
93 return arrow::Status::OK();
94 }
95 auto& [b, k, e] = bindingsKeys[pos];
96 if (!e) {
97 throw runtime_error_f("Disabled cache %s/%s update requested", b.c_str(), k.c_str());
98 }
100
101 int maxValue = -1;
102 auto column = table->GetColumnByName(k);
103
104 // starting from the end, find the first positive value, in a sorted column it is the largest index
105 for (auto iChunk = column->num_chunks() - 1; iChunk >= 0; --iChunk) {
106 auto chunk = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(iChunk)->data());
107 for (auto iElement = chunk.length() - 1; iElement >= 0; --iElement) {
108 auto value = chunk.Value(iElement);
109 if (value < 0) {
110 continue;
111 } else {
112 maxValue = value;
113 break;
114 }
115 }
116 if (maxValue >= 0) {
117 break;
118 }
119 }
120
121 offsets[pos].resize(maxValue + 1);
122 sizes[pos].resize(maxValue + 1);
123
124 // loop over the index and collect size/offset
125 int lastValue = std::numeric_limits<int>::max();
126 int globalRow = 0;
127 for (auto iChunk = 0; iChunk < column->num_chunks(); ++iChunk) {
128 auto chunk = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(iChunk)->data());
129 for (auto iElement = 0; iElement < chunk.length(); ++iElement) {
130 auto v = chunk.Value(iElement);
131 if (v >= 0) {
132 if (v == lastValue) {
133 ++sizes[pos][v];
134 } else {
135 lastValue = v;
136 ++sizes[pos][v];
137 offsets[pos][v] = globalRow;
138 }
139 }
140 ++globalRow;
141 }
142 }
143
144 return arrow::Status::OK();
145}
146
147arrow::Status ArrowTableSlicingCache::updateCacheEntryUnsorted(int pos, const std::shared_ptr<arrow::Table>& table)
148{
149 valuesUnsorted[pos].clear();
150 groups[pos].clear();
151 if (table->num_rows() == 0) {
152 return arrow::Status::OK();
153 }
154 auto& [b, k, e] = bindingsKeysUnsorted[pos];
155 if (!e) {
156 throw runtime_error_f("Disabled unsorted cache %s/%s update requested", b.c_str(), k.c_str());
157 }
158 auto column = table->GetColumnByName(k);
159 auto row = 0;
160 for (auto iChunk = 0; iChunk < column->num_chunks(); ++iChunk) {
161 auto chunk = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(iChunk)->data());
162 for (auto iElement = 0; iElement < chunk.length(); ++iElement) {
163 auto v = chunk.Value(iElement);
164 if (v >= 0) {
165 if (std::find(valuesUnsorted[pos].begin(), valuesUnsorted[pos].end(), v) == valuesUnsorted[pos].end()) {
166 valuesUnsorted[pos].push_back(v);
167 }
168 if ((int)groups[pos].size() <= v) {
169 groups[pos].resize(v + 1);
170 }
171 (groups[pos])[v].push_back(row);
172 }
173 ++row;
174 }
175 }
176 std::sort(valuesUnsorted[pos].begin(), valuesUnsorted[pos].end());
177 return arrow::Status::OK();
178}
179
180std::pair<int, bool> ArrowTableSlicingCache::getCachePos(const Entry& bindingKey) const
181{
182 auto pos = getCachePosSortedFor(bindingKey);
183 if (pos != -1) {
184 return {pos, true};
185 }
186 pos = getCachePosUnsortedFor(bindingKey);
187 if (pos != -1) {
188 return {pos, false};
189 }
190 throw runtime_error_f("%s/%s not found neither in sorted or unsorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str());
191}
192
194{
195 auto locate = std::find_if(bindingsKeys.begin(), bindingsKeys.end(), [&](Entry const& bk) { return (bindingKey.binding == bk.binding) && (bindingKey.key == bk.key); });
196 if (locate != bindingsKeys.end()) {
197 return std::distance(bindingsKeys.begin(), locate);
198 }
199 return -1;
200}
201
203{
204 auto locate_unsorted = std::find_if(bindingsKeysUnsorted.begin(), bindingsKeysUnsorted.end(), [&](Entry const& bk) { return (bindingKey.binding == bk.binding) && (bindingKey.key == bk.key); });
205 if (locate_unsorted != bindingsKeysUnsorted.end()) {
206 return std::distance(bindingsKeysUnsorted.begin(), locate_unsorted);
207 }
208 return -1;
209}
211{
212 auto [p, s] = getCachePos(bindingKey);
213 if (!s) {
214 throw runtime_error_f("%s/%s is found in unsorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str());
215 }
216 if (!bindingsKeys[p].enabled) {
217 throw runtime_error_f("Disabled cache %s/%s is requested", bindingKey.binding.c_str(), bindingKey.key.c_str());
218 }
219
220 return getCacheForPos(p);
221}
222
224{
225 auto [p, s] = getCachePos(bindingKey);
226 if (s) {
227 throw runtime_error_f("%s/%s is found in sorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str());
228 }
230 throw runtime_error_f("Disabled unsorted cache %s/%s is requested", bindingKey.binding.c_str(), bindingKey.key.c_str());
231 }
232
233 return getCacheUnsortedForPos(p);
234}
235
237{
238 return {
239 gsl::span{offsets[pos].data(), offsets[pos].size()}, //
240 gsl::span(sizes[pos].data(), sizes[pos].size()) //
241 };
242}
243
245{
246 return {
247 {reinterpret_cast<int const*>(valuesUnsorted[pos].data()), valuesUnsorted[pos].size()},
248 &(groups[pos]) //
249 };
250}
251
252void ArrowTableSlicingCache::validateOrder(Entry const& bindingKey, const std::shared_ptr<arrow::Table>& input)
253{
254 auto const& [target, key, enabled] = bindingKey;
255 auto column = input->GetColumnByName(key);
256 auto array0 = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(0)->data());
257 int32_t prev = 0;
258 int32_t cur = array0.Value(0);
259 int32_t lastNeg = cur < 0 ? cur : 0;
260 int32_t lastPos = cur < 0 ? -1 : cur;
261 for (auto i = 0; i < column->num_chunks(); ++i) {
262 auto array = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(i)->data());
263 for (auto e = 0; e < array.length(); ++e) {
264 prev = cur;
265 if (prev >= 0) {
266 lastPos = prev;
267 } else {
268 lastNeg = prev;
269 }
270 cur = array.Value(e);
271 if (cur >= 0) {
272 if (lastPos > cur) {
273 throw runtime_error_f("Table %s index %s is not sorted: next value %d < previous value %d!", target.c_str(), key.c_str(), cur, lastPos);
274 }
275 if (lastPos == cur && prev < 0) {
276 throw runtime_error_f("Table %s index %s has a group with index %d that is split by %d", target.c_str(), key.c_str(), cur, prev);
277 }
278 } else {
279 if (lastNeg < cur) {
280 throw runtime_error_f("Table %s index %s is not sorted: next negative value %d > previous negative value %d!", target.c_str(), key.c_str(), cur, lastNeg);
281 }
282 if (lastNeg == cur && prev >= 0) {
283 throw runtime_error_f("Table %s index %s has a group with index %d that is split by %d", target.c_str(), key.c_str(), cur, prev);
284 }
285 }
286 }
287 }
288}
289} // namespace o2::framework
int32_t i
uint16_t pos
Definition RawData.h:3
StringRef key
GLsizei GLuint * groups
Definition glcorearb.h:3984
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLsizei const GLuint const GLintptr * offsets
Definition glcorearb.h:2595
GLuint GLuint end
Definition glcorearb.h:469
const GLdouble * v
Definition glcorearb.h:832
GLuint GLsizei const GLuint const GLintptr const GLsizeiptr * sizes
Definition glcorearb.h:2595
GLenum array
Definition glcorearb.h:4274
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLenum GLsizei GLsizei GLint * values
Definition glcorearb.h:1576
GLboolean * data
Definition glcorearb.h:298
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< Entry > Cache
void updatePairList(Cache &list, std::string const &binding, std::string const &key, bool enabled)
RuntimeErrorRef runtime_error_f(const char *,...)
Definition list.h:40
SliceInfoUnsortedPtr getCacheUnsortedFor(Entry const &bindingKey) const
arrow::Status updateCacheEntryUnsorted(int pos, std::shared_ptr< arrow::Table > const &table)
int getCachePosSortedFor(Entry const &bindingKey) const
arrow::Status updateCacheEntry(int pos, std::shared_ptr< arrow::Table > const &table)
std::pair< int, bool > getCachePos(Entry const &bindingKey) const
SliceInfoPtr getCacheFor(Entry const &bindingKey) const
void setCaches(Cache &&bsks, Cache &&bsksUnsorted={})
ArrowTableSlicingCache(Cache &&bsks, Cache &&bsksUnsorted={})
SliceInfoUnsortedPtr getCacheUnsortedForPos(int pos) const
int getCachePosUnsortedFor(Entry const &bindingKey) const
std::vector< std::vector< int > > valuesUnsorted
static void validateOrder(Entry const &bindingKey, std::shared_ptr< arrow::Table > const &input)
std::pair< int64_t, int64_t > getSliceFor(int value) const
std::span< int64_t const > getSliceFor(int value) const
std::vector< int > row