Project
Loading...
Searching...
No Matches
ArrowTableSlicingCache.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14
#include <arrow/compute/api_aggregate.h>
#include <arrow/compute/kernel.h>
#include <arrow/table.h>

#include <utility>
18
19namespace o2::framework
20{
21
22void updatePairList(std::vector<StringPair>& list, std::string const& binding, std::string const& key)
23{
24 if (std::find_if(list.begin(), list.end(), [&binding, &key](auto const& entry) { return (entry.first == binding) && (entry.second == key); }) == list.end()) {
25 list.emplace_back(binding, key);
26 }
27}
28
29std::pair<int64_t, int64_t> SliceInfoPtr::getSliceFor(int value) const
30{
31 int64_t offset = 0;
32 if (values.empty()) {
33 return {offset, 0};
34 }
35 int64_t p = static_cast<int64_t>(values.size()) - 1;
36 while (values[p] < 0) {
37 --p;
38 if (p < 0) {
39 return {offset, 0};
40 }
41 }
42
43 if (value > values[p]) {
44 return {offset, 0};
45 }
46
47 for (auto i = 0U; i < values.size(); ++i) {
48 if (values[i] == value) {
49 return {offset, counts[i]};
50 }
51 offset += counts[i];
52 }
53 return {offset, 0};
54}
55
56gsl::span<const int64_t> SliceInfoUnsortedPtr::getSliceFor(int value) const
57{
58 if (values.empty()) {
59 return {};
60 }
61 if (value > values[values.size() - 1]) {
62 return {};
63 }
64
65 return {(*groups)[value].data(), (*groups)[value].size()};
66}
67
68void ArrowTableSlicingCacheDef::setCaches(std::vector<StringPair>&& bsks)
69{
70 bindingsKeys = bsks;
71}
72
73void ArrowTableSlicingCacheDef::setCachesUnsorted(std::vector<StringPair>&& bsks)
74{
76}
77
78ArrowTableSlicingCache::ArrowTableSlicingCache(std::vector<StringPair>&& bsks, std::vector<StringPair>&& bsksUnsorted)
79 : bindingsKeys{bsks},
80 bindingsKeysUnsorted{bsksUnsorted}
81{
82 values.resize(bindingsKeys.size());
83 counts.resize(bindingsKeys.size());
84
86 groups.resize(bindingsKeysUnsorted.size());
87}
88
89void ArrowTableSlicingCache::setCaches(std::vector<StringPair>&& bsks, std::vector<StringPair>&& bsksUnsorted)
90{
91 bindingsKeys = bsks;
92 bindingsKeysUnsorted = bsksUnsorted;
93 values.clear();
94 values.resize(bindingsKeys.size());
95 counts.clear();
96 counts.resize(bindingsKeys.size());
97 valuesUnsorted.clear();
99 groups.clear();
100 groups.resize(bindingsKeysUnsorted.size());
101}
102
103arrow::Status ArrowTableSlicingCache::updateCacheEntry(int pos, std::shared_ptr<arrow::Table> const& table)
104{
105 if (table->num_rows() == 0) {
106 values[pos].reset();
107 counts[pos].reset();
108 return arrow::Status::OK();
109 }
111 arrow::Datum value_counts;
112 auto options = arrow::compute::ScalarAggregateOptions::Defaults();
113 ARROW_ASSIGN_OR_RAISE(value_counts,
114 arrow::compute::CallFunction("value_counts", {table->GetColumnByName(bindingsKeys[pos].second)},
115 &options));
116 auto pair = static_cast<arrow::StructArray>(value_counts.array());
117 values[pos].reset();
118 counts[pos].reset();
119 values[pos] = std::make_shared<arrow::NumericArray<arrow::Int32Type>>(pair.field(0)->data());
120 counts[pos] = std::make_shared<arrow::NumericArray<arrow::Int64Type>>(pair.field(1)->data());
121 return arrow::Status::OK();
122}
123
124arrow::Status ArrowTableSlicingCache::updateCacheEntryUnsorted(int pos, const std::shared_ptr<arrow::Table>& table)
125{
126 valuesUnsorted[pos].clear();
127 groups[pos].clear();
128 if (table->num_rows() == 0) {
129 return arrow::Status::OK();
130 }
131 auto& [b, k] = bindingsKeysUnsorted[pos];
132 auto column = table->GetColumnByName(k);
133 auto row = 0;
134 for (auto iChunk = 0; iChunk < column->num_chunks(); ++iChunk) {
135 auto chunk = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(iChunk)->data());
136 for (auto iElement = 0; iElement < chunk.length(); ++iElement) {
137 auto v = chunk.Value(iElement);
138 if (v >= 0) {
139 if (std::find(valuesUnsorted[pos].begin(), valuesUnsorted[pos].end(), v) == valuesUnsorted[pos].end()) {
140 valuesUnsorted[pos].push_back(v);
141 }
142 if (groups[pos].size() <= v) {
143 groups[pos].resize(v + 1);
144 }
145 (groups[pos])[v].push_back(row);
146 }
147 ++row;
148 }
149 }
150 std::sort(valuesUnsorted[pos].begin(), valuesUnsorted[pos].end());
151 return arrow::Status::OK();
152}
153
154std::pair<int, bool> ArrowTableSlicingCache::getCachePos(const StringPair& bindingKey) const
155{
156 auto pos = getCachePosSortedFor(bindingKey);
157 if (pos != -1) {
158 return {pos, true};
159 }
160 pos = getCachePosUnsortedFor(bindingKey);
161 if (pos != -1) {
162 return {pos, false};
163 }
164 throw runtime_error_f("%s/%s not found neither in sorted or unsorted cache", bindingKey.first.c_str(), bindingKey.second.c_str());
165}
166
168{
169 auto locate = std::find_if(bindingsKeys.begin(), bindingsKeys.end(), [&](StringPair const& bk) { return (bindingKey.first == bk.first) && (bindingKey.second == bk.second); });
170 if (locate != bindingsKeys.end()) {
171 return std::distance(bindingsKeys.begin(), locate);
172 }
173 return -1;
174}
175
177{
178 auto locate_unsorted = std::find_if(bindingsKeysUnsorted.begin(), bindingsKeysUnsorted.end(), [&](StringPair const& bk) { return (bindingKey.first == bk.first) && (bindingKey.second == bk.second); });
179 if (locate_unsorted != bindingsKeysUnsorted.end()) {
180 return std::distance(bindingsKeysUnsorted.begin(), locate_unsorted);
181 }
182 return -1;
183}
185{
186 auto [p, s] = getCachePos(bindingKey);
187 if (!s) {
188 throw runtime_error_f("%s/%s is found in unsorted cache", bindingKey.first.c_str(), bindingKey.second.c_str());
189 }
190
191 return getCacheForPos(p);
192}
193
195{
196 auto [p, s] = getCachePos(bindingKey);
197 if (s) {
198 throw runtime_error_f("%s/%s is found in sorted cache", bindingKey.first.c_str(), bindingKey.second.c_str());
199 }
200
201 return getCacheUnsortedForPos(p);
202}
203
205{
206 if (values[pos] == nullptr && counts[pos] == nullptr) {
207 return {
208 {},
209 {} //
210 };
211 }
212
213 return {
214 {reinterpret_cast<int const*>(values[pos]->values()->data()), static_cast<size_t>(values[pos]->length())},
215 {reinterpret_cast<int64_t const*>(counts[pos]->values()->data()), static_cast<size_t>(counts[pos]->length())} //
216 };
217}
218
220{
221 return {
222 {reinterpret_cast<int const*>(valuesUnsorted[pos].data()), valuesUnsorted[pos].size()},
223 &(groups[pos]) //
224 };
225}
226
227void ArrowTableSlicingCache::validateOrder(StringPair const& bindingKey, const std::shared_ptr<arrow::Table>& input)
228{
229 auto const& [target, key] = bindingKey;
230 auto column = input->GetColumnByName(key);
231 auto array0 = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(0)->data());
232 int32_t prev = 0;
233 int32_t cur = array0.Value(0);
234 int32_t lastNeg = cur < 0 ? cur : 0;
235 int32_t lastPos = cur < 0 ? -1 : cur;
236 for (auto i = 0; i < column->num_chunks(); ++i) {
237 auto array = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(i)->data());
238 for (auto e = 0; e < array.length(); ++e) {
239 prev = cur;
240 if (prev >= 0) {
241 lastPos = prev;
242 } else {
243 lastNeg = prev;
244 }
245 cur = array.Value(e);
246 if (cur >= 0) {
247 if (lastPos > cur) {
248 throw runtime_error_f("Table %s index %s is not sorted: next value %d < previous value %d!", target.c_str(), key.c_str(), cur, lastPos);
249 }
250 if (lastPos == cur && prev < 0) {
251 throw runtime_error_f("Table %s index %s has a group with index %d that is split by %d", target.c_str(), key.c_str(), cur, prev);
252 }
253 } else {
254 if (lastNeg < cur) {
255 throw runtime_error_f("Table %s index %s is not sorted: next negative value %d > previous negative value %d!", target.c_str(), key.c_str(), cur, lastNeg);
256 }
257 if (lastNeg == cur && prev >= 0) {
258 throw runtime_error_f("Table %s index %s has a group with index %d that is split by %d", target.c_str(), key.c_str(), cur, prev);
259 }
260 }
261 }
262 }
263}
264} // namespace o2::framework
int32_t i
uint16_t pos
Definition RawData.h:3
StringRef key
GLsizei GLuint * groups
Definition glcorearb.h:3984
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
const GLdouble * v
Definition glcorearb.h:832
GLenum array
Definition glcorearb.h:4274
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLenum GLsizei GLsizei GLint * values
Definition glcorearb.h:1576
GLintptr offset
Definition glcorearb.h:660
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
void updatePairList(std::vector< StringPair > &list, std::string const &binding, std::string const &key)
std::pair< std::string, std::string > StringPair
RuntimeErrorRef runtime_error_f(const char *,...)
Definition list.h:40
void setCaches(std::vector< StringPair > &&bsks)
void setCachesUnsorted(std::vector< StringPair > &&bsks)
std::pair< int, bool > getCachePos(StringPair const &bindingKey) const
arrow::Status updateCacheEntryUnsorted(int pos, std::shared_ptr< arrow::Table > const &table)
void setCaches(std::vector< StringPair > &&bsks, std::vector< StringPair > &&bsksUnsorted={})
arrow::Status updateCacheEntry(int pos, std::shared_ptr< arrow::Table > const &table)
int getCachePosSortedFor(StringPair const &bindingKey) const
int getCachePosUnsortedFor(StringPair const &bindingKey) const
SliceInfoPtr getCacheFor(StringPair const &bindingKey) const
ArrowTableSlicingCache(std::vector< StringPair > &&bsks, std::vector< StringPair > &&bsksUnsorted={})
SliceInfoUnsortedPtr getCacheUnsortedFor(StringPair const &bindingKey) const
SliceInfoUnsortedPtr getCacheUnsortedForPos(int pos) const
static void validateOrder(StringPair const &bindingKey, std::shared_ptr< arrow::Table > const &input)
std::vector< std::vector< int > > valuesUnsorted
std::vector< std::shared_ptr< arrow::NumericArray< arrow::Int64Type > > > counts
gsl::span< int64_t const > counts
std::pair< int64_t, int64_t > getSliceFor(int value) const
gsl::span< int64_t const > getSliceFor(int value) const
std::vector< int > row