47void fillValidRoutes(CCDBFetcherHelper& helper, std::vector<o2::framework::OutputRoute>
const& outputRoutes, std::unordered_map<std::string, int>& bindings)
73 std::vector<std::shared_ptr<arrow::Schema>> schemas;
74 auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
76 for (
auto& input : ac.analysisCCDBInputs) {
77 std::vector<std::shared_ptr<arrow::Field>> fields;
79 schemaMetadata->Append(
"outputBinding", input.binding);
81 for (
auto&
m : input.metadata) {
83 if (
m.name.starts_with(
"input:")) {
84 auto name =
m.name.substr(6);
85 schemaMetadata->Append(
"sourceTable",
name);
89 if (!
m.name.starts_with(
"ccdb:")) {
93 auto metadata = std::make_shared<arrow::KeyValueMetadata>();
94 metadata->Append(
"url",
m.defaultValue.asString());
95 auto columnName =
m.name.substr(strlen(
"ccdb:"));
96 fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(),
false, metadata));
98 schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
101 std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
103 std::unordered_map<std::string, int> bindings;
104 fillValidRoutes(*helper, spec.
outputs, bindings);
108 O2_SIGNPOST_START(ccdb, sid,
"fetchFromAnalysisCCDB",
"Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.
timeslice);
109 for (
auto& schema : schemas) {
110 std::vector<CCDBFetcherHelper::FetchOp> ops;
111 auto inputBinding = *schema->metadata()->Get(
"sourceTable");
112 auto outRouteDesc = *schema->metadata()->Get(
"outputRoute");
113 std::string outBinding = *schema->metadata()->Get(
"outputBinding");
115 "Fetching CCDB objects for %{public}s's columns with timestamps from %{public}s and putting them in route %{public}s",
116 outBinding.c_str(), inputBinding.c_str(), outRouteDesc.c_str());
118 auto table =
ref->asArrowTable();
120 auto timestampColumn = table->GetColumnByName(
"fTimestamp");
122 "There are %zu bindings available", bindings.size());
123 for (
auto& binding : bindings) {
126 binding.first.c_str(), binding.second);
128 int outputRouteIndex = bindings.at(outRouteDesc);
129 auto& spec = helper->routes[outputRouteIndex].matcher;
130 std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
131 for (
auto& _ : schema->fields()) {
132 builders.emplace_back(std::make_shared<arrow::BinaryViewBuilder>());
135 for (
size_t ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
136 std::shared_ptr<arrow::Array> chunk = timestampColumn->chunk(ci);
137 auto const* timestamps = chunk->data()->GetValuesSafe<
size_t>(1);
139 for (int64_t ri = 0; ri < chunk->data()->length; ri++) {
141 int64_t timestamp = timestamps[ri];
142 for (
auto& field : schema->fields()) {
143 auto url = *field->metadata()->Get(
"url");
148 .timestamp = timestamp,
156 "Got %zu responses from server.",
158 if (builders.size() != responses.size()) {
159 LOGP(fatal,
"Not enough responses (expected {}, found {})", builders.size(), responses.size());
162 for (
size_t bi = 0; bi < responses.size(); bi++) {
163 auto& builder = builders[bi];
164 auto& response = responses[bi];
165 char const*
address =
reinterpret_cast<char const*
>(response.id.value);
166 result &= builder->Append(std::string_view(
address, response.size));
169 LOGP(fatal,
"Error adding results from CCDB");
171 O2_SIGNPOST_END(ccdb, sid,
"handlingResponses",
"Done processing responses");
174 arrow::ArrayVector
arrays;
175 for (
auto& builder : builders) {
176 arrays.push_back(*builder->Finish());
178 auto outTable = arrow::Table::Make(schema,
arrays);
180 allocator.
adopt(
Output{concrete.
origin, concrete.description, concrete.subSpec}, outTable);
183 O2_SIGNPOST_END(ccdb, sid,
"fetchFromAnalysisCCDB",
"Fetching CCDB objects");
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_START(log, id, name, format,...)
ServiceRegistryRef services() const