Project
Loading...
Searching...
No Matches
AnalysisCCDBHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "AnalysisCCDBHelpers.h"
13#include "CCDBFetcherHelper.h"
19#include "Framework/Output.h"
20#include "Framework/Signpost.h"
24#include <arrow/array/builder_binary.h>
25#include <arrow/type.h>
26#include <arrow/type_fwd.h>
27#include <arrow/util/key_value_metadata.h>
28#include <arrow/table.h>
29#include <arrow/array.h>
30#include <arrow/builder.h>
31#include <fmt/base.h>
32#include <ctime>
33#include <memory>
34#include <unordered_map>
35
37
38namespace o2::framework
39{
// Fill the valid routes. Notice that for analysis the timestamps are
// associated to an ATIM table and there might be multiple CCDB objects of the
// same kind per dataframe.
// For this reason, rather than matching on Lifetime::Condition, we match on
// the data origin.
45namespace
46{
47void fillValidRoutes(CCDBFetcherHelper& helper, std::vector<o2::framework::OutputRoute> const& outputRoutes, std::unordered_map<std::string, int>& bindings)
48{
49 for (auto& route : outputRoutes) {
50 auto originMatcher = DataSpecUtils::asConcreteDataMatcher(route.matcher);
51 if (originMatcher.origin != header::DataOrigin{"ATIM"}) {
52 continue;
53 }
54 auto specStr = DataSpecUtils::describe(route.matcher);
55 if (bindings.find(specStr) != bindings.end()) {
56 continue;
57 }
58 bindings[specStr] = helper.routes.size();
59 helper.routes.push_back(route);
60 LOGP(info, "The following route needs condition objects {} ", DataSpecUtils::describe(route.matcher));
61 for (auto& metadata : route.matcher.metadata) {
62 if (metadata.type == VariantType::String) {
63 LOGP(info, "- {}: {}", metadata.name, metadata.defaultValue.asString());
64 }
65 }
66 }
67}
68} // namespace
69
71{
72 auto& ac = ctx.services().get<AnalysisContext>();
73 std::vector<std::shared_ptr<arrow::Schema>> schemas;
74 auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
75
76 for (auto& input : ac.analysisCCDBInputs) {
77 std::vector<std::shared_ptr<arrow::Field>> fields;
78 schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input));
79 schemaMetadata->Append("outputBinding", input.binding);
80
81 for (auto& m : input.metadata) {
82 // Save the list of input tables
83 if (m.name.starts_with("input:")) {
84 auto name = m.name.substr(6);
85 schemaMetadata->Append("sourceTable", name);
86 continue;
87 }
88 // Ignore the non ccdb: entries
89 if (!m.name.starts_with("ccdb:")) {
90 continue;
91 }
92 // Create the schema of the output
93 auto metadata = std::make_shared<arrow::KeyValueMetadata>();
94 metadata->Append("url", m.defaultValue.asString());
95 auto columnName = m.name.substr(strlen("ccdb:"));
96 fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, metadata));
97 }
98 schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
99 }
100 return adaptStateful([schemas](CallbackService& callbacks, ConfigParamRegistry const& options, DeviceSpec const& spec) {
101 std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
102 CCDBFetcherHelper::initialiseHelper(*helper, options);
103 std::unordered_map<std::string, int> bindings;
104 fillValidRoutes(*helper, spec.outputs, bindings);
105
106 return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo) {
107 O2_SIGNPOST_ID_GENERATE(sid, ccdb);
108 O2_SIGNPOST_START(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.timeslice);
109 for (auto& schema : schemas) {
110 std::vector<CCDBFetcherHelper::FetchOp> ops;
111 auto inputBinding = *schema->metadata()->Get("sourceTable");
112 auto outRouteDesc = *schema->metadata()->Get("outputRoute");
113 std::string outBinding = *schema->metadata()->Get("outputBinding");
114 O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
115 "Fetching CCDB objects for %{public}s's columns with timestamps from %{public}s and putting them in route %{public}s",
116 outBinding.c_str(), inputBinding.c_str(), outRouteDesc.c_str());
117 auto ref = inputs.get<TableConsumer>(inputBinding);
118 auto table = ref->asArrowTable();
119 // FIXME: make the fTimestamp column configurable.
120 auto timestampColumn = table->GetColumnByName("fTimestamp");
121 O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
122 "There are %zu bindings available", bindings.size());
123 for (auto& binding : bindings) {
124 O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
125 "* %{public}s: %d",
126 binding.first.c_str(), binding.second);
127 }
128 int outputRouteIndex = bindings.at(outRouteDesc);
129 auto& spec = helper->routes[outputRouteIndex].matcher;
130 std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
131 for (auto& _ : schema->fields()) {
132 builders.emplace_back(std::make_shared<arrow::BinaryViewBuilder>());
133 }
134
135 for (size_t ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
136 std::shared_ptr<arrow::Array> chunk = timestampColumn->chunk(ci);
137 auto const* timestamps = chunk->data()->GetValuesSafe<size_t>(1);
138
139 for (int64_t ri = 0; ri < chunk->data()->length; ri++) {
140 ops.clear();
141 int64_t timestamp = timestamps[ri];
142 for (auto& field : schema->fields()) {
143 auto url = *field->metadata()->Get("url");
144 // Time to actually populate the blob
145 ops.push_back({
146 .spec = spec,
147 .url = url,
148 .timestamp = timestamp,
149 .runNumber = 1,
150 .runDependent = 0,
151 .queryRate = 0,
152 });
153 }
154 auto responses = CCDBFetcherHelper::populateCacheWith(helper, ops, timingInfo, dtc, allocator);
155 O2_SIGNPOST_START(ccdb, sid, "handlingResponses",
156 "Got %zu responses from server.",
157 responses.size());
158 if (builders.size() != responses.size()) {
159 LOGP(fatal, "Not enough responses (expected {}, found {})", builders.size(), responses.size());
160 }
161 arrow::Status result;
162 for (size_t bi = 0; bi < responses.size(); bi++) {
163 auto& builder = builders[bi];
164 auto& response = responses[bi];
165 char const* address = reinterpret_cast<char const*>(response.id.value);
166 result &= builder->Append(std::string_view(address, response.size));
167 }
168 if (!result.ok()) {
169 LOGP(fatal, "Error adding results from CCDB");
170 }
171 O2_SIGNPOST_END(ccdb, sid, "handlingResponses", "Done processing responses");
172 }
173 }
174 arrow::ArrayVector arrays;
175 for (auto& builder : builders) {
176 arrays.push_back(*builder->Finish());
177 }
178 auto outTable = arrow::Table::Make(schema, arrays);
179 auto concrete = DataSpecUtils::asConcreteDataMatcher(spec);
180 allocator.adopt(Output{concrete.origin, concrete.description, concrete.subSpec}, outTable);
181 }
182
183 O2_SIGNPOST_END(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects");
184 });
185 });
186}
187
188} // namespace o2::framework
std::string url
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
ServiceRegistryRef services() const
void adopt(const Output &spec, std::string *)
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
decltype(auto) get(R binding, int part=0) const
const GLfloat * m
Definition glcorearb.h:4066
GLuint GLuint64EXT address
Definition glcorearb.h:5846
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint const GLchar * name
Definition glcorearb.h:781
const GLuint * arrays
Definition glcorearb.h:1314
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
static AlgorithmSpec fetchFromCCDB(ConfigContext const &ctx)
static auto populateCacheWith(std::shared_ptr< CCDBFetcherHelper > const &helper, std::vector< FetchOp > const &ops, TimingInfo &timingInfo, DataTakingContext &dtc, DataAllocator &allocator) -> std::vector< Response >
static void initialiseHelper(CCDBFetcherHelper &helper, ConfigParamRegistry const &options)
static std::string describe(InputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
std::vector< OutputRoute > outputs
Definition DeviceSpec.h:63
header::DataOrigin origin
Definition Output.h:28