11#ifndef o2_framework_AnalysisHelpers_H_DEFINED
12#define o2_framework_AnalysisHelpers_H_DEFINED
36 std::shared_ptr<arrow::DataType>
type = [](
IndexKind kind) -> std::shared_ptr<arrow::DataType> {
40 return arrow::int32();
42 return arrow::fixed_size_list(arrow::int32(), 2);
44 return arrow::list(arrow::int32());
52 return (this->label ==
other.label) && (this->columnLabel ==
other.columnLabel) && (this->kind ==
other.kind) && (this->pos ==
other.pos);
55 std::shared_ptr<arrow::Field>
field()
const
62 static std::vector<framework::IndexColumnBuilder>
makeBuilders(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord>
const&
records);
63 static void resetBuilders(std::vector<framework::IndexColumnBuilder>& builders, std::vector<std::shared_ptr<arrow::Table>>&& tables);
65 static std::shared_ptr<arrow::Table>
materialize(std::vector<framework::IndexColumnBuilder>& builders, std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord>
const&
records, std::shared_ptr<arrow::Schema>
const&
schema,
bool exclusive);
73template <soa::is_table T>
80template <soa::TableRef R>
87template <
typename... Cs>
94template <aod::is_aod_hash D>
101std::shared_ptr<arrow::Table>
spawnerHelper(std::shared_ptr<arrow::Table>
const& fullTable, std::shared_ptr<arrow::Schema> newSchema,
size_t nColumns,
104std::shared_ptr<arrow::Table>
spawnerHelper(std::shared_ptr<arrow::Table>
const& fullTable, std::shared_ptr<arrow::Schema> newSchema,
105 const char*
name,
size_t nColumns,
106 const std::shared_ptr<gandiva::Projector>& projector);
109template <aod::is_aod_hash D>
110 requires(soa::has_extension<typename o2::aod::MetadataTrait<D>::metadata>)
113 if (fullTable->num_rows() == 0) {
114 return makeEmptyTable<D>(
name);
116 constexpr auto Ncol = []<
typename M>() {
126template <
typename... C>
129 std::array<const char*, 1>
labels{
"original"};
131 if (fullTable->num_rows() == 0) {
146 std::shared_ptr<gandiva::Projector>
projector =
nullptr;
147 std::shared_ptr<arrow::Schema>
schema =
nullptr;
166 std::shared_ptr<std::vector<framework::IndexColumnBuilder>>
builders =
nullptr;
178 std::string{
"input:"} + o2::aod::label<R>(),
180 aod::sourceSpec<R>(),
188 std::string{
"input-schema:"} + o2::aod::label<R>(),
196template <soa::with_sources T>
197inline constexpr auto getSources()
199 return []<
size_t N, std::array<soa::TableRef, N> refs>() {
200 return []<
size_t... Is>(std::index_sequence<Is...>) {
201 return std::vector{soa::tableRef2ConfigParamSpec<refs[Is]>()...};
202 }(std::make_index_sequence<N>());
203 }.template operator()<T::sources.size(), T::sources>();
206template <soa::with_sources T>
207inline constexpr auto getSourceSchemas()
209 return []<
size_t N, std::array<soa::TableRef, N> refs>() {
210 return []<
size_t... Is>(std::index_sequence<Is...>) {
211 return std::vector{soa::tableRef2Schema<refs[Is]>()...};
212 }(std::make_index_sequence<N>());
213 }.template operator()<T::sources.size(), T::sources>();
216template <soa::with_ccdb_urls T>
217inline constexpr auto getCCDBUrls()
219 std::vector<framework::ConfigParamSpec>
result;
220 for (
size_t i = 0;
i < T::ccdb_urls.size(); ++
i) {
221 result.push_back({std::string{
"ccdb:"} + std::string{T::ccdb_bindings[
i]},
230 requires(std::same_as<T, int>)
237 requires(std::is_bounded_array_v<T>)
244 requires(framework::is_specialization_v<T, std::vector>)
250template <soa::with_index_pack T>
251inline constexpr auto getIndexMapping()
253 std::vector<IndexRecord>
idx;
254 using indices = T::index_pack_t;
256 [&
idx]<
size_t... Is>(std::index_sequence<Is...>)
mutable {
257 constexpr auto refs = T::sources;
258 ([&
idx]<TableRef
ref,
typename C>()
mutable {
260 if constexpr (
pos == -1) {
263 idx.emplace_back(o2::aod::label<ref>(), C::columnLabel(), getIndexKind<typename C::type>(),
pos);
265 }.template operator()<refs[Is],
typename framework::pack_element_t<Is, indices>>(),
272template <soa::with_sources T>
273constexpr auto getInputMetadata() -> std::vector<framework::ConfigParamSpec>
275 std::vector<framework::ConfigParamSpec> inputMetadata;
277 auto inputSources = getSources<T>();
278 std::sort(inputSources.begin(), inputSources.end(), [](framework::ConfigParamSpec
const&
a, framework::ConfigParamSpec
const&
b) { return a.name < b.name; });
279 auto last = std::unique(inputSources.begin(), inputSources.end(), [](framework::ConfigParamSpec
const&
a, framework::ConfigParamSpec
const&
b) { return a.name == b.name; });
280 inputSources.erase(last, inputSources.end());
281 inputMetadata.insert(inputMetadata.end(), inputSources.begin(), inputSources.end());
283 auto inputSchemas = getSourceSchemas<T>();
284 std::sort(inputSchemas.begin(), inputSchemas.end(), [](framework::ConfigParamSpec
const&
a, framework::ConfigParamSpec
const&
b) { return a.name < b.name; });
285 last = std::unique(inputSchemas.begin(), inputSchemas.end(), [](framework::ConfigParamSpec
const&
a, framework::ConfigParamSpec
const&
b) { return a.name == b.name; });
286 inputSchemas.erase(last, inputSchemas.end());
287 inputMetadata.insert(inputMetadata.end(), inputSchemas.begin(), inputSchemas.end());
289 return inputMetadata;
293 requires(!soa::with_sources<T>)
294constexpr auto getInputMetadata() -> std::vector<framework::ConfigParamSpec>
299template <soa::with_ccdb_urls T>
300constexpr auto getCCDBMetadata() -> std::vector<framework::ConfigParamSpec>
302 std::vector<framework::ConfigParamSpec> results = getCCDBUrls<T>();
303 std::sort(results.begin(), results.end(), [](framework::ConfigParamSpec
const&
a, framework::ConfigParamSpec
const&
b) { return a.name < b.name; });
304 auto last = std::unique(results.begin(), results.end(), [](framework::ConfigParamSpec
const&
a, framework::ConfigParamSpec
const&
b) { return a.name == b.name; });
305 results.erase(last, results.end());
310constexpr auto getCCDBMetadata() -> std::vector<framework::ConfigParamSpec>
315template <soa::with_expression_pack T>
316constexpr auto getExpressionMetadata() -> std::vector<framework::ConfigParamSpec>
318 using expression_pack_t = T::expression_pack_t;
320 auto projectors = []<
typename...
C>(framework::pack<
C...>) -> std::vector<framework::expressions::Projector> {
321 std::vector<framework::expressions::Projector>
result;
322 (
result.emplace_back(std::move(C::Projector())), ...);
324 }(expression_pack_t{});
331 requires(!soa::with_expression_pack<T>)
332constexpr auto getExpressionMetadata() -> std::vector<framework::ConfigParamSpec>
337template <soa::with_index_pack T>
338constexpr auto getIndexMetadata() -> std::vector<framework::ConfigParamSpec>
340 auto map = getIndexMapping<T>();
346 requires(!soa::with_index_pack<T>)
347constexpr auto getIndexMetadata() -> std::vector<framework::ConfigParamSpec>
357 std::vector<framework::ConfigParamSpec> metadata;
359 metadata.insert(metadata.end(),
m.begin(),
m.end());
361 metadata.insert(metadata.end(), ccdbMetadata.begin(), ccdbMetadata.end());
363 metadata.insert(metadata.end(), p.begin(), p.end());
365 metadata.insert(metadata.end(), idx.begin(), idx.end());
372 o2::aod::origin<R>(),
373 o2::aod::description(o2::aod::signature<R>()),
375 framework::Lifetime::Timeframe,
384 o2::aod::origin<R>(),
385 o2::aod::description(o2::aod::signature<R>()),
393 o2::aod::origin<R>(),
394 o2::aod::description(o2::aod::signature<R>()),
420template <is_producable T>
426 template <
typename...
Ts>
428 requires(
sizeof...(Ts) ==
framework::pack_size(
typename persistent_table_t::persistent_columns_t{}))
431 cursor(0, extract(args)...);
442 mBuilder = std::move(builder);
450 mBuilder->setLabel(
label);
457 mBuilder->reserve(
typename persistent_table_t::column_types{},
size);
470 return arg.globalIndex();
473 template <
typename A>
475 static decltype(
auto) extract(
A&& arg)
488template <soa::is_table T>
493template <soa::is_iterator T>
494consteval auto typeWithRef() ->
typename T::parent_t
499 requires soa::is_table<T> || soa::is_iterator<T>
506 return OutputSpec{
OutputLabel{aod::label<table_t::ref>()}, o2::aod::origin<table_t::ref>(), o2::aod::description(o2::aod::signature<table_t::ref>()), table_t::ref.version};
511 return OutputRef{aod::label<table_t::ref>(), table_t::ref.version};
519template <is_producable T>
524concept is_produces =
requires(T t) {
typename T::cursor_t;
typename T::persistent_table_t; &T::cursor; };
541template <soa::is_metadata M, soa::TableRef Ref>
546 template <soa::TableRef R>
549 return soa::tableRef2InputSpec<R>();
554 return []<
size_t... Is>(std::index_sequence<Is...>) {
555 return std::array{base_spec<sources[Is]>()...};
556 }(std::make_index_sequence<
sources.size()>{});
561 return soa::tableRef2OutputSpec<Ref>();
566 return soa::tableRef2Output<Ref>();
569 static constexpr auto ref()
571 return soa::tableRef2OutputRef<Ref>();
583template <is_spawnable T>
590template <is_spawnable T>
591struct Spawns : decltype(transformBase<T>()) {
612 std::shared_ptr<typename T::table_t>
table =
nullptr;
616 return {{std::move(C::Projector())...}};
619 std::shared_ptr<gandiva::Projector>
projector =
nullptr;
620 std::shared_ptr<arrow::Schema>
schema = []() {
622 s->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{
"label"}}, std::vector{std::string{o2::aod::label<T::ref>()}}));
629 typename T::metadata;
630 typename T::expression_pack_t;
631 requires std::same_as<
decltype(t.projector), std::shared_ptr<gandiva::Projector>>;
639template <is_dynamically_spawnable T,
bool DELAYED = false>
640struct Defines : decltype(transformBase<T>()) {
661 std::shared_ptr<typename T::table_t>
table =
nullptr;
664 std::array<o2::framework::expressions::Projector, N>
projectors;
665 std::shared_ptr<gandiva::Projector>
projector =
nullptr;
666 std::shared_ptr<arrow::Schema>
schema = []() {
668 s->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{
"label"}}, std::vector{std::string{o2::aod::label<T::ref>()}}));
681template <is_dynamically_spawnable T>
686 typename T::metadata;
687 typename T::placeholders_pack_t;
688 requires std::same_as<
decltype(t.projector), std::shared_ptr<gandiva::Projector>>;
689 requires std::same_as<
decltype(t.needRecompilation),
bool>;
704template <soa::is_index_table T>
711template <soa::is_index_table T>
712struct Builds : decltype(transformBase<T>()) {
715 using Key = metadata::Key;
716 using H =
typename T::first_t;
717 using Ts =
typename T::rest_t;
720 std::shared_ptr<arrow::Schema>
outputSchema = []() {
return std::make_shared<arrow::Schema>(
soa::createFieldsFromColumns(
index_pack_t{}))->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{
"label"}}, std::vector{std::string{o2::aod::label<T::ref>()}})); }();
722 std::vector<soa::IndexRecord>
map = soa::getIndexMapping<metadata>();
724 std::vector<framework::IndexColumnBuilder>
builders;
737 return table->asArrowTable();
746 auto build(std::vector<std::shared_ptr<arrow::Table>>&& tables)
749 return (this->table !=
nullptr);
755 typename T::metadata;
757 requires std::same_as<
decltype(t.map), std::vector<soa::IndexRecord>>;
791 object = std::make_shared<T>(t);
792 object->SetName(
label.c_str());
797 object = std::make_shared<T>(t);
798 object->SetName(
label.c_str());
804 object->SetName(
label.c_str());
810 object->SetName(
label.c_str());
823 std::memset(desc.str,
'_', 16);
825 s << std::hex << lhash;
827 s << std::hex << reinterpret_cast<uint64_t>(
this);
828 std::memcpy(desc.str, s.str().c_str(), 12);
839 return *
object.get();
860 requires std::same_as<
decltype(t.operator->()),
typename T::obj_t*>;
861 requires std::same_as<
decltype(t.object), std::shared_ptr<typename T::obj_t>>;
872 decltype(
auto) operator->()
const
884 requires std::same_as<
decltype(t.service),
typename T::service_t*>;
890 return std::make_unique<
o2::soa::Filtered<std::decay_t<
decltype(table)>>>(std::vector{table}, std::forward<soa::SelectionVector>(selection));
895 return std::make_unique<
o2::soa::Filtered<std::decay_t<
decltype(table)>>>(std::vector{table.asArrowTable()}, std::forward<soa::SelectionVector>(selection));
927 template <
typename...
Ts>
931 mFiltered->bindExternalIndices(tables...);
935 template <
typename E>
958 template <
typename T1>
974 template <
typename T1,
typename Policy,
bool OPT>
981 std::unique_ptr<o2::soa::Filtered<T>>
mFiltered =
nullptr;
982 gandiva::NodePtr
tree =
nullptr;
1013template <
typename T>
1015 &T::updatePlaceholders;
1016 requires std::same_as<
decltype(t.filter), expressions::Filter>;
1029 static std::shared_ptr<gandiva::Projector> projector =
nullptr;
1040 return output_t{{table.asArrowTable()}, table.offset()};
std::vector< o2::soa::IndexRecord > records
std::vector< expressions::Projector > projectors
std::vector< std::shared_ptr< gandiva::Expression > > expressions
std::vector< std::string > labels
o2::monitoring::tags::Key Key
std::shared_ptr< arrow::Schema > schema
std::unique_ptr< expressions::Node > node
constexpr uint32_t runtime_hash(char const *str)
T::template iterator_template_o< FilteredIndexPolicy, self_t > iterator
Helper to check if a type T is an iterator.
GLuint const GLchar * name
GLboolean GLboolean GLboolean b
GLsizei const GLfloat * value
GLint GLint GLsizei GLint GLenum GLenum type
GLuint GLsizei const GLchar * label
GLsizei GLenum const void * indices
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
GLboolean GLboolean GLboolean GLboolean a
std::shared_ptr< gandiva::Filter > FilterPtr
std::variant< OriginValueMatcher, DescriptionValueMatcher, SubSpecificationTypeValueMatcher, std::unique_ptr< DataDescriptorMatcher >, ConstantValueMatcher, StartTimeValueMatcher > Node
std::shared_ptr< gandiva::Projector > createProjectorHelper(size_t nColumns, expressions::Projector *projectors, std::shared_ptr< arrow::Schema > schema, std::vector< std::shared_ptr< arrow::Field > > const &fields)
gandiva::Selection createSelection(std::shared_ptr< arrow::Table > const &table, Filter const &expression)
Function for creating gandiva selection from our internal filter tree.
void updatePlaceholders(Filter &filter, InitContext &context)
Update placeholder nodes from context.
Defining PrimaryVertex explicitly as messageable.
auto spawner(std::shared_ptr< arrow::Table > const &fullTable, const char *name, o2::framework::expressions::Projector *projectors, std::shared_ptr< gandiva::Projector > &projector, std::shared_ptr< arrow::Schema > const &schema)
Expression-based column generator to materialize columns.
std::string serializeSchema(std::shared_ptr< arrow::Schema > schema)
std::string serializeProjectors(std::vector< framework::expressions::Projector > &projectors)
std::shared_ptr< arrow::Table > spawnerHelper(std::shared_ptr< arrow::Table > const &fullTable, std::shared_ptr< arrow::Schema > newSchema, size_t nColumns, expressions::Projector *projectors, const char *name, std::shared_ptr< gandiva::Projector > &projector)
constexpr std::size_t pack_size(pack< Ts... > const &)
template function to determine number of types in a pack
constexpr auto transformBase()
This helper struct allows you to declare index tables to be created in a task.
std::string serializeIndexRecords(std::vector< o2::soa::IndexRecord > &irs)
memfun_type< decltype(&F::operator())>::type FFL(F const &func)
auto getTableFromFilter(soa::is_filtered_table auto const &table, soa::SelectionVector &&selection)
std::vector< std::shared_ptr< arrow::Table > > extractSources(ProcessingContext &pc, std::vector< std::string > const &labels)
consteval auto typeWithRef() -> T
Helper to define output for a Table.
OutputObjHandlingPolicy
Policy enum to determine OutputObj handling when writing.
void initializePartitionCaches(std::set< uint32_t > const &hashes, std::shared_ptr< arrow::Schema > const &schema, expressions::Filter const &filter, gandiva::NodePtr &tree, gandiva::FilterPtr &gfilter)
std::shared_ptr< arrow::Table > makeEmptyTableImpl(const char *name, std::shared_ptr< arrow::Schema > &schema)
auto createFieldsFromColumns(framework::pack< C... >)
SelectionVector selectionToVector(gandiva::Selection const &sel)
constexpr auto tableRef2InputSpec()
constexpr auto tableRef2Output()
std::vector< int64_t > SelectionVector
auto Attach(T const &table)
constexpr auto tableRef2Schema()
constexpr auto tableRef2ConfigParamSpec()
auto Extend(T const &table)
On-the-fly adding of expression columns.
constexpr auto tableRef2OutputSpec()
constexpr auto tableRef2OutputRef()
Defining DataPointCompositeObject explicitly as copiable.
header::DataDescription description
std::vector< o2::soa::IndexRecord > records
std::vector< std::string > labels
header::DataOrigin origin
std::shared_ptr< std::vector< framework::IndexColumnBuilder > > builders
header::DataHeader::SubSpecificationType version
std::shared_ptr< arrow::Schema > outputSchema
std::shared_ptr< arrow::Table > materialize(ProcessingContext &pc)
std::shared_ptr< T > table
auto build(std::vector< std::shared_ptr< arrow::Table > > &&tables)
std::vector< soa::IndexRecord > map
std::vector< framework::IndexColumnBuilder > builders
std::shared_ptr< arrow::Schema > outputSchema
T const & operator*() const
metadata::index_pack_t index_pack_t
decltype(transformBase< T >())::metadata metadata
std::shared_ptr< typename T::table_t > table
T::table_t const & operator*() const
decltype(transformBase< T >())::metadata metadata
typename metadata::extension_table_t extension_t
std::array< o2::framework::expressions::Projector, N > projectors
std::shared_ptr< extension_t > extension
std::shared_ptr< arrow::Schema > inputSchema
std::shared_ptr< arrow::Schema > schema
static constexpr bool delayed
static constexpr size_t N
typename metadata::placeholders_pack_t placeholders_pack_t
T::table_t * operator->()
std::shared_ptr< gandiva::Projector > projector
static OutputSpec const spec()
aod::MetadataTrait< o2::aod::Hash< table_t::ref.desc_hash > >::metadata metadata
decltype(typeWithRef< T >()) table_t
OutputObj(T &&t, OutputObjHandlingPolicy policy_=OutputObjHandlingPolicy::AnalysisObject, OutputObjSourceType sourceType_=OutputObjSourceType::OutputObjSource)
void setObject(std::shared_ptr< T > t)
OutputObjSourceType sourceType
OutputRef ref(uint16_t index, uint16_t max)
std::shared_ptr< T > object
void setObject(T const &t)
OutputObj(std::string const &label_, OutputObjHandlingPolicy policy_=OutputObjHandlingPolicy::AnalysisObject, OutputObjSourceType sourceType_=OutputObjSourceType::OutputObjSource)
OutputObjHandlingPolicy policy
void setHash(uint32_t hash)
Partition(expressions::Node &&filter_, T const &table)
typename o2::soa::Filtered< T >::const_iterator filtered_const_iterator
void bindTable(T const &table)
filtered_const_iterator begin() const
auto rawSliceBy(o2::framework::Preslice< T1 > const &container, int value) const
auto sliceByCached(framework::expressions::BindingNode const &node, int value, o2::framework::SliceCache &cache) const
void intializeCaches(std::set< uint32_t > const &hashes, std::shared_ptr< arrow::Schema > const &schema)
Partition(expressions::Node &&filter_)
typename o2::soa::Filtered< T >::iterator filtered_iterator
o2::soa::Filtered< T > * operator->()
typename o2::soa::Filtered< T >::iterator iterator
auto sliceBy(o2::framework::PresliceBase< T1, Policy, OPT > const &container, int value) const
std::shared_ptr< arrow::Table > asArrowTable() const
o2::soa::RowViewSentinel end() const
void updatePlaceholders(InitContext &context)
std::unique_ptr< o2::soa::Filtered< T > > mFiltered
void bindInternalIndicesTo(E *ptr)
expressions::Filter filter
o2::soa::RowViewSentinel end()
auto sliceByCachedUnsorted(framework::expressions::BindingNode const &node, int value, o2::framework::SliceCache &cache) const
typename o2::soa::Filtered< T >::const_iterator const_iterator
gandiva::FilterPtr gfilter
void bindExternalIndices(Ts *... tables)
filtered_iterator begin()
header::DataDescription description
std::vector< std::string > labels
std::vector< std::shared_ptr< gandiva::Expression > > expressions
std::shared_ptr< gandiva::Projector > projector
header::DataOrigin origin
header::DataHeader::SubSpecificationType version
std::shared_ptr< arrow::Schema > schema
std::shared_ptr< arrow::Table > materialize(ProcessingContext &pc) const
std::shared_ptr< arrow::Schema > inputSchema
T::table_t * operator->()
std::shared_ptr< gandiva::Projector > projector
std::shared_ptr< extension_t > extension
T::table_t const & operator*() const
static constexpr size_t N
std::shared_ptr< arrow::Schema > schema
typename metadata::expression_pack_t expression_pack_t
std::array< o2::framework::expressions::Projector, N > projectors
decltype(transformBase< T >())::metadata metadata
std::shared_ptr< typename T::table_t > table
typename metadata::extension_table_t extension_t
int64_t lastIndex()
Last index inserted in the table.
decltype(FFL(std::declval< cursor_t >())) cursor
bool resetCursor(LifetimeHolder< TableBuilder > builder)
void setLabel(const char *label)
void operator()(Ts &&... args)
void reserve(int64_t size)
decltype([]() { if constexpr(soa::is_iterator< T >) { return typename T::parent_t{nullptr} persistent_table_t
decltype(std::declval< TableBuilder >().cursor< persistent_table_t >()) cursor_t
An expression tree node corresponding to a column binding.
A struct, containing the root of the expression tree.
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::span< const char *const > labels)
static void resetBuilders(std::vector< framework::IndexColumnBuilder > &builders, std::vector< std::shared_ptr< arrow::Table > > &&tables)
static std::vector< framework::IndexColumnBuilder > makeBuilders(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::vector< soa::IndexRecord > const &records)
static std::shared_ptr< arrow::Table > materialize(std::vector< framework::IndexColumnBuilder > &builders, std::vector< std::shared_ptr< arrow::Table > > &&tables, std::vector< soa::IndexRecord > const &records, std::shared_ptr< arrow::Schema > const &schema, bool exclusive)
std::shared_ptr< arrow::Field > field() const
auto operator==(IndexRecord const &other) const
VectorOfTObjectPtrs other
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))