Project
Loading...
Searching...
No Matches
AnalysisCCDBHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "AnalysisCCDBHelpers.h"
13#include "CCDBFetcherHelper.h"
19#include "Framework/Output.h"
20#include "Framework/Signpost.h"
24#include <arrow/array/builder_binary.h>
25#include <arrow/type.h>
26#include <arrow/type_fwd.h>
27#include <arrow/util/key_value_metadata.h>
28#include <arrow/table.h>
29#include <arrow/array.h>
30#include <arrow/builder.h>
31#include <fmt/base.h>
32#include <ctime>
33#include <memory>
34#include <unordered_map>
35
37
38namespace o2::framework
39{
// Fill valid routes. Notice that for analysis the timestamps are associated with
// an ATIM table and there might be multiple CCDB objects of the same kind per
// dataframe.
// For this reason, rather than matching on Lifetime::Condition, we match on the
// origin.
45namespace
46{
47void fillValidRoutes(CCDBFetcherHelper& helper, std::vector<o2::framework::OutputRoute> const& outputRoutes, std::unordered_map<std::string, int>& bindings)
48{
49 for (auto& route : outputRoutes) {
50 auto originMatcher = DataSpecUtils::asConcreteDataMatcher(route.matcher);
51 if (originMatcher.origin != header::DataOrigin{"ATIM"}) {
52 continue;
53 }
54 auto specStr = DataSpecUtils::describe(route.matcher);
55 if (bindings.find(specStr) != bindings.end()) {
56 continue;
57 }
58 bindings[specStr] = helper.routes.size();
59 helper.routes.push_back(route);
60 LOGP(info, "The following route needs condition objects {} ", DataSpecUtils::describe(route.matcher));
61 for (auto& metadata : route.matcher.metadata) {
62 if (metadata.type == VariantType::String) {
63 LOGP(info, "- {}: {}", metadata.name, metadata.defaultValue.asString());
64 }
65 }
66 }
67}
68} // namespace
69
71{
72 return adaptStateful([](ConfigParamRegistry const& options, DeviceSpec const& spec, InitContext& ic) {
73 auto& dec = ic.services().get<DanglingEdgesContext>();
74 std::vector<std::shared_ptr<arrow::Schema>> schemas;
75 auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
76
77 for (auto& input : dec.analysisCCDBInputs) {
78 std::vector<std::shared_ptr<arrow::Field>> fields;
79 schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input));
80 schemaMetadata->Append("outputBinding", input.binding);
81
82 for (auto& m : input.metadata) {
83 // Save the list of input tables
84 if (m.name.starts_with("input:")) {
85 auto name = m.name.substr(6);
86 schemaMetadata->Append("sourceTable", name);
87 schemaMetadata->Append("sourceMatcher", DataSpecUtils::describe(std::get<ConcreteDataMatcher>(DataSpecUtils::fromMetadataString(m.defaultValue.get<std::string>()).matcher)));
88 continue;
89 }
90 // Ignore the non ccdb: entries
91 if (!m.name.starts_with("ccdb:")) {
92 continue;
93 }
94 // Create the schema of the output
95 auto metadata = std::make_shared<arrow::KeyValueMetadata>();
96 metadata->Append("url", m.defaultValue.asString());
97 auto columnName = m.name.substr(strlen("ccdb:"));
98 fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, metadata));
99 }
100 schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
101 }
102
103 std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
104 CCDBFetcherHelper::initialiseHelper(*helper, options);
105 std::unordered_map<std::string, int> bindings;
106 fillValidRoutes(*helper, spec.outputs, bindings);
107
108 return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo) {
109 O2_SIGNPOST_ID_GENERATE(sid, ccdb);
110 O2_SIGNPOST_START(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.timeslice);
111 for (auto& schema : schemas) {
112 std::vector<CCDBFetcherHelper::FetchOp> ops;
113 auto inputBinding = *schema->metadata()->Get("sourceTable");
114 auto inputMatcher = DataSpecUtils::fromString(*schema->metadata()->Get("sourceMatcher"));
115 auto outRouteDesc = *schema->metadata()->Get("outputRoute");
116 std::string outBinding = *schema->metadata()->Get("outputBinding");
117 O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
118 "Fetching CCDB objects for %{public}s's columns with timestamps from %{public}s and putting them in route %{public}s",
119 outBinding.c_str(), inputBinding.c_str(), outRouteDesc.c_str());
120 auto table = inputs.get<TableConsumer>(inputMatcher)->asArrowTable();
121 // FIXME: make the fTimestamp column configurable.
122 auto timestampColumn = table->GetColumnByName("fTimestamp");
123 O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
124 "There are %zu bindings available", bindings.size());
125 for (auto& binding : bindings) {
126 O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
127 "* %{public}s: %d",
128 binding.first.c_str(), binding.second);
129 }
130 int outputRouteIndex = bindings.at(outRouteDesc);
131 auto& spec = helper->routes[outputRouteIndex].matcher;
132 std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
133 for (auto const& _ : schema->fields()) {
134 builders.emplace_back(std::make_shared<arrow::BinaryViewBuilder>());
135 }
136
137 for (auto ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
138 std::shared_ptr<arrow::Array> chunk = timestampColumn->chunk(ci);
139 auto const* timestamps = chunk->data()->GetValuesSafe<size_t>(1);
140
141 for (int64_t ri = 0; ri < chunk->data()->length; ri++) {
142 ops.clear();
143 int64_t timestamp = timestamps[ri];
144 for (auto& field : schema->fields()) {
145 auto url = *field->metadata()->Get("url");
146 // Time to actually populate the blob
147 ops.push_back({
148 .spec = spec,
149 .url = url,
150 .timestamp = timestamp,
151 .runNumber = 1,
152 .runDependent = 0,
153 .queryRate = 0,
154 });
155 }
156 auto responses = CCDBFetcherHelper::populateCacheWith(helper, ops, timingInfo, dtc, allocator);
157 O2_SIGNPOST_START(ccdb, sid, "handlingResponses",
158 "Got %zu responses from server.",
159 responses.size());
160 if (builders.size() != responses.size()) {
161 LOGP(fatal, "Not enough responses (expected {}, found {})", builders.size(), responses.size());
162 }
163 arrow::Status result;
164 for (size_t bi = 0; bi < responses.size(); bi++) {
165 auto& builder = builders[bi];
166 auto& response = responses[bi];
167 char const* address = reinterpret_cast<char const*>(response.id.value);
168 result &= builder->Append(std::string_view(address, response.size));
169 }
170 if (!result.ok()) {
171 LOGP(fatal, "Error adding results from CCDB");
172 }
173 O2_SIGNPOST_END(ccdb, sid, "handlingResponses", "Done processing responses");
174 }
175 }
176 arrow::ArrayVector arrays;
177 for (auto& builder : builders) {
178 arrays.push_back(*builder->Finish());
179 }
180 auto outTable = arrow::Table::Make(schema, arrays);
181 auto concrete = DataSpecUtils::asConcreteDataMatcher(spec);
182 allocator.adopt(Output{concrete.origin, concrete.description, concrete.subSpec}, outTable);
183 }
184
185 O2_SIGNPOST_END(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects");
186 });
187 });
188}
189
190} // namespace o2::framework
std::string binding
std::string url
std::shared_ptr< arrow::Schema > schema
std::vector< std::shared_ptr< arrow::Field > > fields
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
void adopt(const Output &spec, std::string *)
ServiceRegistryRef services()
Definition InitContext.h:34
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
decltype(auto) get(R binding, int part=0) const
const GLfloat * m
Definition glcorearb.h:4066
GLuint GLuint64EXT address
Definition glcorearb.h:5846
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint const GLchar * name
Definition glcorearb.h:781
const GLuint * arrays
Definition glcorearb.h:1314
constexpr framework::ConcreteDataMatcher matcher()
Definition ASoA.h:380
Defining PrimaryVertex explicitly as messageable.
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
static AlgorithmSpec fetchFromCCDB(ConfigContext const &)
static auto populateCacheWith(std::shared_ptr< CCDBFetcherHelper > const &helper, std::vector< FetchOp > const &ops, TimingInfo &timingInfo, DataTakingContext &dtc, DataAllocator &allocator) -> std::vector< Response >
static void initialiseHelper(CCDBFetcherHelper &helper, ConfigParamRegistry const &options)
static InputSpec fromMetadataString(std::string s)
Create an InputSpec from metadata string.
static std::string describe(InputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static ConcreteDataMatcher fromString(std::string s)
Create a concrete data matcher from serialized string.
std::vector< OutputRoute > outputs
Definition DeviceSpec.h:63
std::variant< ConcreteDataMatcher, data_matcher::DataDescriptorMatcher > matcher
The actual matcher for the input spec.
Definition InputSpec.h:70
header::DataOrigin origin
Definition Output.h:28