Project
Loading...
Searching...
No Matches
AnalysisCCDBHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "AnalysisCCDBHelpers.h"
13#include "CCDBFetcherHelper.h"
19#include "Framework/Output.h"
20#include "Framework/Signpost.h"
24#include <arrow/array/builder_binary.h>
25#include <arrow/type.h>
26#include <arrow/type_fwd.h>
27#include <arrow/util/key_value_metadata.h>
28#include <arrow/table.h>
29#include <arrow/array.h>
30#include <arrow/builder.h>
31#include <fmt/base.h>
32#include <ctime>
33#include <memory>
34#include <unordered_map>
35
37
38namespace o2::framework
39{
// Fill valid routes. Notice that for analysis the timestamps are associated to
// an ATIM table and there might be multiple CCDB objects of the same kind per
// dataframe.
// For this reason, rather than matching the Lifetime::Condition, we match the
// origin.
45namespace
46{
47void fillValidRoutes(CCDBFetcherHelper& helper, std::vector<o2::framework::OutputRoute> const& outputRoutes, std::unordered_map<std::string, int>& bindings)
48{
49 for (auto& route : outputRoutes) {
50 auto originMatcher = DataSpecUtils::asConcreteDataMatcher(route.matcher);
51 if (originMatcher.origin != header::DataOrigin{"ATIM"}) {
52 continue;
53 }
54 auto specStr = DataSpecUtils::describe(route.matcher);
55 if (bindings.find(specStr) != bindings.end()) {
56 continue;
57 }
58 bindings[specStr] = helper.routes.size();
59 helper.routes.push_back(route);
60 LOGP(info, "The following route needs condition objects {} ", DataSpecUtils::describe(route.matcher));
61 for (auto& metadata : route.matcher.metadata) {
62 if (metadata.type == VariantType::String) {
63 LOGP(info, "- {}: {}", metadata.name, metadata.defaultValue.asString());
64 }
65 }
66 }
67}
68} // namespace
69
71{
72 return adaptStateful([](ConfigParamRegistry const& options, DeviceSpec const& spec, InitContext& ic) {
73 auto& dec = ic.services().get<DanglingEdgesContext>();
74 std::vector<std::shared_ptr<arrow::Schema>> schemas;
75 auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
76
77 for (auto& input : dec.analysisCCDBInputs) {
78 std::vector<std::shared_ptr<arrow::Field>> fields;
79 schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input));
80 schemaMetadata->Append("outputBinding", input.binding);
81
82 for (auto& m : input.metadata) {
83 // Save the list of input tables
84 if (m.name.starts_with("input:")) {
85 auto name = m.name.substr(6);
86 schemaMetadata->Append("sourceTable", name);
87 continue;
88 }
89 // Ignore the non ccdb: entries
90 if (!m.name.starts_with("ccdb:")) {
91 continue;
92 }
93 // Create the schema of the output
94 auto metadata = std::make_shared<arrow::KeyValueMetadata>();
95 metadata->Append("url", m.defaultValue.asString());
96 auto columnName = m.name.substr(strlen("ccdb:"));
97 fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, metadata));
98 }
99 schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
100 }
101
102 std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
103 CCDBFetcherHelper::initialiseHelper(*helper, options);
104 std::unordered_map<std::string, int> bindings;
105 fillValidRoutes(*helper, spec.outputs, bindings);
106
107 return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo) {
108 O2_SIGNPOST_ID_GENERATE(sid, ccdb);
109 O2_SIGNPOST_START(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.timeslice);
110 for (auto& schema : schemas) {
111 std::vector<CCDBFetcherHelper::FetchOp> ops;
112 auto inputBinding = *schema->metadata()->Get("sourceTable");
113 auto inputMatcher = DataSpecUtils::fromString(*schema->metadata()->Get("sourceMatcher"));
114 auto outRouteDesc = *schema->metadata()->Get("outputRoute");
115 std::string outBinding = *schema->metadata()->Get("outputBinding");
116 O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
117 "Fetching CCDB objects for %{public}s's columns with timestamps from %{public}s and putting them in route %{public}s",
118 outBinding.c_str(), inputBinding.c_str(), outRouteDesc.c_str());
119 auto table = inputs.get<TableConsumer>(inputMatcher)->asArrowTable();
120 // FIXME: make the fTimestamp column configurable.
121 auto timestampColumn = table->GetColumnByName("fTimestamp");
122 O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
123 "There are %zu bindings available", bindings.size());
124 for (auto& binding : bindings) {
125 O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
126 "* %{public}s: %d",
127 binding.first.c_str(), binding.second);
128 }
129 int outputRouteIndex = bindings.at(outRouteDesc);
130 auto& spec = helper->routes[outputRouteIndex].matcher;
131 std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
132 for (auto const& _ : schema->fields()) {
133 builders.emplace_back(std::make_shared<arrow::BinaryViewBuilder>());
134 }
135
136 for (auto ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
137 std::shared_ptr<arrow::Array> chunk = timestampColumn->chunk(ci);
138 auto const* timestamps = chunk->data()->GetValuesSafe<size_t>(1);
139
140 for (int64_t ri = 0; ri < chunk->data()->length; ri++) {
141 ops.clear();
142 int64_t timestamp = timestamps[ri];
143 for (auto& field : schema->fields()) {
144 auto url = *field->metadata()->Get("url");
145 // Time to actually populate the blob
146 ops.push_back({
147 .spec = spec,
148 .url = url,
149 .timestamp = timestamp,
150 .runNumber = 1,
151 .runDependent = 0,
152 .queryRate = 0,
153 });
154 }
155 auto responses = CCDBFetcherHelper::populateCacheWith(helper, ops, timingInfo, dtc, allocator);
156 O2_SIGNPOST_START(ccdb, sid, "handlingResponses",
157 "Got %zu responses from server.",
158 responses.size());
159 if (builders.size() != responses.size()) {
160 LOGP(fatal, "Not enough responses (expected {}, found {})", builders.size(), responses.size());
161 }
162 arrow::Status result;
163 for (size_t bi = 0; bi < responses.size(); bi++) {
164 auto& builder = builders[bi];
165 auto& response = responses[bi];
166 char const* address = reinterpret_cast<char const*>(response.id.value);
167 result &= builder->Append(std::string_view(address, response.size));
168 }
169 if (!result.ok()) {
170 LOGP(fatal, "Error adding results from CCDB");
171 }
172 O2_SIGNPOST_END(ccdb, sid, "handlingResponses", "Done processing responses");
173 }
174 }
175 arrow::ArrayVector arrays;
176 for (auto& builder : builders) {
177 arrays.push_back(*builder->Finish());
178 }
179 auto outTable = arrow::Table::Make(schema, arrays);
180 auto concrete = DataSpecUtils::asConcreteDataMatcher(spec);
181 allocator.adopt(Output{concrete.origin, concrete.description, concrete.subSpec}, outTable);
182 }
183
184 O2_SIGNPOST_END(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects");
185 });
186 });
187}
188
189} // namespace o2::framework
std::string binding
std::string url
std::shared_ptr< arrow::Schema > schema
std::vector< std::shared_ptr< arrow::Field > > fields
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
void adopt(const Output &spec, std::string *)
ServiceRegistryRef services()
Definition InitContext.h:34
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
decltype(auto) get(R binding, int part=0) const
const GLfloat * m
Definition glcorearb.h:4066
GLuint GLuint64EXT address
Definition glcorearb.h:5846
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint const GLchar * name
Definition glcorearb.h:781
const GLuint * arrays
Definition glcorearb.h:1314
constexpr framework::ConcreteDataMatcher matcher()
Definition ASoA.h:380
Defining PrimaryVertex explicitly as messageable.
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
static AlgorithmSpec fetchFromCCDB(ConfigContext const &)
static auto populateCacheWith(std::shared_ptr< CCDBFetcherHelper > const &helper, std::vector< FetchOp > const &ops, TimingInfo &timingInfo, DataTakingContext &dtc, DataAllocator &allocator) -> std::vector< Response >
static void initialiseHelper(CCDBFetcherHelper &helper, ConfigParamRegistry const &options)
static std::string describe(InputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static ConcreteDataMatcher fromString(std::string s)
Create a concrete data matcher from serialized string.
std::vector< OutputRoute > outputs
Definition DeviceSpec.h:63
header::DataOrigin origin
Definition Output.h:28