// Signature of fillValidRoutes; its body is not visible here.
// Takes the fetcher helper by mutable reference, the output routes by const
// reference, and a string -> int map by mutable reference.
// NOTE(review): the caller later does bindings.at(outRouteDesc) and uses the
// result to index helper->routes, so presumably this function fills
// helper.routes and maps each route description to its index there — confirm
// against the definition.
47void fillValidRoutes(CCDBFetcherHelper& helper, std::vector<o2::framework::OutputRoute>
const& outputRoutes, std::unordered_map<std::string, int>& bindings)
// Setup phase: builds one arrow::Schema per entry of
// DanglingEdgesContext::analysisCCDBInputs, then creates the fetcher helper
// and the bindings map that are handed to fillValidRoutes.
// Statements between the numbered lines (and all closing braces) are not
// visible here.
73 auto& dec = ic.
services().get<DanglingEdgesContext>();
74 std::vector<std::shared_ptr<arrow::Schema>> schemas;
// NOTE(review): schemaMetadata is created once, before the loop over inputs,
// and the same object is appended to and passed to every schema below.
// Confirm that sharing (and accumulating keys in) one metadata object across
// all inputs is intended.
75 auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
77 for (
auto& input : dec.analysisCCDBInputs) {
// Fields (columns) collected for this input's schema.
78 std::vector<std::shared_ptr<arrow::Field>>
fields;
80 schemaMetadata->Append(
"outputBinding", input.binding);
82 for (
auto&
m : input.metadata) {
// An entry named "input:<name>" records <name> (the text after the
// 6-character prefix) under the "sourceTable" key.
84 if (
m.name.starts_with(
"input:")) {
85 auto name =
m.name.substr(6);
86 schemaMetadata->Append(
"sourceTable",
name);
// Entries that do not start with "ccdb:" take this branch; its body is not
// visible here (presumably they are skipped — confirm).
91 if (!
m.name.starts_with(
"ccdb:")) {
// A "ccdb:<column>" entry becomes a field named <column> of type binary_view,
// created with nullable = false. The entry's default value is stored as the
// field's "url" metadata.
95 auto metadata = std::make_shared<arrow::KeyValueMetadata>();
96 metadata->Append(
"url",
m.defaultValue.asString());
97 auto columnName =
m.name.substr(strlen(
"ccdb:"));
98 fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(),
false, metadata));
// One schema per input, carrying the collected fields and the shared metadata.
100 schemas.emplace_back(std::make_shared<arrow::Schema>(
fields, schemaMetadata));
// Helper and bindings map, both passed by reference to fillValidRoutes
// together with the device's outputs.
103 std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
105 std::unordered_map<std::string, int> bindings;
106 fillValidRoutes(*helper, spec.
outputs, bindings);
// Fetch phase: for every schema built during setup, reads the timestamps of
// the source table, collects one response per schema field, appends the
// response payloads to per-field builders and hands the resulting table to
// the allocator. Statements between the numbered lines (and all closing
// braces) are not visible here.
// Opens the "fetchFromAnalysisCCDB" signpost interval; the message is
// formatted with the current timeslice.
110 O2_SIGNPOST_START(ccdb, sid,
"fetchFromAnalysisCCDB",
"Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.
timeslice);
111 for (
auto&
schema : schemas) {
112 std::vector<CCDBFetcherHelper::FetchOp> ops;
// Schema-level metadata is read back here. Each Get() result is dereferenced
// with '*' without checking that the key exists.
// NOTE(review): "outputRoute" is read below, but no visible line appends that
// key; presumably it is set in a statement not shown — confirm.
113 auto inputBinding = *
schema->metadata()->Get(
"sourceTable");
115 auto outRouteDesc = *
schema->metadata()->Get(
"outputRoute");
116 std::string outBinding = *
schema->metadata()->Get(
"outputBinding");
// Arguments of a logging call whose opening line is not visible here.
118 "Fetching CCDB objects for %{public}s's columns with timestamps from %{public}s and putting them in route %{public}s",
119 outBinding.c_str(), inputBinding.c_str(), outRouteDesc.c_str());
// Timestamps come from the "fTimestamp" column of `table`; where `table` is
// obtained is not visible here.
122 auto timestampColumn = table->GetColumnByName(
"fTimestamp");
124 "There are %zu bindings available", bindings.size());
125 for (
auto&
binding : bindings) {
// Maps the route description to an index into helper->routes.
// std::unordered_map::at throws std::out_of_range if the description is not
// among the bindings.
130 int outputRouteIndex = bindings.at(outRouteDesc);
131 auto& spec = helper->routes[outputRouteIndex].matcher;
// One BinaryViewBuilder per field of the schema, in field order.
132 std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
133 for (
auto const& _ :
schema->fields()) {
134 builders.emplace_back(std::make_shared<arrow::BinaryViewBuilder>());
// Walks the timestamp column chunk by chunk.
137 for (
auto ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
138 std::shared_ptr<arrow::Array> chunk = timestampColumn->chunk(ci);
// NOTE(review): buffer 1 of the chunk is read as size_t and each element is
// then stored in an int64_t; presumably the column holds 64-bit timestamps —
// confirm the column's actual Arrow type.
139 auto const* timestamps = chunk->data()->GetValuesSafe<
size_t>(1);
141 for (int64_t ri = 0; ri < chunk->data()->length; ri++) {
143 int64_t timestamp = timestamps[ri];
// For each field, its "url" metadata is read (unchecked '*' on the Get()
// result); the row timestamp is used in the designated initializer below,
// whose remaining members are not visible here.
144 for (
auto& field :
schema->fields()) {
145 auto url = *field->metadata()->Get(
"url");
150 .timestamp = timestamp,
158 "Got %zu responses from server.",
// The number of responses must equal the number of builders (one per field);
// a mismatch is logged at fatal level.
160 if (builders.size() != responses.size()) {
161 LOGP(fatal,
"Not enough responses (expected {}, found {})", builders.size(), responses.size());
// Appends response bi to builder bi as a string_view of response.size bytes.
// NOTE(review): response.id.value is reinterpreted as a pointer to the
// payload; presumably it stores the payload address — confirm.
164 for (
size_t bi = 0; bi < responses.size(); bi++) {
165 auto& builder = builders[bi];
166 auto& response = responses[bi];
167 char const*
address =
reinterpret_cast<char const*
>(response.id.value);
168 result &= builder->Append(std::string_view(
address, response.size));
171 LOGP(fatal,
"Error adding results from CCDB");
173 O2_SIGNPOST_END(ccdb, sid,
"handlingResponses",
"Done processing responses");
// Finalizes every builder into an array; the Finish() result is dereferenced
// with '*' without checking for failure.
176 arrow::ArrayVector
arrays;
177 for (
auto& builder : builders) {
178 arrays.push_back(*builder->Finish());
// Hands outTable to the allocator for the output identified by concrete's
// origin, description and subSpec. The construction of outTable and of
// concrete is not visible here.
182 allocator.
adopt(
Output{concrete.
origin, concrete.description, concrete.subSpec}, outTable);
// Closes the signpost interval opened at the top of this phase.
185 O2_SIGNPOST_END(ccdb, sid,
"fetchFromAnalysisCCDB",
"Fetching CCDB objects");
// Signpost / dynamic-log macros. As written here every macro has an empty
// replacement list, so each invocation expands to nothing and its arguments
// are discarded by the preprocessor.
// NOTE(review): presumably these are the disabled (no-op) variants of macros
// that have a real implementation under another configuration — confirm.
// Takes the log name.
#define O2_DECLARE_DYNAMIC_LOG(name)
// Takes a log, a signpost id, an event name, a format string and its arguments.
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
// Same parameter list as O2_SIGNPOST_START; the two are used as a pair with
// matching log, id and name in the fetch code.
#define O2_SIGNPOST_END(log, id, name, format,...)
// Takes a name and a log.
#define O2_SIGNPOST_ID_GENERATE(name, log)
// Takes a log, a signpost id, an interval name, a format string and its arguments.
#define O2_SIGNPOST_START(log, id, name, format,...)
// Declaration only; the body and the enclosing class (if any) are not visible
// here. Returns a ServiceRegistryRef by value.
// NOTE(review): presumably this is the accessor called as ic.services() in the
// setup code — confirm.
ServiceRegistryRef services()