Project
Loading...
Searching...
No Matches
ArrowTableSlicingCache.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14
#include <algorithm>
#include <utility>

#include <arrow/compute/api_aggregate.h>
#include <arrow/compute/kernel.h>
#include <arrow/table.h>
18
19namespace o2::framework
20{
21
22void updatePairList(Cache& list, std::string const& binding, std::string const& key, bool enabled = true)
23{
24 auto locate = std::find_if(list.begin(), list.end(), [&binding, &key](auto const& entry) { return (entry.binding == binding) && (entry.key == key); });
25 if (locate == list.end()) {
26 list.emplace_back(binding, key, enabled);
27 } else if (!locate->enabled && enabled) {
28 locate->enabled = true;
29 }
30}
31
32std::pair<int64_t, int64_t> SliceInfoPtr::getSliceFor(int value) const
33{
34 int64_t offset = 0;
35 if (values.empty()) {
36 return {offset, 0};
37 }
38 int64_t p = static_cast<int64_t>(values.size()) - 1;
39 while (values[p] < 0) {
40 --p;
41 if (p < 0) {
42 return {offset, 0};
43 }
44 }
45
46 if (value > values[p]) {
47 return {offset, 0};
48 }
49
50 for (auto i = 0U; i < values.size(); ++i) {
51 if (values[i] == value) {
52 return {offset, counts[i]};
53 }
54 offset += counts[i];
55 }
56 return {offset, 0};
57}
58
59gsl::span<const int64_t> SliceInfoUnsortedPtr::getSliceFor(int value) const
60{
61 if (values.empty()) {
62 return {};
63 }
64 if (value > values[values.size() - 1]) {
65 return {};
66 }
67
68 return {(*groups)[value].data(), (*groups)[value].size()};
69}
70
75
80
82 : bindingsKeys{bsks},
83 bindingsKeysUnsorted{bsksUnsorted}
84{
85 values.resize(bindingsKeys.size());
86 counts.resize(bindingsKeys.size());
87
89 groups.resize(bindingsKeysUnsorted.size());
90}
91
92void ArrowTableSlicingCache::setCaches(Cache&& bsks, Cache&& bsksUnsorted)
93{
94 bindingsKeys = bsks;
95 bindingsKeysUnsorted = bsksUnsorted;
96 values.clear();
97 values.resize(bindingsKeys.size());
98 counts.clear();
99 counts.resize(bindingsKeys.size());
100 valuesUnsorted.clear();
102 groups.clear();
103 groups.resize(bindingsKeysUnsorted.size());
104}
105
106arrow::Status ArrowTableSlicingCache::updateCacheEntry(int pos, std::shared_ptr<arrow::Table> const& table)
107{
108 if (table->num_rows() == 0) {
109 values[pos].reset();
110 counts[pos].reset();
111 return arrow::Status::OK();
112 }
113 auto& [b, k, e] = bindingsKeys[pos];
114 if (!e) {
115 throw runtime_error_f("Disabled cache %s/%s update requested", b.c_str(), k.c_str());
116 }
118 arrow::Datum value_counts;
119 auto options = arrow::compute::ScalarAggregateOptions::Defaults();
120 ARROW_ASSIGN_OR_RAISE(value_counts,
121 arrow::compute::CallFunction("value_counts", {table->GetColumnByName(bindingsKeys[pos].key)},
122 &options));
123 auto pair = static_cast<arrow::StructArray>(value_counts.array());
124 values[pos].reset();
125 counts[pos].reset();
126 values[pos] = std::make_shared<arrow::NumericArray<arrow::Int32Type>>(pair.field(0)->data());
127 counts[pos] = std::make_shared<arrow::NumericArray<arrow::Int64Type>>(pair.field(1)->data());
128 return arrow::Status::OK();
129}
130
131arrow::Status ArrowTableSlicingCache::updateCacheEntryUnsorted(int pos, const std::shared_ptr<arrow::Table>& table)
132{
133 valuesUnsorted[pos].clear();
134 groups[pos].clear();
135 if (table->num_rows() == 0) {
136 return arrow::Status::OK();
137 }
138 auto& [b, k, e] = bindingsKeysUnsorted[pos];
139 if (!e) {
140 throw runtime_error_f("Disabled unsorted cache %s/%s update requested", b.c_str(), k.c_str());
141 }
142 auto column = table->GetColumnByName(k);
143 auto row = 0;
144 for (auto iChunk = 0; iChunk < column->num_chunks(); ++iChunk) {
145 auto chunk = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(iChunk)->data());
146 for (auto iElement = 0; iElement < chunk.length(); ++iElement) {
147 auto v = chunk.Value(iElement);
148 if (v >= 0) {
149 if (std::find(valuesUnsorted[pos].begin(), valuesUnsorted[pos].end(), v) == valuesUnsorted[pos].end()) {
150 valuesUnsorted[pos].push_back(v);
151 }
152 if ((int)groups[pos].size() <= v) {
153 groups[pos].resize(v + 1);
154 }
155 (groups[pos])[v].push_back(row);
156 }
157 ++row;
158 }
159 }
160 std::sort(valuesUnsorted[pos].begin(), valuesUnsorted[pos].end());
161 return arrow::Status::OK();
162}
163
164std::pair<int, bool> ArrowTableSlicingCache::getCachePos(const Entry& bindingKey) const
165{
166 auto pos = getCachePosSortedFor(bindingKey);
167 if (pos != -1) {
168 return {pos, true};
169 }
170 pos = getCachePosUnsortedFor(bindingKey);
171 if (pos != -1) {
172 return {pos, false};
173 }
174 throw runtime_error_f("%s/%s not found neither in sorted or unsorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str());
175}
176
178{
179 auto locate = std::find_if(bindingsKeys.begin(), bindingsKeys.end(), [&](Entry const& bk) { return (bindingKey.binding == bk.binding) && (bindingKey.key == bk.key); });
180 if (locate != bindingsKeys.end()) {
181 return std::distance(bindingsKeys.begin(), locate);
182 }
183 return -1;
184}
185
187{
188 auto locate_unsorted = std::find_if(bindingsKeysUnsorted.begin(), bindingsKeysUnsorted.end(), [&](Entry const& bk) { return (bindingKey.binding == bk.binding) && (bindingKey.key == bk.key); });
189 if (locate_unsorted != bindingsKeysUnsorted.end()) {
190 return std::distance(bindingsKeysUnsorted.begin(), locate_unsorted);
191 }
192 return -1;
193}
195{
196 auto [p, s] = getCachePos(bindingKey);
197 if (!s) {
198 throw runtime_error_f("%s/%s is found in unsorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str());
199 }
200 if (!bindingsKeys[p].enabled) {
201 throw runtime_error_f("Disabled cache %s/%s is requested", bindingKey.binding.c_str(), bindingKey.key.c_str());
202 }
203
204 return getCacheForPos(p);
205}
206
208{
209 auto [p, s] = getCachePos(bindingKey);
210 if (s) {
211 throw runtime_error_f("%s/%s is found in sorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str());
212 }
214 throw runtime_error_f("Disabled unsorted cache %s/%s is requested", bindingKey.binding.c_str(), bindingKey.key.c_str());
215 }
216
217 return getCacheUnsortedForPos(p);
218}
219
221{
222 if (values[pos] == nullptr && counts[pos] == nullptr) {
223 return {
224 {},
225 {} //
226 };
227 }
228
229 return {
230 {reinterpret_cast<int const*>(values[pos]->values()->data()), static_cast<size_t>(values[pos]->length())},
231 {reinterpret_cast<int64_t const*>(counts[pos]->values()->data()), static_cast<size_t>(counts[pos]->length())} //
232 };
233}
234
236{
237 return {
238 {reinterpret_cast<int const*>(valuesUnsorted[pos].data()), valuesUnsorted[pos].size()},
239 &(groups[pos]) //
240 };
241}
242
243void ArrowTableSlicingCache::validateOrder(Entry const& bindingKey, const std::shared_ptr<arrow::Table>& input)
244{
245 auto const& [target, key, enabled] = bindingKey;
246 auto column = input->GetColumnByName(key);
247 auto array0 = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(0)->data());
248 int32_t prev = 0;
249 int32_t cur = array0.Value(0);
250 int32_t lastNeg = cur < 0 ? cur : 0;
251 int32_t lastPos = cur < 0 ? -1 : cur;
252 for (auto i = 0; i < column->num_chunks(); ++i) {
253 auto array = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(i)->data());
254 for (auto e = 0; e < array.length(); ++e) {
255 prev = cur;
256 if (prev >= 0) {
257 lastPos = prev;
258 } else {
259 lastNeg = prev;
260 }
261 cur = array.Value(e);
262 if (cur >= 0) {
263 if (lastPos > cur) {
264 throw runtime_error_f("Table %s index %s is not sorted: next value %d < previous value %d!", target.c_str(), key.c_str(), cur, lastPos);
265 }
266 if (lastPos == cur && prev < 0) {
267 throw runtime_error_f("Table %s index %s has a group with index %d that is split by %d", target.c_str(), key.c_str(), cur, prev);
268 }
269 } else {
270 if (lastNeg < cur) {
271 throw runtime_error_f("Table %s index %s is not sorted: next negative value %d > previous negative value %d!", target.c_str(), key.c_str(), cur, lastNeg);
272 }
273 if (lastNeg == cur && prev >= 0) {
274 throw runtime_error_f("Table %s index %s has a group with index %d that is split by %d", target.c_str(), key.c_str(), cur, prev);
275 }
276 }
277 }
278 }
279}
280} // namespace o2::framework
int32_t i
uint16_t pos
Definition RawData.h:3
StringRef key
GLsizei GLuint * groups
Definition glcorearb.h:3984
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
const GLdouble * v
Definition glcorearb.h:832
GLenum array
Definition glcorearb.h:4274
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLenum GLsizei GLsizei GLint * values
Definition glcorearb.h:1576
GLintptr offset
Definition glcorearb.h:660
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< Entry > Cache
void updatePairList(Cache &list, std::string const &binding, std::string const &key, bool enabled)
RuntimeErrorRef runtime_error_f(const char *,...)
Definition list.h:40
SliceInfoUnsortedPtr getCacheUnsortedFor(Entry const &bindingKey) const
arrow::Status updateCacheEntryUnsorted(int pos, std::shared_ptr< arrow::Table > const &table)
int getCachePosSortedFor(Entry const &bindingKey) const
arrow::Status updateCacheEntry(int pos, std::shared_ptr< arrow::Table > const &table)
std::pair< int, bool > getCachePos(Entry const &bindingKey) const
SliceInfoPtr getCacheFor(Entry const &bindingKey) const
void setCaches(Cache &&bsks, Cache &&bsksUnsorted={})
ArrowTableSlicingCache(Cache &&bsks, Cache &&bsksUnsorted={})
SliceInfoUnsortedPtr getCacheUnsortedForPos(int pos) const
int getCachePosUnsortedFor(Entry const &bindingKey) const
std::vector< std::vector< int > > valuesUnsorted
static void validateOrder(Entry const &bindingKey, std::shared_ptr< arrow::Table > const &input)
std::vector< std::shared_ptr< arrow::NumericArray< arrow::Int64Type > > > counts
gsl::span< int64_t const > counts
std::pair< int64_t, int64_t > getSliceFor(int value) const
gsl::span< int64_t const > getSliceFor(int value) const
std::vector< int > row