22int main(
int argc,
char** argv)
25 char* input_file =
nullptr;
26 char* output_file =
nullptr;
29 static struct option long_options[] = {
30 {
"input", required_argument,
nullptr,
'i'},
31 {
"output", required_argument,
nullptr,
'o'},
32 {
nullptr, 0,
nullptr, 0}
39 while ((
c = getopt_long(argc, argv,
"i:o:", long_options, &option_index)) != -1) {
49 printf(
"Unknown option. Use --input <file> and --output <file>\n");
57 if (input_file && output_file) {
58 printf(
"Input file: %s\n", input_file);
59 printf(
"Output file: %s\n", output_file);
61 fprintf(stderr,
"Usage: %s --input <file> --output <file>\n", argv[0]);
66 std::vector<char const*> capabilitiesSpecs = {
67 "O2Framework:RNTupleObjectReadingCapability",
68 "O2Framework:TTreeObjectReadingCapability",
73 std::vector<LoadablePlugin> plugins;
74 for (
auto spec : capabilitiesSpecs) {
76 for (
auto& extra : morePlugins) {
77 plugins.push_back(extra);
81 auto in = TFile::Open(input_file,
"READ");
82 auto out = TFile::Open(output_file,
"RECREATE");
84 auto fs = std::make_shared<o2::framework::TFileFileSystem>(in, 50 * 1024 * 1024, factory);
85 auto outFs = std::make_shared<o2::framework::TFileFileSystem>(out, 0, factory);
87 o2::framework::PluginManager::loadFromPlugin<o2::framework::RootObjectReadingCapability, o2::framework::RootObjectReadingCapabilityPlugin>(plugins, factory.
capabilities);
90 auto rNtupleFormat = factory.
capabilities[0].factory().format();
93 for (
TObject* dk : *in->GetListOfKeys()) {
94 if (dk->GetName() == std::string(
"metaData")) {
95 TMap*
m =
dynamic_cast<TMap*
>(in->Get(dk->GetName()));
97 auto* copy =
m->Clone(
"metaData");
98 out->WriteTObject(copy);
101 if (dk->GetName() == std::string(
"parentFiles")) {
102 TMap*
m =
dynamic_cast<TMap*
>(in->Get(dk->GetName()));
104 auto* copy =
m->Clone(
"parentFiles");
105 out->WriteTObject(copy);
108 auto* d = (TDirectory*)in->Get(dk->GetName());
109 std::cout <<
"Processing: " << dk->GetName() << std::endl;
112 auto destination = outFs->OpenOutputStream(
"/", {});
113 if (!destination.ok()) {
114 std::cerr <<
"Could not open destination folder " << output_file << std::endl;
118 for (
TObject* tk : *d->GetListOfKeys()) {
119 auto sourceUrl = fmt::format(
"{}/{}", dk->GetName(), tk->GetName());
122 auto destUrl = fmt::format(
"/{}-{}", dk->GetName(), tk->GetName());
123 arrow::dataset::FileSource
source(sourceUrl, fs);
125 std::cout <<
"Source " <<
source.path() <<
" is not supported" << std::endl;
128 std::cout <<
" Processing tree: " << tk->GetName() << std::endl;
130 if (!schemaOpt.ok()) {
131 std::cout <<
"Could not inspect source " <<
source.path() << std::endl;
133 auto schema = *schemaOpt;
134 auto fragment =
format->MakeFragment(
source, {}, schema);
135 if (!fragment.ok()) {
136 std::cout <<
"Could not make fragment from " <<
source.path() <<
"with schema:" << schema->ToString() << std::endl;
139 auto options = std::make_shared<arrow::dataset::ScanOptions>();
140 options->dataset_schema = schema;
141 auto scanner =
format->ScanBatchesAsync(options, *fragment);
143 std::cout <<
"Scanner not ok" << std::endl;
146 auto batches = (*scanner)();
147 auto result = batches.result();
149 std::cout <<
"Could not get batches." << std::endl;
152 std::cout <<
" Found a table with " << (*result)->columns().size() <<
" columns " << (*result)->num_rows() <<
" rows." << std::endl;
154 if ((*result)->num_rows() == 0) {
155 std::cout <<
"Empty table, skipping for now" << std::endl;
158 arrow::fs::FileLocator locator{outFs, destUrl};
159 std::cout << schema->ToString() << std::endl;
160 auto writer = rNtupleFormat->MakeWriter(*destination, schema, {}, locator);
161 auto success = writer->get()->Write(*
result);
163 std::cout <<
"Error while writing" << std::endl;
168 auto rootDestination = std::dynamic_pointer_cast<o2::framework::TDirectoryFileOutputStream>(*destination);