// Signature of fillValidRoutes; the body is not part of this excerpt.
// Parameters as declared: a mutable CCDBFetcherHelper, a read-only vector of
// OutputRoute, and a mutable string -> int map.
// NOTE(review): elsewhere in this file the int looked up in `bindings` is used
// as an index into helper->routes — presumably this function populates both;
// confirm against the full body.
48void fillValidRoutes(CCDBFetcherHelper& helper, std::vector<o2::framework::OutputRoute>
const& outputRoutes, std::unordered_map<std::string, int>& bindings)
// Fetch the DanglingEdgesContext service; its analysisCCDBInputs are iterated below.
73 auto& dec = ic.
services().get<DanglingEdgesContext>();
// Maps a metadata entry name to the URL string resolved for it.
78 std::unordered_map<std::string, std::string> ccdbUrls;
79 for (
auto& input : dec.analysisCCDBInputs) {
80 for (
auto&
m : input.metadata) {
// True when the name lacks the "ccdb:" prefix or is already present in ccdbUrls.
// NOTE(review): the branch body is not visible in this excerpt — presumably the entry is skipped; confirm.
81 if (!
m.name.starts_with(
"ccdb:") || ccdbUrls.count(
m.name)) {
// Start from the default value stored on the metadata entry.
84 std::string
url =
m.defaultValue.asString();
// Replace it with the string read from `options` under the same name.
// NOTE(review): any condition guarding this assignment is not visible in this excerpt.
86 url = options.
get<std::string>(
m.name.c_str());
88 LOGP(info,
"CCDB path resolved for {}: {}",
m.name,
url);
// emplace() does not overwrite an existing key; `url` is moved into the map.
89 ccdbUrls.emplace(
m.name, std::move(
url));
// One arrow::Schema is appended per entry of dec.analysisCCDBInputs.
92 std::vector<std::shared_ptr<arrow::Schema>> schemas;
93 for (
auto& input : dec.analysisCCDBInputs) {
// Schema-level key/value metadata; the keys written in the visible lines are
// "outputBinding" and "sourceTable".
94 auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
95 std::vector<std::shared_ptr<arrow::Field>>
fields;
97 schemaMetadata->Append(
"outputBinding", input.binding);
98 for (
auto&
m : input.metadata) {
// For "input:"-prefixed entries, the text after the first 6 characters
// (the length of "input:") is recorded as "sourceTable".
99 if (
m.name.starts_with(
"input:")) {
100 auto name =
m.name.substr(6);
101 schemaMetadata->Append(
"sourceTable",
name);
// True for entries whose name lacks the "ccdb:" prefix.
// NOTE(review): the branch body is not visible in this excerpt.
105 if (!
m.name.starts_with(
"ccdb:")) {
// Field-level metadata carries "url": the entry found in ccdbUrls when present,
// otherwise the metadata entry's default value.
108 auto fieldMetadata = std::make_shared<arrow::KeyValueMetadata>();
109 auto it = ccdbUrls.find(
m.name);
110 fieldMetadata->Append(
"url", it != ccdbUrls.end() ? it->second :
m.defaultValue.asString());
// The column name is the metadata name with the leading "ccdb:" removed.
111 auto columnName =
m.name.substr(strlen(
"ccdb:"));
// Each field is typed arrow::binary_view(); the `false` argument is
// arrow::Field's nullable flag.
112 fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(),
false, fieldMetadata));
114 schemas.emplace_back(std::make_shared<arrow::Schema>(
fields, schemaMetadata));
// Default-constructed CCDBFetcherHelper held by shared_ptr.
117 std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
// String -> int map handed to fillValidRoutes together with the helper and spec.outputs.
// NOTE(review): what fillValidRoutes writes into these is not visible in this excerpt.
119 std::unordered_map<std::string, int> bindings;
120 fillValidRoutes(*helper, spec.
outputs, bindings);
// Open the "fetchFromAnalysisCCDB" signpost interval; the message includes the timeslice value.
124 O2_SIGNPOST_START(ccdb, sid,
"fetchFromAnalysisCCDB",
"Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.
timeslice);
125 for (
auto&
schema : schemas) {
126 std::vector<CCDBFetcherHelper::FetchOp> ops;
// Read schema-level metadata back. Each Get() result is dereferenced directly,
// with no check that the key exists.
127 auto inputBinding = *
schema->metadata()->Get(
"sourceTable");
// NOTE(review): no Append of "outputRoute" appears in the visible lines of this
// file excerpt — confirm the key is set in the omitted lines.
129 auto outRouteDesc = *
schema->metadata()->Get(
"outputRoute");
130 std::string outBinding = *
schema->metadata()->Get(
"outputBinding");
132 "Fetching CCDB objects for %{public}s's columns with timestamps from %{public}s and putting them in route %{public}s",
133 outBinding.c_str(), inputBinding.c_str(), outRouteDesc.c_str());
// NOTE(review): `table` is defined in lines omitted from this excerpt.
136 auto timestampColumn = table->GetColumnByName(
"fTimestamp");
138 "There are %zu bindings available", bindings.size());
139 for (
auto&
binding : bindings) {
// unordered_map::at throws std::out_of_range when outRouteDesc is not a key.
144 int outputRouteIndex = bindings.at(outRouteDesc);
// Local reference to the matcher of the selected route.
145 auto& spec = helper->routes[outputRouteIndex].matcher;
// One BinaryViewBuilder per schema field, in field order.
146 std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
147 for (
auto const& _ :
schema->fields()) {
148 builders.emplace_back(std::make_shared<arrow::BinaryViewBuilder>());
// Walk the timestamp column one chunk at a time.
151 for (
auto ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
152 std::shared_ptr<arrow::Array> chunk = timestampColumn->chunk(ci);
// Buffer index 1 is read as size_t elements; each element is stored in an int64_t below.
153 auto const* timestamps = chunk->data()->GetValuesSafe<
size_t>(1);
155 for (int64_t ri = 0; ri < chunk->data()->length; ri++) {
157 int64_t timestamp = timestamps[ri];
// For every field of the schema, its "url" metadata value is read (dereferenced unchecked).
158 for (
auto& field :
schema->fields()) {
159 auto url = *field->metadata()->Get(
"url");
164 .timestamp = timestamp,
172 "Got %zu responses from server.",
// The loop below indexes builders[bi] for every response, so the two sizes must match;
// any mismatch (more or fewer) is reported as fatal.
174 if (builders.size() != responses.size()) {
175 LOGP(fatal,
"Not enough responses (expected {}, found {})", builders.size(), responses.size());
178 for (
size_t bi = 0; bi < responses.size(); bi++) {
179 auto& builder = builders[bi];
180 auto& response = responses[bi];
// response.id.value is reinterpreted as a char pointer and response.size bytes
// starting there are appended to the matching builder.
181 char const*
address =
reinterpret_cast<char const*
>(response.id.value);
182 result &= builder->Append(std::string_view(
address, response.size));
185 LOGP(fatal,
"Error adding results from CCDB");
187 O2_SIGNPOST_END(ccdb, sid,
"handlingResponses",
"Done processing responses");
// Finish each builder into an array; the Finish() result is dereferenced without a status check.
190 arrow::ArrayVector
arrays;
191 for (
auto& builder : builders) {
192 arrays.push_back(*builder->Finish());
// Hand outTable to the allocator under the origin/description/subSpec of `concrete`.
// NOTE(review): `outTable` and `concrete` are defined in lines omitted from this excerpt.
196 allocator.
adopt(
Output{concrete.
origin, concrete.description, concrete.subSpec}, outTable);
// Close the interval opened by the matching O2_SIGNPOST_START.
201 O2_SIGNPOST_END(ccdb, sid,
"fetchFromAnalysisCCDB",
"Fetching CCDB objects");
// Signpost/logging macro signatures referenced by the code above.
// As written in this excerpt each macro has an empty replacement list, i.e. it
// expands to nothing.
// NOTE(review): presumably the real definitions live in a framework header and
// only the parameter lists are reproduced here — confirm.
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_START(log, id, name, format,...)
// Declaration fragment: a parameterless services() returning ServiceRegistryRef.
// NOTE(review): presumably the accessor invoked as `ic.services()` in this file;
// the enclosing class is not visible here — confirm.
ServiceRegistryRef services()