47void fillValidRoutes(CCDBFetcherHelper& helper, std::vector<o2::framework::OutputRoute>
const& outputRoutes, std::unordered_map<std::string, int>& bindings)
73 auto& dec = ic.
services().get<DanglingEdgesContext>();
74 std::vector<std::shared_ptr<arrow::Schema>> schemas;
75 auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
77 for (
auto& input : dec.analysisCCDBInputs) {
78 std::vector<std::shared_ptr<arrow::Field>>
fields;
80 schemaMetadata->Append(
"outputBinding", input.binding);
82 for (
auto&
m : input.metadata) {
84 if (
m.name.starts_with(
"input:")) {
85 auto name =
m.name.substr(6);
86 schemaMetadata->Append(
"sourceTable",
name);
90 if (!
m.name.starts_with(
"ccdb:")) {
94 auto metadata = std::make_shared<arrow::KeyValueMetadata>();
95 metadata->Append(
"url",
m.defaultValue.asString());
96 auto columnName =
m.name.substr(strlen(
"ccdb:"));
97 fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(),
false, metadata));
99 schemas.emplace_back(std::make_shared<arrow::Schema>(
fields, schemaMetadata));
102 std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
104 std::unordered_map<std::string, int> bindings;
105 fillValidRoutes(*helper, spec.
outputs, bindings);
109 O2_SIGNPOST_START(ccdb, sid,
"fetchFromAnalysisCCDB",
"Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.
timeslice);
110 for (
auto&
schema : schemas) {
111 std::vector<CCDBFetcherHelper::FetchOp> ops;
112 auto inputBinding = *
schema->metadata()->Get(
"sourceTable");
114 auto outRouteDesc = *
schema->metadata()->Get(
"outputRoute");
115 std::string outBinding = *
schema->metadata()->Get(
"outputBinding");
117 "Fetching CCDB objects for %{public}s's columns with timestamps from %{public}s and putting them in route %{public}s",
118 outBinding.c_str(), inputBinding.c_str(), outRouteDesc.c_str());
121 auto timestampColumn = table->GetColumnByName(
"fTimestamp");
123 "There are %zu bindings available", bindings.size());
124 for (
auto&
binding : bindings) {
129 int outputRouteIndex = bindings.at(outRouteDesc);
130 auto& spec = helper->routes[outputRouteIndex].matcher;
131 std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
132 for (
auto const& _ :
schema->fields()) {
133 builders.emplace_back(std::make_shared<arrow::BinaryViewBuilder>());
136 for (
auto ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
137 std::shared_ptr<arrow::Array> chunk = timestampColumn->chunk(ci);
138 auto const* timestamps = chunk->data()->GetValuesSafe<
size_t>(1);
140 for (int64_t ri = 0; ri < chunk->data()->length; ri++) {
142 int64_t timestamp = timestamps[ri];
143 for (
auto& field :
schema->fields()) {
144 auto url = *field->metadata()->Get(
"url");
149 .timestamp = timestamp,
157 "Got %zu responses from server.",
159 if (builders.size() != responses.size()) {
160 LOGP(fatal,
"Not enough responses (expected {}, found {})", builders.size(), responses.size());
163 for (
size_t bi = 0; bi < responses.size(); bi++) {
164 auto& builder = builders[bi];
165 auto& response = responses[bi];
166 char const*
address =
reinterpret_cast<char const*
>(response.id.value);
167 result &= builder->Append(std::string_view(
address, response.size));
170 LOGP(fatal,
"Error adding results from CCDB");
172 O2_SIGNPOST_END(ccdb, sid,
"handlingResponses",
"Done processing responses");
175 arrow::ArrayVector
arrays;
176 for (
auto& builder : builders) {
177 arrays.push_back(*builder->Finish());
181 allocator.
adopt(
Output{concrete.
origin, concrete.description, concrete.subSpec}, outTable);
184 O2_SIGNPOST_END(ccdb, sid,
"fetchFromAnalysisCCDB",
"Fetching CCDB objects");
// Signpost / dynamic-log macros. As written here every one of them has an
// empty replacement list, so each invocation in this file expands to nothing
// and its arguments are discarded by the preprocessor.
// NOTE(review): these look like no-op stubs or signature placeholders; the
// operative definitions presumably come from the framework's signpost
// header -- confirm before relying on any runtime logging behaviour.

// Takes a single `name` argument.
#define O2_DECLARE_DYNAMIC_LOG(name)
// Takes a log, an id, a name and a format string, followed by optional
// variadic arguments (presumably the values for the format string -- confirm).
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
// Same parameter list as O2_SIGNPOST_START; the two are used as a pair in this
// file (e.g. around "fetchFromAnalysisCCDB").
#define O2_SIGNPOST_END(log, id, name, format,...)
// Takes a name and a log.
#define O2_SIGNPOST_ID_GENERATE(name, log)
// Takes a log, an id, a name and a format string, followed by optional
// variadic arguments.
#define O2_SIGNPOST_START(log, id, name, format,...)
ServiceRegistryRef services()