Project
Loading...
Searching...
No Matches
RCombinedDS.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef ROOT_RCOMBINEDDS
12#define ROOT_RCOMBINEDDS
13
14#include "ROOT/RDataFrame.hxx"
15#include "ROOT/RDataSource.hxx"
16
17#include <memory>
18#include <string>
19#include <vector>
20
21namespace ROOT
22{
23
24namespace RDF
25{
26
34{
35 public:
36 virtual ~RCombinedDSIndex() = default;
42 virtual std::vector<std::pair<ULong64_t, ULong64_t>> BuildIndex(std::unique_ptr<RDataFrame>& left,
43 std::unique_ptr<RDataFrame>& right) = 0;
49 virtual std::pair<ULong64_t, ULong64_t> GetAssociatedEntries(ULong64_t entry) = 0;
50};
51
55{
56 public:
57 std::pair<ULong64_t, ULong64_t> GetAssociatedEntries(ULong64_t entry) final
58 {
59 return std::pair<ULong64_t, ULong64_t>(entry, entry);
60 }
61 std::vector<std::pair<ULong64_t, ULong64_t>> BuildIndex(std::unique_ptr<RDataFrame>& left,
62 std::unique_ptr<RDataFrame>& right) final;
63};
64
68{
69 public:
70 std::pair<ULong64_t, ULong64_t> GetAssociatedEntries(ULong64_t entry) final
71 {
72 return std::make_pair<ULong64_t, ULong64_t>(entry / fRightCount, entry % fRightCount);
73 }
74 std::vector<std::pair<ULong64_t, ULong64_t>> BuildIndex(std::unique_ptr<RDataFrame>& left,
75 std::unique_ptr<RDataFrame>& right) final;
76
77 private:
78 ULong64_t fLeftCount;
79 ULong64_t fRightCount;
80};
81
86template <typename INDEX_TYPE = int>
88{
89 public:
90 RCombinedDSColumnJoinIndex(std::string const& indexColumnName)
91 : fIndexColumnName{indexColumnName}
92 {
93 }
94
95 std::pair<ULong64_t, ULong64_t> GetAssociatedEntries(ULong64_t entry) final
96 {
97 auto left = fAssociations[entry];
98 return std::pair<ULong64_t, ULong64_t>(left, entry);
99 }
100
101 std::vector<std::pair<ULong64_t, ULong64_t>>
102 BuildIndex(std::unique_ptr<RDataFrame>& left,
103 std::unique_ptr<RDataFrame>& right) final
104 {
105 std::vector<std::pair<ULong64_t, ULong64_t>> ranges;
106 auto nEntries = *right->Count();
107 fAssociations.reserve(nEntries);
108 // Fill the index with the associations
109 auto filler = [&assoc = fAssociations](INDEX_TYPE ri) { assoc.push_back(ri); };
110 right->Foreach(filler, std::vector<std::string>{fIndexColumnName});
111
112 // Create the ranges by processing 64 entries per range
113 auto deltaRange = 64;
114 ranges.reserve(nEntries / deltaRange + 1);
115 ULong64_t i = 0;
116 while (deltaRange * (i + 1) < nEntries) {
117 ranges.emplace_back(std::pair<ULong64_t, ULong64_t>(deltaRange * i, deltaRange * (i + 1)));
118 }
119 ranges.emplace_back(std::pair<ULong64_t, ULong64_t>(deltaRange * i, nEntries)); // Last entry
120 return ranges;
121 }
122
123 private:
124 std::string fIndexColumnName;
125 std::vector<INDEX_TYPE> fAssociations;
126};
127
129 Full,
130 Upper,
132 Diagonal,
133 Anti
134};
135
137 static char const* combinationRuleAsString(BlockCombinationRule ruleType);
138};
139
149template <typename INDEX_TYPE = int>
151{
152
153 using Association = std::pair<ULong64_t, ULong64_t>;
154
155 public:
156 RCombinedDSBlockJoinIndex(std::string const& leftCategoryColumn,
157 bool self = true,
159 std::string const& rightCategoryColumn = "")
160 : fLeftCategoryColumn{leftCategoryColumn},
161 fRightCategoryColumn{rightCategoryColumn.empty() ? leftCategoryColumn : rightCategoryColumn},
162 fSelf{self},
163 fCombinationType{combinationType}
164 {
165 }
166
167 std::pair<ULong64_t, ULong64_t> GetAssociatedEntries(ULong64_t entry) final
168 {
169 return fAssociations[entry];
170 }
171
172 std::vector<std::pair<ULong64_t, ULong64_t>>
173 BuildIndex(std::unique_ptr<RDataFrame>& left,
174 std::unique_ptr<RDataFrame>& right) final
175 {
176 std::vector<std::pair<ULong64_t, ULong64_t>> ranges;
177 std::vector<INDEX_TYPE> leftCategories;
178 std::vector<INDEX_TYPE> rightCategories;
179 std::vector<Association> leftPairs;
180 std::vector<Association> rightPairs;
181
182 computePairsAndCategories(left, leftCategories, leftPairs, fLeftCategoryColumn);
184 if (fSelf) {
185 rightCategories = leftCategories;
186 rightPairs = leftPairs;
187 } else {
188 computePairsAndCategories(right, rightCategories, rightPairs, fRightCategoryColumn);
189 }
190
191 auto same = [](std::pair<ULong64_t, ULong64_t> const& a, std::pair<ULong64_t, ULong64_t> const& b) {
192 return a.first < b.first;
193 };
194
198 int startSize = fAssociations.size();
199 for (auto categoryValue : leftCategories) {
200 std::pair<ULong64_t, ULong64_t> p{categoryValue, 0};
201 auto outerRange = std::equal_range(leftPairs.begin(), leftPairs.end(), p, same);
202 decltype(outerRange) innerRange;
203 if (fSelf) {
204 innerRange = outerRange;
205 } else {
206 innerRange = std::equal_range(rightPairs.begin(), rightPairs.end(), p, same);
207 }
208 int offset = 0;
209 switch (fCombinationType) {
211 for (auto out = outerRange.first; out != outerRange.second; ++out) {
212 for (auto in = innerRange.first; in != innerRange.second; ++in) {
213 fAssociations.emplace_back(Association{out->second, in->second});
214 }
215 }
216 break;
218 offset = 0;
219 for (auto out = outerRange.first; out != outerRange.second; ++out) {
220 if (innerRange.first == innerRange.second) {
221 break;
222 }
223 for (auto in = innerRange.first + offset; in != innerRange.second; ++in) {
224 fAssociations.emplace_back(Association{out->second, in->second});
225 }
226 offset++;
227 }
228 break;
230 offset = 1;
231 for (auto out = outerRange.first; out != outerRange.second; ++out) {
232 if (innerRange.first == innerRange.second || innerRange.first + 1 == innerRange.second) {
233 break;
234 }
235 for (auto in = innerRange.first + offset; in != innerRange.second; ++in) {
236 fAssociations.emplace_back(Association{out->second, in->second});
237 }
238 offset++;
239 }
240 break;
242 for (auto out = outerRange.first; out != outerRange.second; ++out) {
243 for (auto in = innerRange.first; in != innerRange.second; ++in) {
244 if (std::distance(innerRange.first, in) == std::distance(outerRange.first, out)) {
245 continue;
246 }
247 fAssociations.emplace_back(Association{out->second, in->second});
248 }
249 offset++;
250 }
251 break;
253 auto sizeRow = std::distance(outerRange.first, outerRange.second);
254 auto sizeCol = std::distance(innerRange.first, innerRange.second);
255 for (size_t i = 0, e = std::min(sizeRow, sizeCol); i < e; ++i) {
256 fAssociations.emplace_back(Association{(outerRange.first + i)->second, (innerRange.first + i)->second});
257 }
258 break;
259 }
260 auto rangeFirst = startSize;
261 auto rangeSecond = fAssociations.size();
262 startSize = fAssociations.size();
263 ranges.emplace_back(std::make_pair<ULong64_t, ULong64_t>(rangeFirst, rangeSecond));
264 }
265 return ranges;
266 }
267
268 private:
269 std::string fLeftCategoryColumn;
270 std::string fRightCategoryColumn;
271 bool fSelf;
272 BlockCombinationRule fCombinationType;
273 std::vector<Association> fAssociations;
274 void computePairsAndCategories(std::unique_ptr<RDataFrame>& df,
275 std::vector<INDEX_TYPE>& categories,
276 std::vector<Association>& pairs,
277 std::string const& column)
278 {
279 categories = *df->template Take<INDEX_TYPE>(column);
280 pairs.reserve(categories.size());
281 // Fill the pairs according tho the actual category
282 for (size_t i = 0; i < categories.size(); ++i) {
283 pairs.emplace_back(categories[i], i);
284 }
285 // Do a stable sort so that same categories entries are
286 // grouped together.
287 std::stable_sort(pairs.begin(), pairs.end());
288 // Keep only the categories.
289 std::stable_sort(categories.begin(), categories.end());
290 auto last = std::unique(categories.begin(), categories.end());
291 categories.erase(last, categories.end());
292 }
293};
294
301{
302 private:
305 RDataSource* fLeft;
306 RDataSource* fRight;
307 std::string fLeftPrefix;
308 std::string fRightPrefix;
309 std::unique_ptr<RDataFrame> fLeftDF;
310 std::unique_ptr<RDataFrame> fRightDF;
311 ULong64_t fLeftCount;
312 ULong64_t fRightCount;
313 size_t fNSlots = 0U;
314 std::vector<std::string> fColumnNames;
315 std::vector<std::pair<ULong64_t, ULong64_t>> fEntryRanges;
316 std::unique_ptr<RCombinedDSIndex> fIndex;
317
318 protected:
319 std::vector<void*> GetColumnReadersImpl(std::string_view colName, const std::type_info& info) override;
320
321 public:
322 RCombinedDS(std::unique_ptr<RDataSource> left,
323 std::unique_ptr<RDataSource> right,
324 std::unique_ptr<RCombinedDSIndex> index = std::make_unique<RCombinedDSFriendIndex>(),
325 std::string leftPrefix = std::string{"left_"},
326 std::string rightPrefix = std::string{"right_"});
327 ~RCombinedDS() override;
328
329 template <typename T>
330 std::vector<T**> GetColumnReaders(std::string_view colName)
331 {
332 if (colName.compare(0, fLeftPrefix.size(), fLeftPrefix)) {
333 colName.remove_prefix(fLeftPrefix.size());
334 return fLeft->GetColumnReaders<T>(colName);
335 }
336 if (colName.compare(0, fRightPrefix.size(), fRightPrefix)) {
337 colName.remove_prefix(fRightPrefix.size());
338 return fRight->GetColumnReaders<T>(colName);
339 }
340 std::string dummy("Column not found: ");
341 dummy += colName.data();
342 throw std::runtime_error(dummy);
343 }
344 [[nodiscard]] const std::vector<std::string>& GetColumnNames() const override;
345 std::vector<std::pair<ULong64_t, ULong64_t>> GetEntryRanges() override;
346 [[nodiscard]] std::string GetTypeName(std::string_view colName) const override;
347 [[nodiscard]] bool HasColumn(std::string_view colName) const override;
348 bool SetEntry(unsigned int slot, ULong64_t entry) override;
349 void InitSlot(unsigned int slot, ULong64_t firstEntry) override;
350 void SetNSlots(unsigned int nSlots) override;
351 void Initialize() override;
352};
353
357RDataFrame MakeCombinedDataFrame(std::unique_ptr<RDataSource> left, std::unique_ptr<RDataSource>, std::unique_ptr<RCombinedDSIndex> index, std::string leftPrefix = "left_", std::string rightPrefix = "right_");
358RDataFrame MakeCrossProductDataFrame(std::unique_ptr<RDataSource> left, std::unique_ptr<RDataSource>, std::string leftPrefix = "left_", std::string rightPrefix = "right_");
359RDataFrame MakeColumnIndexedDataFrame(std::unique_ptr<RDataSource> left, std::unique_ptr<RDataSource>, std::string indexColName, std::string leftPrefix = "left_", std::string rightPrefix = "right_");
360RDataFrame MakeFriendDataFrame(std::unique_ptr<RDataSource> left, std::unique_ptr<RDataSource> right, std::string leftPrefix = "left_", std::string rightPrefix = "right_");
361RDataFrame MakeBlockAntiDataFrame(std::unique_ptr<RDataSource> left, std::unique_ptr<RDataSource> right, std::string indexColumnName, std::string leftPrefix = "left_", std::string rightPrefix = "right_");
362
363} // namespace RDF
364
365} // namespace ROOT
366
367#endif
int32_t i
RCombinedDSBlockJoinIndex(std::string const &leftCategoryColumn, bool self=true, BlockCombinationRule combinationType=BlockCombinationRule::Anti, std::string const &rightCategoryColumn="")
std::vector< std::pair< ULong64_t, ULong64_t > > BuildIndex(std::unique_ptr< RDataFrame > &left, std::unique_ptr< RDataFrame > &right) final
std::pair< ULong64_t, ULong64_t > GetAssociatedEntries(ULong64_t entry) final
RCombinedDSColumnJoinIndex(std::string const &indexColumnName)
Definition RCombinedDS.h:90
std::pair< ULong64_t, ULong64_t > GetAssociatedEntries(ULong64_t entry) final
Definition RCombinedDS.h:95
std::vector< std::pair< ULong64_t, ULong64_t > > BuildIndex(std::unique_ptr< RDataFrame > &left, std::unique_ptr< RDataFrame > &right) final
std::pair< ULong64_t, ULong64_t > GetAssociatedEntries(ULong64_t entry) final
Definition RCombinedDS.h:70
std::vector< std::pair< ULong64_t, ULong64_t > > BuildIndex(std::unique_ptr< RDataFrame > &left, std::unique_ptr< RDataFrame > &right) final
std::pair< ULong64_t, ULong64_t > GetAssociatedEntries(ULong64_t entry) final
Definition RCombinedDS.h:57
std::vector< std::pair< ULong64_t, ULong64_t > > BuildIndex(std::unique_ptr< RDataFrame > &left, std::unique_ptr< RDataFrame > &right) final
virtual std::vector< std::pair< ULong64_t, ULong64_t > > BuildIndex(std::unique_ptr< RDataFrame > &left, std::unique_ptr< RDataFrame > &right)=0
virtual ~RCombinedDSIndex()=default
virtual std::pair< ULong64_t, ULong64_t > GetAssociatedEntries(ULong64_t entry)=0
RDataSource which does the cartesian product of entries in two other datasources.
bool HasColumn(std::string_view colName) const override
~RCombinedDS() override
Destructor.
bool SetEntry(unsigned int slot, ULong64_t entry) override
std::string GetTypeName(std::string_view colName) const override
std::vector< T ** > GetColumnReaders(std::string_view colName)
std::vector< std::pair< ULong64_t, ULong64_t > > GetEntryRanges() override
void InitSlot(unsigned int slot, ULong64_t firstEntry) override
void SetNSlots(unsigned int nSlots) override
std::vector< void * > GetColumnReadersImpl(std::string_view colName, const std::type_info &info) override
This should never be called, since we did a template overload for GetColumnReaders()
const std::vector< std::string > & GetColumnNames() const override
void Initialize() override
GLuint entry
Definition glcorearb.h:5735
GLuint index
Definition glcorearb.h:781
GLdouble GLdouble right
Definition glcorearb.h:4077
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLintptr offset
Definition glcorearb.h:660
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
RDataFrame MakeColumnIndexedDataFrame(std::unique_ptr< RDataSource > left, std::unique_ptr< RDataSource >, std::string indexColName, std::string leftPrefix="left_", std::string rightPrefix="right_")
RDataFrame MakeBlockAntiDataFrame(std::unique_ptr< RDataSource > left, std::unique_ptr< RDataSource > right, std::string indexColumnName, std::string leftPrefix="left_", std::string rightPrefix="right_")
RDataFrame MakeCombinedDataFrame(std::unique_ptr< RDataSource > left, std::unique_ptr< RDataSource >, std::unique_ptr< RCombinedDSIndex > index, std::string leftPrefix="left_", std::string rightPrefix="right_")
Factory method to create an Apache Arrow RDataFrame.
RDataFrame MakeFriendDataFrame(std::unique_ptr< RDataSource > left, std::unique_ptr< RDataSource > right, std::string leftPrefix="left_", std::string rightPrefix="right_")
RDataFrame MakeCrossProductDataFrame(std::unique_ptr< RDataSource > left, std::unique_ptr< RDataSource >, std::string leftPrefix="left_", std::string rightPrefix="right_")
void empty(int)
static char const * combinationRuleAsString(BlockCombinationRule ruleType)