Project
Loading...
Searching...
No Matches
rawfileSplit.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15
20#include "Framework/Logger.h"
21#include <TStopwatch.h>
22#include <boost/program_options.hpp>
23#include <iostream>
24#include <string>
25#include <vector>
26#include <filesystem>
27
28namespace bpo = boost::program_options;
29
30using namespace o2::raw;
31
32int main(int argc, char* argv[])
33{
34 RawFileReader reader;
35 std::vector<std::string> fnames;
36 std::string config, configKeyValues;
37 bpo::variables_map vm;
38 bpo::options_description descOpt("Options");
39 auto desc_add_option = descOpt.add_options();
40 desc_add_option("help,h", "print this help message.");
41 desc_add_option("input-conf,c", bpo::value(&config)->default_value(""), "read input from configuration file");
42 desc_add_option("max-tf,m", bpo::value<uint32_t>()->default_value(0xffffffff), " ID to read (counts from 0)");
43 desc_add_option("verbosity,v", bpo::value<int>()->default_value(reader.getVerbosity()), "1: long report, 2 or 3: print or dump all RDH");
44 desc_add_option("spsize,s", bpo::value<int>()->default_value(reader.getNominalSPageSize()), "nominal super-page size in bytes");
45 desc_add_option("buffer-size,b", bpo::value<size_t>()->default_value(reader.getNominalSPageSize()), "buffer size for files preprocessing");
46 desc_add_option("detect-tf0", "autodetect HBFUtils start Orbit/BC from 1st TF seen");
47 desc_add_option("calculate-tf-start", "calculate TF start instead of using TType");
48 desc_add_option("rorc", "impose RORC as default detector mode");
49 desc_add_option("tfs-per-chunk,n", bpo::value<uint32_t>()->default_value(0xffffffff), " number of output TFs per chunk");
50 desc_add_option("output-dir-prefix,o", bpo::value<std::string>()->default_value("./chunk"), "output directory prefix for raw data chunk (chunk ID will be added)");
51 desc_add_option("file-for,f", bpo::value<std::string>()->default_value("all"), "single file per: all,cru,link");
52
53 desc_add_option("configKeyValues", bpo::value(&configKeyValues)->default_value(""), "semicolon separated key=value strings");
54 for (int i = 0; i < RawFileReader::NErrorsDefined; i++) {
55 auto ei = RawFileReader::ErrTypes(i);
56 desc_add_option(RawFileReader::nochk_opt(ei).c_str(), RawFileReader::nochk_expl(ei).c_str());
57 }
58
59 bpo::options_description hiddenOpt("hidden");
60 hiddenOpt.add_options()("files", bpo::value(&fnames)->composing(), "");
61
62 bpo::options_description fullOpt("cmd");
63 fullOpt.add(descOpt).add(hiddenOpt);
64
65 bpo::positional_options_description posOpt;
66 posOpt.add("files", -1);
67
68 auto printHelp = [&](std::ostream& stream) {
69 stream << "Usage: " << argv[0] << " [options] file0 [... fileN]" << std::endl;
70 stream << descOpt << std::endl;
71 stream << " (input files are optional if config file was provided)" << std::endl;
72 };
73
74 try {
75 bpo::store(bpo::command_line_parser(argc, argv)
76 .options(fullOpt)
77 .positional(posOpt)
78 .allow_unregistered()
79 .run(),
80 vm);
81 bpo::notify(vm);
82 if (argc == 1 || vm.count("help") || (fnames.empty() && config.empty())) {
83 printHelp(std::cout);
84 return 0;
85 }
87 } catch (const bpo::error& e) {
88 std::cerr << e.what() << "\n\n";
89 std::cerr << "Error parsing command line arguments\n";
90 printHelp(std::cerr);
91 return -1;
92 }
93
95 LOG(info) << "RawDataHeader v" << int(rdh.version) << " is assumed";
96
97 RawFileReader::ReadoutCardType rocard = vm.count("rorc") ? RawFileReader::ReadoutCardType::RORC : RawFileReader::ReadoutCardType::CRU;
98
99 reader.setVerbosity(vm["verbosity"].as<int>());
100 reader.setNominalSPageSize(vm["spsize"].as<int>());
101 reader.setMaxTFToRead(vm["max-tf"].as<uint32_t>());
102 reader.setBufferSize(vm["buffer-size"].as<size_t>());
103 reader.setDefaultReadoutCardType(rocard);
104 reader.setTFAutodetect(vm.count("detect-tf0") ? RawFileReader::FirstTFDetection::Pending : RawFileReader::FirstTFDetection::Disabled);
105 reader.setPreferCalculatedTFStart(vm.count("calculate-tf-start"));
106
107 std::string_view fileFor = vm["file-for"].as<std::string>();
108
109 uint32_t errmap = 0;
110 for (int i = RawFileReader::NErrorsDefined; i--;) {
111 auto ei = RawFileReader::ErrTypes(i);
113 errmap |= 0x1 << i;
114 }
115 if (vm.count(RawFileReader::nochk_opt(ei).c_str())) { // toggle
116 errmap ^= 0x1 << i;
117 }
118 LOG(info) << ((errmap & (0x1 << i)) ? "apply " : "ignore") << " check for " << RawFileReader::ErrNames[i].data();
119 }
120
121 if (!config.empty()) {
122 auto inp = RawFileReader::parseInput(config);
123 reader.loadFromInputsMap(inp);
124 }
125
126 for (int i = 0; i < fnames.size(); i++) {
127 reader.addFile(fnames[i]);
128 }
129
130 TStopwatch sw;
131 sw.Start();
132
133 reader.setCheckErrors(errmap);
134 reader.init();
135
136 sw.Print();
137 int maxTFPerChunk = vm["tfs-per-chunk"].as<uint32_t>();
138 std::string outDirPrefix = vm["output-dir-prefix"].as<std::string>(), outDir = "";
139 int ntf = reader.getNTimeFrames();
140 int nlinks = reader.getNLinks();
141 std::vector<RawFileReader::PartStat> partsSP;
142 std::vector<char> buffer;
143 std::unique_ptr<RawFileWriter> writer;
144 int chunkID = -1;
145
146 for (int itf = 0; itf < ntf; itf++) {
147 reader.setNextTFToRead(itf);
148 bool reinitWriter = false;
149 if ((itf % maxTFPerChunk) == 0) {
150 reinitWriter = true;
151 chunkID++;
152 }
153 for (int il = 0; il < nlinks; il++) {
154 auto& link = reader.getLink(il);
155 if (!link.rewindToTF(itf)) {
156 continue; // this link has no data for wanted TF
157 }
158 int nParts = link.getNextTFSuperPagesStat(partsSP);
159 for (int ip = 0; ip < nParts; ip++) {
160 buffer.resize(partsSP[ip].size);
161 auto bread = link.readNextSuperPage(buffer.data(), &partsSP[ip]);
162 if (bread != partsSP[ip].size) {
163 LOG(error) << "Link " << il << " read " << bread << " bytes instead of " << partsSP[ip].size << " expected in TF=" << itf << " part=" << ip;
164 }
165
166 if (reinitWriter) {
167 if (writer) { // generate config for previous chunk
168 writer->writeConfFile(writer->getOrigin().str, "RAWDATA", o2::utils::Str::concat_string(outDir, '/', writer->getOrigin().str, "raw.cfg"));
169 }
170 outDir = o2::utils::Str::concat_string(outDirPrefix, "_", std::to_string(chunkID));
171 // if needed, create output directory
172 if (!std::filesystem::exists(outDir)) {
173 if (!std::filesystem::create_directories(outDir)) {
174 LOG(fatal) << "could not create output directory " << outDir;
175 } else {
176 LOG(info) << "created output directory " << outDir;
177 }
178 }
179 writer = std::make_unique<RawFileWriter>(link.origin, link.cruDetector);
180 writer->useRDHVersion(RDHUtils::getVersion(link.rdhl));
181 reinitWriter = false;
182 }
183 if (!writer->isLinkRegistered(RDHUtils::getSubSpec(RDHUtils::getCRUID(link.rdhl), RDHUtils::getLinkID(link.rdhl), RDHUtils::getEndPointID(link.rdhl), RDHUtils::getFEEID(link.rdhl)))) { // register the output link
184 std::string outFileName;
185
186 if (fileFor == "all") { // single file for all links
187 outFileName = o2::utils::Str::concat_string(outDir, "/", fileFor, ".raw");
188 } else if (fileFor == "cru") {
189 outFileName = o2::utils::Str::concat_string(outDir, "/", fileFor, "_", std::to_string(RDHUtils::getCRUID(link.rdhl)), ".raw");
190 } else if (fileFor == "link") {
191 outFileName = o2::utils::Str::concat_string(outDir, "/", fileFor,
192 "_", std::to_string(RDHUtils::getLinkID(link.rdhl)),
193 "_cru", std::to_string(RDHUtils::getCRUID(link.rdhl)),
194 "_ep", std::to_string(RDHUtils::getEndPointID(link.rdhl)),
195 "_feeid", std::to_string(RDHUtils::getFEEID(link.rdhl)), ".raw");
196 } else {
197 throw std::runtime_error("invalid option provided for file grouping");
198 }
199
200 writer->registerLink(link.rdhl, outFileName);
201 }
202
203 auto& linkW = writer->getLinkWithSubSpec(link.rdhl);
204 auto& outF = writer->getOutputFileForLink(linkW);
205 outF.write(buffer.data(), bread);
206 }
207 }
208 }
209 if (writer) { // generate config for previous chunk
210 writer->writeConfFile(writer->getOrigin().str, "RAWDATA", o2::utils::Str::concat_string(outDir, '/', writer->getOrigin().str, "raw.cfg"));
211 }
212 writer.reset();
213
214 return 0;
215}
int32_t i
Utility class to write detectors data to (multiple) raw data file(s) respecting CRU format.
Reader for (multiple) raw data files.
static void updateFromString(std::string const &)
void setVerbosity(int v=1)
static InputsMap parseInput(const std::string &confUri, const std::string &onlyDet={}, bool verbose=false)
void setNextTFToRead(uint32_t tf)
static std::string nochk_expl(ErrTypes e)
void setTFAutodetect(FirstTFDetection v)
void setBufferSize(size_t s)
void setNominalSPageSize(int n=0x1<< 20)
void loadFromInputsMap(const InputsMap &inp)
int getNominalSPageSize() const
void setMaxTFToRead(uint32_t n)
static constexpr bool ErrCheckDefaults[]
uint32_t getNTimeFrames() const
static std::string nochk_opt(ErrTypes e)
void setCheckErrors(uint32_t m=0xffffffff)
const LinkData & getLink(int i) const
void setDefaultReadoutCardType(ReadoutCardType t=CRU)
static constexpr std::string_view ErrNames[]
bool addFile(const std::string &sname, o2::header::DataOrigin origin, o2::header::DataDescription desc, ReadoutCardType t=CRU)
void setPreferCalculatedTFStart(bool v)
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLboolean * data
Definition glcorearb.h:298
GLuint GLuint stream
Definition glcorearb.h:1806
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
void printHelp(bpo::variables_map const &varmap, bpo::options_description const &executorOptions, std::vector< DataProcessorSpec > const &physicalWorkflow, std::vector< ConfigParamSpec > const &currentWorkflowOptions)
static LinkSubSpec_t getSubSpec(uint16_t cru, uint8_t link, uint8_t endpoint, uint16_t feeId, o2::header::DAQID::ID srcid=o2::header::DAQID::INVALID)
Definition RDHUtils.h:685
static constexpr int getVersion()
get numeric version of the RDH
Definition RDHUtils.h:58
size_t getNextTFSuperPagesStat(std::vector< PartStat > &parts) const
static std::string concat_string(Ts const &... ts)
#define main
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
TStopwatch sw