Project
Loading...
Searching...
No Matches
AnalysisCCDBHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "AnalysisCCDBHelpers.h"
13#include "CCDBFetcherHelper.h"
20#include "Framework/Output.h"
21#include "Framework/Signpost.h"
25#include <arrow/array/builder_binary.h>
26#include <arrow/type.h>
27#include <arrow/type_fwd.h>
28#include <arrow/util/key_value_metadata.h>
29#include <arrow/table.h>
30#include <arrow/array.h>
31#include <arrow/builder.h>
32#include <fmt/base.h>
33#include <ctime>
34#include <memory>
35#include <unordered_map>
36
38
39namespace o2::framework
40{
// Fill valid routes. Notice that for analysis the timestamps are associated
// with an ATIM table, and there might be multiple CCDB objects of the same
// kind per dataframe.
// For this reason, rather than matching the Lifetime::Condition, we match the
// origin.
46namespace
47{
48void fillValidRoutes(CCDBFetcherHelper& helper, std::vector<o2::framework::OutputRoute> const& outputRoutes, std::unordered_map<std::string, int>& bindings)
49{
50 for (auto& route : outputRoutes) {
51 if (std::ranges::none_of(route.matcher.metadata, [](auto const& m) { return m.name.starts_with("ccdb:"); })) {
52 continue;
53 }
54 auto specStr = DataSpecUtils::describe(route.matcher);
55 if (bindings.find(specStr) != bindings.end()) {
56 continue;
57 }
58 bindings[specStr] = helper.routes.size();
59 helper.routes.push_back(route);
60 LOGP(info, "The following route needs condition objects {} ", DataSpecUtils::describe(route.matcher));
61 for (auto& metadata : route.matcher.metadata) {
62 if (metadata.type == VariantType::String) {
63 LOGP(info, "- {}: {}", metadata.name, metadata.defaultValue.asString());
64 }
65 }
66 }
67}
68} // namespace
69
71{
72 return adaptStateful([](ConfigParamRegistry const& options, DeviceSpec const& spec, InitContext& ic) {
73 auto& dec = ic.services().get<DanglingEdgesContext>();
74 // The effective default for each ccdb: option was already resolved at topology
75 // time by ArrowSupport (consulting task Configurables) and registered on this
76 // device's options. Here we just read the final value — honouring any further
77 // runtime override supplied via CLI or JSON config.
78 std::unordered_map<std::string, std::string> ccdbUrls;
79 for (auto& input : dec.analysisCCDBInputs) {
80 for (auto& m : input.metadata) {
81 if (!m.name.starts_with("ccdb:") || ccdbUrls.count(m.name)) {
82 continue;
83 }
84 std::string url = m.defaultValue.asString();
85 if (ConfigParamsHelper::hasOption(spec.options, m.name)) {
86 url = options.get<std::string>(m.name.c_str());
87 }
88 LOGP(info, "CCDB path resolved for {}: {}", m.name, url);
89 ccdbUrls.emplace(m.name, std::move(url));
90 }
91 }
92 std::vector<std::shared_ptr<arrow::Schema>> schemas;
93 for (auto& input : dec.analysisCCDBInputs) {
94 auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
95 std::vector<std::shared_ptr<arrow::Field>> fields;
96 schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input));
97 schemaMetadata->Append("outputBinding", input.binding);
98 for (auto& m : input.metadata) {
99 if (m.name.starts_with("input:")) {
100 auto name = m.name.substr(6);
101 schemaMetadata->Append("sourceTable", name);
102 schemaMetadata->Append("sourceMatcher", DataSpecUtils::describe(std::get<ConcreteDataMatcher>(DataSpecUtils::fromMetadataString(m.defaultValue.get<std::string>()).matcher)));
103 continue;
104 }
105 if (!m.name.starts_with("ccdb:")) {
106 continue;
107 }
108 auto fieldMetadata = std::make_shared<arrow::KeyValueMetadata>();
109 auto it = ccdbUrls.find(m.name);
110 fieldMetadata->Append("url", it != ccdbUrls.end() ? it->second : m.defaultValue.asString());
111 auto columnName = m.name.substr(strlen("ccdb:"));
112 fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, fieldMetadata));
113 }
114 schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
115 }
116
117 std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
118 CCDBFetcherHelper::initialiseHelper(*helper, options);
119 std::unordered_map<std::string, int> bindings;
120 fillValidRoutes(*helper, spec.outputs, bindings);
121
122 return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) {
123 O2_SIGNPOST_ID_GENERATE(sid, ccdb);
124 O2_SIGNPOST_START(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.timeslice);
125 for (auto& schema : schemas) {
126 std::vector<CCDBFetcherHelper::FetchOp> ops;
127 auto inputBinding = *schema->metadata()->Get("sourceTable");
128 auto inputMatcher = DataSpecUtils::fromString(*schema->metadata()->Get("sourceMatcher"));
129 auto outRouteDesc = *schema->metadata()->Get("outputRoute");
130 std::string outBinding = *schema->metadata()->Get("outputBinding");
131 O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
132 "Fetching CCDB objects for %{public}s's columns with timestamps from %{public}s and putting them in route %{public}s",
133 outBinding.c_str(), inputBinding.c_str(), outRouteDesc.c_str());
134 auto table = inputs.get<TableConsumer>(inputMatcher)->asArrowTable();
135 // FIXME: make the fTimestamp column configurable.
136 auto timestampColumn = table->GetColumnByName("fTimestamp");
137 O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
138 "There are %zu bindings available", bindings.size());
139 for (auto& binding : bindings) {
140 O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
141 "* %{public}s: %d",
142 binding.first.c_str(), binding.second);
143 }
144 int outputRouteIndex = bindings.at(outRouteDesc);
145 auto& spec = helper->routes[outputRouteIndex].matcher;
146 std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
147 for (auto const& _ : schema->fields()) {
148 builders.emplace_back(std::make_shared<arrow::BinaryViewBuilder>());
149 }
150
151 for (auto ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
152 std::shared_ptr<arrow::Array> chunk = timestampColumn->chunk(ci);
153 auto const* timestamps = chunk->data()->GetValuesSafe<size_t>(1);
154
155 for (int64_t ri = 0; ri < chunk->data()->length; ri++) {
156 ops.clear();
157 int64_t timestamp = timestamps[ri];
158 for (auto& field : schema->fields()) {
159 auto url = *field->metadata()->Get("url");
160 // Time to actually populate the blob
161 ops.push_back({
162 .spec = spec,
163 .url = url,
164 .timestamp = timestamp,
165 .runNumber = 1,
166 .runDependent = 0,
167 .queryRate = 0,
168 });
169 }
170 auto responses = CCDBFetcherHelper::populateCacheWith(helper, ops, timingInfo, dtc, allocator);
171 O2_SIGNPOST_START(ccdb, sid, "handlingResponses",
172 "Got %zu responses from server.",
173 responses.size());
174 if (builders.size() != responses.size()) {
175 LOGP(fatal, "Not enough responses (expected {}, found {})", builders.size(), responses.size());
176 }
177 arrow::Status result;
178 for (size_t bi = 0; bi < responses.size(); bi++) {
179 auto& builder = builders[bi];
180 auto& response = responses[bi];
181 char const* address = reinterpret_cast<char const*>(response.id.value);
182 result &= builder->Append(std::string_view(address, response.size));
183 }
184 if (!result.ok()) {
185 LOGP(fatal, "Error adding results from CCDB");
186 }
187 O2_SIGNPOST_END(ccdb, sid, "handlingResponses", "Done processing responses");
188 }
189 }
190 arrow::ArrayVector arrays;
191 for (auto& builder : builders) {
192 arrays.push_back(*builder->Finish());
193 }
194 auto outTable = arrow::Table::Make(schema, arrays);
195 auto concrete = DataSpecUtils::asConcreteDataMatcher(spec);
196 allocator.adopt(Output{concrete.origin, concrete.description, concrete.subSpec}, outTable);
197 }
198
199 stats.updateStats({(int)ProcessingStatsId::CCDB_CACHE_FETCHED_BYTES, DataProcessingStats::Op::Set, (int64_t)helper->totalFetchedBytes});
200 stats.updateStats({(int)ProcessingStatsId::CCDB_CACHE_REQUESTED_BYTES, DataProcessingStats::Op::Set, (int64_t)helper->totalRequestedBytes});
201 O2_SIGNPOST_END(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects");
202 });
203 });
204}
205
206} // namespace o2::framework
std::string binding
std::string url
std::shared_ptr< arrow::Schema > schema
std::vector< std::shared_ptr< arrow::Field > > fields
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
void adopt(const Output &spec, std::string *)
ServiceRegistryRef services()
Definition InitContext.h:34
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
decltype(auto) get(R binding, int part=0) const
const GLfloat * m
Definition glcorearb.h:4066
GLuint GLuint64EXT address
Definition glcorearb.h:5846
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint const GLchar * name
Definition glcorearb.h:781
const GLuint * arrays
Definition glcorearb.h:1314
constexpr framework::ConcreteDataMatcher matcher()
Definition ASoA.h:380
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
static AlgorithmSpec fetchFromCCDB(ConfigContext const &)
static auto populateCacheWith(std::shared_ptr< CCDBFetcherHelper > const &helper, std::vector< FetchOp > const &ops, TimingInfo &timingInfo, DataTakingContext &dtc, DataAllocator &allocator) -> std::vector< Response >
static void initialiseHelper(CCDBFetcherHelper &helper, ConfigParamRegistry const &options)
static bool hasOption(const std::vector< ConfigParamSpec > &specs, const std::string &optName)
Check if option is defined.
Helper struct to hold statistics about the data processing happening.
static InputSpec fromMetadataString(std::string s)
Create an InputSpec from metadata string.
static std::string describe(InputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static ConcreteDataMatcher fromString(std::string s)
Create a concrete data matcher from serialized string.
std::vector< ConfigParamSpec > options
Definition DeviceSpec.h:57
std::vector< OutputRoute > outputs
Definition DeviceSpec.h:63
std::variant< ConcreteDataMatcher, data_matcher::DataDescriptorMatcher > matcher
The actual matcher for the input spec.
Definition InputSpec.h:70
header::DataOrigin origin
Definition Output.h:28