23int main(
int argc,
char** argv)
26 char* input_file =
nullptr;
27 char* output_file =
nullptr;
30 static struct option long_options[] = {
31 {
"input", required_argument,
nullptr,
'i'},
32 {
"output", required_argument,
nullptr,
'o'},
33 {
nullptr, 0,
nullptr, 0}
40 while ((
c = getopt_long(argc, argv,
"i:o:", long_options, &option_index)) != -1) {
50 printf(
"Unknown option. Use --input <file> and --output <file>\n");
58 if (input_file && output_file) {
59 printf(
"Input file: %s\n", input_file);
60 printf(
"Output file: %s\n", output_file);
62 fprintf(stderr,
"Usage: %s --input <file> --output <file>\n", argv[0]);
67 std::vector<char const*> capabilitiesSpecs = {
68 "O2Framework:RNTupleObjectReadingCapability",
69 "O2Framework:TTreeObjectReadingCapability",
74 std::vector<LoadablePlugin> plugins;
75 for (
auto spec : capabilitiesSpecs) {
77 for (
auto& extra : morePlugins) {
78 plugins.push_back(extra);
82 auto in = TFile::Open(input_file,
"READ");
83 auto out = TFile::Open(output_file,
"RECREATE");
85 auto fs = std::make_shared<o2::framework::TFileFileSystem>(in, 50 * 1024 * 1024, factory);
86 auto outFs = std::make_shared<o2::framework::TFileFileSystem>(out, 0, factory);
88 o2::framework::PluginManager::loadFromPlugin<o2::framework::RootObjectReadingCapability, o2::framework::RootObjectReadingCapabilityPlugin>(plugins, factory.
capabilities);
91 auto rNtupleFormat = factory.
capabilities[0].factory().format();
94 for (
TObject* dk : *in->GetListOfKeys()) {
95 if (dk->GetName() == std::string(
"metaData")) {
96 TMap*
m =
dynamic_cast<TMap*
>(in->Get(dk->GetName()));
98 auto* copy =
m->Clone(
"metaData");
99 out->WriteTObject(copy);
102 if (dk->GetName() == std::string(
"parentFiles")) {
103 TMap*
m =
dynamic_cast<TMap*
>(in->Get(dk->GetName()));
105 auto* copy =
m->Clone(
"parentFiles");
106 out->WriteTObject(copy);
109 auto* d = (TDirectory*)in->Get(dk->GetName());
110 std::cout <<
"Processing: " << dk->GetName() << std::endl;
113 auto destination = outFs->OpenOutputStream(
"/", {});
114 if (!destination.ok()) {
115 std::cerr <<
"Could not open destination folder " << output_file << std::endl;
119 for (
TObject* tk : *d->GetListOfKeys()) {
120 auto sourceUrl = fmt::format(
"{}/{}", dk->GetName(), tk->GetName());
123 auto destUrl = fmt::format(
"/{}-{}", dk->GetName(), tk->GetName());
124 arrow::dataset::FileSource
source(sourceUrl, fs);
126 std::cout <<
"Source " <<
source.path() <<
" is not supported" << std::endl;
129 std::cout <<
" Processing tree: " << tk->GetName() << std::endl;
131 if (!schemaOpt.ok()) {
132 std::cout <<
"Could not inspect source " <<
source.path() << std::endl;
134 auto schema = *schemaOpt;
135 auto fragment =
format->MakeFragment(
source, {}, schema);
136 if (!fragment.ok()) {
137 std::cout <<
"Could not make fragment from " <<
source.path() <<
"with schema:" << schema->ToString() << std::endl;
140 auto options = std::make_shared<arrow::dataset::ScanOptions>();
141 options->dataset_schema = schema;
142 auto scanner =
format->ScanBatchesAsync(options, *fragment);
144 std::cout <<
"Scanner not ok" << std::endl;
147 auto batches = (*scanner)();
148 auto result = batches.result();
150 std::cout <<
"Could not get batches." << std::endl;
153 std::cout <<
" Found a table with " << (*result)->columns().size() <<
" columns " << (*result)->num_rows() <<
" rows." << std::endl;
155 if ((*result)->num_rows() == 0) {
156 std::cout <<
"Empty table, skipping for now" << std::endl;
159 arrow::fs::FileLocator locator{outFs, destUrl};
160 std::cout << schema->ToString() << std::endl;
161 auto writer = rNtupleFormat->MakeWriter(*destination, schema, {}, locator);
162 auto success = writer->get()->Write(*
result);
164 std::cout <<
"Error while writing" << std::endl;
169 auto rootDestination = std::dynamic_pointer_cast<o2::framework::TDirectoryFileOutputStream>(*destination);