Project
Loading...
Searching...
No Matches
RCombinedDS.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12// Author: Giulio Eulisse CERN 2/2018
13
14/*************************************************************************
15 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
16 * All rights reserved. *
17 * *
18 * For the licensing terms see $ROOTSYS/LICENSE. *
19 * For the list of contributors see $ROOTSYS/README/CREDITS. *
20 *************************************************************************/
21
22// clang-format off
32// clang-format on
33
34#define protected public
37
38#if __has_include(<ROOT/RDF/Utils.hxx>)
39#include <ROOT/RDF/Utils.hxx>
40#else
41#include <ROOT/RDFUtils.hxx>
42#endif
43
44#include <ROOT/TSeq.hxx>
45#include <ROOT/RDataFrame.hxx>
46
47#include <algorithm>
48#include <sstream>
49#include <string>
50
51using namespace ROOT::RDF;
52
53namespace ROOT
54{
55namespace RDF
56{
57
59{
60 switch (type) {
62 return "full";
64 return "antidiagonal";
66 return "diagonal";
68 return "uppertriangular";
70 return "stricly-uppertriangular";
71 }
72 throw std::runtime_error("Unknown BlockCombinationRule");
73}
74
75std::vector<std::pair<ULong64_t, ULong64_t>>
76 RCombinedDSCrossJoinIndex::BuildIndex(std::unique_ptr<RDataFrame>& left,
77 std::unique_ptr<RDataFrame>& right)
78{
79 std::vector<std::pair<ULong64_t, ULong64_t>> ranges;
80 fLeftCount = *left->Count();
81 fRightCount = *right->Count();
82 ranges.reserve(fLeftCount);
83 for (ULong64_t i = 0; i < fLeftCount; ++i) {
84 ranges.emplace_back(std::make_pair<ULong64_t, ULong64_t>(fRightCount * i, fRightCount * (i + 1)));
85 }
86 return ranges;
87}
88
89std::vector<std::pair<ULong64_t, ULong64_t>>
90 RCombinedDSFriendIndex::BuildIndex(std::unique_ptr<RDataFrame>& left,
91 std::unique_ptr<RDataFrame>& right)
92{
93 auto leftCount = *left->Count();
94 auto rightCount = *right->Count();
95 if (leftCount != rightCount) {
96 throw std::runtime_error("Union can be performed only with two datasources which have the same amount of entries");
97 }
98 std::vector<std::pair<ULong64_t, ULong64_t>> ranges;
99 // FIXME: should we really use the min between two number of slots?
100 auto nSlots = std::min(left->GetLoopManager()->GetNSlots(), right->GetLoopManager()->GetNSlots());
101 assert(nSlots > 0);
102 auto deltaSize = rightCount / nSlots;
103 ULong64_t i = 0;
104 for (; i < (nSlots - 1); ++i) {
105 ranges.emplace_back(std::pair<ULong64_t, ULong64_t>(deltaSize * i, deltaSize * (i + 1)));
106 }
107 ranges.emplace_back(std::pair<ULong64_t, ULong64_t>(i * deltaSize, rightCount));
108 return ranges;
109}
110
118RCombinedDS::RCombinedDS(std::unique_ptr<RDataSource> inLeft, std::unique_ptr<RDataSource> inRight,
119 std::unique_ptr<RCombinedDSIndex> inIndex,
120 std::string inLeftPrefix, std::string inRightPrefix)
121 : // FIXME: we cache the bare pointers, under the assumption that
122 // the dataframes fLeftDF, fRightDF have longer lifetime as
123 // they actually own them.
124 fLeft{inLeft.get()},
125 fRight{inRight.get()},
126 fLeftDF{std::make_unique<RDataFrame>(std::move(inLeft))},
127 fRightDF{std::make_unique<RDataFrame>(std::move(inRight))},
128 fLeftPrefix{inLeftPrefix},
129 fRightPrefix{inRightPrefix},
130 fIndex{std::move(inIndex)}
131{
132 fColumnNames.reserve(fLeft->GetColumnNames().size() + fRight->GetColumnNames().size());
133 for (auto& c : fLeft->GetColumnNames()) {
134 fColumnNames.push_back(fLeftPrefix + c);
135 }
136 for (auto& c : fRight->GetColumnNames()) {
137 fColumnNames.push_back(fRightPrefix + c);
138 }
139}
140
143RCombinedDS::~RCombinedDS() = default;
144
145const std::vector<std::string>& RCombinedDS::GetColumnNames() const
146{
147 return fColumnNames;
148}
149
150std::vector<std::pair<ULong64_t, ULong64_t>> RCombinedDS::GetEntryRanges()
151{
152 auto entryRanges(std::move(fEntryRanges)); // empty fEntryRanges
153 return entryRanges;
154}
155
156std::string RCombinedDS::GetTypeName(std::string_view colName) const
157{
158 if (colName.compare(0, fLeftPrefix.size(), fLeftPrefix) == 0) {
159 colName.remove_prefix(fLeftPrefix.size());
160 return fLeft->GetTypeName(colName);
161 }
162 if (colName.compare(0, fRightPrefix.size(), fRightPrefix) == 0) {
163 colName.remove_prefix(fRightPrefix.size());
164 return fRight->GetTypeName(colName);
165 }
166 std::string dummy("Column not found: ");
167 dummy += colName.data();
168 throw std::runtime_error(dummy);
169}
170
171bool RCombinedDS::HasColumn(std::string_view colName) const
172{
173 if (colName.compare(0, fLeftPrefix.size(), fLeftPrefix) == 0) {
174 colName.remove_prefix(fLeftPrefix.size());
175 return fLeft->HasColumn(colName);
176 }
177 if (colName.compare(0, fRightPrefix.size(), fRightPrefix) == 0) {
178 colName.remove_prefix(fRightPrefix.size());
179 return fRight->HasColumn(colName);
180 }
181 return false;
182}
183
184bool RCombinedDS::SetEntry(unsigned int slot, ULong64_t entry)
185{
186 std::pair<ULong64_t, ULong64_t> association = fIndex->GetAssociatedEntries(entry);
187 fLeft->SetEntry(slot, association.first);
188 fRight->SetEntry(slot, association.second);
189 return true;
190}
191
192void RCombinedDS::InitSlot(unsigned int slot, ULong64_t entry)
193{
194 std::pair<ULong64_t, ULong64_t> association = fIndex->GetAssociatedEntries(entry);
195 fLeft->InitSlot(slot, association.first);
196 fRight->InitSlot(slot, association.second);
197}
198
199void RCombinedDS::SetNSlots(unsigned int nSlots)
200{
201 assert(0U == fNSlots && "Setting the number of slots even if the number of slots is different from zero.");
204 fLeft->SetNSlots(nSlots);
205 fRight->SetNSlots(nSlots);
206}
207
209std::vector<void*> RCombinedDS::GetColumnReadersImpl(std::string_view colName, const std::type_info& info)
210{
211 if (colName.compare(0, fLeftPrefix.size(), fLeftPrefix) == 0) {
212 colName.remove_prefix(fLeftPrefix.size());
213 return fLeft->GetColumnReadersImpl(colName, info);
214 }
215 if (colName.compare(0, fRightPrefix.size(), fRightPrefix) == 0) {
216 colName.remove_prefix(fRightPrefix.size());
217 return fRight->GetColumnReadersImpl(colName, info);
218 }
219 assert(false);
221}
222
224{
225 fEntryRanges = fIndex->BuildIndex(fLeftDF, fRightDF);
226
227 fLeft->Initialize();
228 fRight->Initialize();
229}
230
235RDataFrame MakeCombinedDataFrame(std::unique_ptr<RDataSource> left, std::unique_ptr<RDataSource> right,
236 std::unique_ptr<RCombinedDSIndex> index,
237 std::string leftPrefix, std::string rightPrefix)
238{
239 ROOT::RDataFrame tdf(std::make_unique<RCombinedDS>(std::move(left), std::move(right), std::move(index), leftPrefix, rightPrefix));
240 return tdf;
241}
242
243RDataFrame MakeCrossProductDataFrame(std::unique_ptr<RDataSource> left, std::unique_ptr<RDataSource> right,
244 std::string leftPrefix, std::string rightPrefix)
245{
246 ROOT::RDataFrame tdf(std::make_unique<RCombinedDS>(std::move(left), std::move(right), std::move(std::make_unique<RCombinedDSCrossJoinIndex>()), leftPrefix, rightPrefix));
247 return tdf;
248}
249
250RDataFrame MakeColumnIndexedDataFrame(std::unique_ptr<RDataSource> left, std::unique_ptr<RDataSource> right,
251 std::string indexColumnName,
252 std::string leftPrefix, std::string rightPrefix)
253{
254 ROOT::RDataFrame tdf(std::make_unique<RCombinedDS>(std::move(left), std::move(right), std::move(std::make_unique<RCombinedDSColumnJoinIndex<int>>(indexColumnName)), leftPrefix, rightPrefix));
255 return tdf;
256}
257
258RDataFrame MakeBlockAntiDataFrame(std::unique_ptr<RDataSource> left, std::unique_ptr<RDataSource> right,
259 std::string indexColumnName,
260 std::string leftPrefix, std::string rightPrefix)
261{
262 ROOT::RDataFrame tdf(std::make_unique<RCombinedDS>(std::move(left), std::move(right), std::move(std::make_unique<RCombinedDSBlockJoinIndex<int>>(indexColumnName)), leftPrefix, rightPrefix));
263 return tdf;
264}
265
266RDataFrame MakeFriendDataFrame(std::unique_ptr<RDataSource> left, std::unique_ptr<RDataSource> right,
267 std::string leftPrefix, std::string rightPrefix)
268{
269 ROOT::RDataFrame tdf(std::make_unique<RCombinedDS>(std::move(left), std::move(right), std::move(std::make_unique<RCombinedDSFriendIndex>()), leftPrefix, rightPrefix));
270 return tdf;
271}
272
273} // namespace RDF
274} // namespace ROOT
#define O2_BUILTIN_UNREACHABLE
int32_t i
uint32_t c
Definition RawData.h:2
std::vector< std::pair< ULong64_t, ULong64_t > > BuildIndex(std::unique_ptr< RDataFrame > &left, std::unique_ptr< RDataFrame > &right) final
std::vector< std::pair< ULong64_t, ULong64_t > > BuildIndex(std::unique_ptr< RDataFrame > &left, std::unique_ptr< RDataFrame > &right) final
bool HasColumn(std::string_view colName) const override
~RCombinedDS() override
Destructor.
RCombinedDS(std::unique_ptr< RDataSource > left, std::unique_ptr< RDataSource > right, std::unique_ptr< RCombinedDSIndex > index=std::make_unique< RCombinedDSFriendIndex >(), std::string leftPrefix=std::string{"left_"}, std::string rightPrefix=std::string{"right_"})
bool SetEntry(unsigned int slot, ULong64_t entry) override
std::string GetTypeName(std::string_view colName) const override
std::vector< std::pair< ULong64_t, ULong64_t > > GetEntryRanges() override
void InitSlot(unsigned int slot, ULong64_t firstEntry) override
void SetNSlots(unsigned int nSlots) override
std::vector< void * > GetColumnReadersImpl(std::string_view colName, const std::type_info &info) override
This should never be called, since we did a template overload for GetColumnReaders()
const std::vector< std::string > & GetColumnNames() const override
void Initialize() override
GLuint entry
Definition glcorearb.h:5735
GLuint index
Definition glcorearb.h:781
GLdouble GLdouble right
Definition glcorearb.h:4077
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
RDataFrame MakeColumnIndexedDataFrame(std::unique_ptr< RDataSource > left, std::unique_ptr< RDataSource >, std::string indexColName, std::string leftPrefix="left_", std::string rightPrefix="right_")
RDataFrame MakeBlockAntiDataFrame(std::unique_ptr< RDataSource > left, std::unique_ptr< RDataSource > right, std::string indexColumnName, std::string leftPrefix="left_", std::string rightPrefix="right_")
RDataFrame MakeCombinedDataFrame(std::unique_ptr< RDataSource > left, std::unique_ptr< RDataSource >, std::unique_ptr< RCombinedDSIndex > index, std::string leftPrefix="left_", std::string rightPrefix="right_")
Factory method to create an Apache Arrow RDataFrame.
RDataFrame MakeFriendDataFrame(std::unique_ptr< RDataSource > left, std::unique_ptr< RDataSource > right, std::string leftPrefix="left_", std::string rightPrefix="right_")
RDataFrame MakeCrossProductDataFrame(std::unique_ptr< RDataSource > left, std::unique_ptr< RDataSource >, std::string leftPrefix="left_", std::string rightPrefix="right_")
Defining DataPointCompositeObject explicitly as copiable.
static char const * combinationRuleAsString(BlockCombinationRule ruleType)