109 std::string parentFileReplacement;
110 if (ctx.options().isSet(
"aod-parent-base-path-replacement")) {
111 parentFileReplacement = ctx.options().get<std::string>(
"aod-parent-base-path-replacement");
113 int parentAccessLevel = 0;
114 if (ctx.options().isSet(
"aod-parent-access-level")) {
115 parentAccessLevel = ctx.options().get<
int>(
"aod-parent-access-level");
117 std::vector<std::pair<std::string, int>> originLevelMapping;
118 if (ctx.options().isSet(
"aod-origin-level-mapping")) {
119 auto originLevelMappingStr = ctx.options().get<std::string>(
"aod-origin-level-mapping");
120 for (
auto pairRange : originLevelMappingStr | std::views::split(
',')) {
121 std::string_view pair{pairRange.begin(), pairRange.end()};
122 auto colonPos = pair.find(
':');
123 if (colonPos == std::string_view::npos) {
124 LOGP(fatal,
"Badly formatted aod-origin-level-mapping entry: \"{}\"", pair);
127 std::string
key(pair.substr(0, colonPos));
128 std::string_view valueStr = pair.substr(colonPos + 1);
130 auto [
ptr, ec] = std::from_chars(valueStr.data(), valueStr.data() + valueStr.size(),
value);
131 if (ec == std::errc{}) {
132 originLevelMapping.emplace_back(std::move(
key),
value);
134 LOGP(fatal,
"Unable to parse level in aod-origin-level-mapping entry: \"{}\"", pair);
151 if (!options.
isSet(
"aod-file-private")) {
152 LOGP(fatal,
"No input file defined!");
153 throw std::runtime_error(
"Processing is stopped!");
156 auto filename = options.
get<std::string>(
"aod-file-private");
158 auto maxRate = options.
get<
float>(
"aod-max-io-rate");
161 auto didir = std::make_shared<DataInputDirector>(std::vector<std::string>{
filename},
DataInputDirectorContext{&monitoring, parentAccessLevel, parentFileReplacement, originLevelMapping});
162 if (options.
isSet(
"aod-reader-json")) {
163 auto jsonFile = options.
get<std::string>(
"aod-reader-json");
164 if (!didir->readJson(jsonFile)) {
165 LOGP(error,
"Check the JSON document! Can not be properly parsed!");
174 bool reportTFN =
false;
175 bool reportTFFileName =
false;
178 std::vector<OutputRoute> requestedTables;
180 for (
auto route :
routes) {
183 TFNumberHeader =
header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
187 TFFileNameHeader =
header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
188 reportTFFileName =
true;
190 requestedTables.emplace_back(route);
193 int level = originLevelMapping.empty() ? -1 : 0;
194 auto fileCounter = std::make_shared<int>(0);
195 auto numTF = std::make_shared<int>(-1);
208 int ntf = *numTF + 1;
209 static int currentFileCounter = -1;
210 static int filesProcessed = 0;
211 if (currentFileCounter != *fileCounter) {
212 currentFileCounter = *fileCounter;
213 monitoring.send(
Metric{(uint64_t)++filesProcessed,
"files-opened"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
218 static size_t totalSizeUncompressed = 0;
219 static size_t totalSizeCompressed = 0;
220 static uint64_t totalDFSent = 0;
223 if (!watchdog->update()) {
224 LOGP(info,
"Run time exceeds run time limit of {} seconds. Exiting gracefully...", watchdog->runTimeLimit);
225 LOGP(info,
"Stopping reader {} after time frame {}.", device.
inputTimesliceId, watchdog->numberTimeFrames - 1);
226 didir->closeInputFiles();
227 monitoring.flushBuffer();
233 int64_t startTime = uv_hrtime();
234 int64_t startSize = totalSizeCompressed;
235 for (
auto& route : requestedTables) {
242 auto dh =
header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
243 bool wasAOD = std::ranges::any_of(route.matcher.metadata, [](
ConfigParamSpec const& p) { return p.name.starts_with(
"aod-origin-replaced"); });
245 if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed, wasAOD)) {
249 if (didir->atEnd(fcnt)) {
250 LOGP(info,
"No input files left to read for reader {}!", device.
inputTimesliceId);
251 didir->closeInputFiles();
252 monitoring.flushBuffer();
259 if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed, wasAOD)) {
260 LOGP(fatal,
"Can not retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin.as<std::string>(), fcnt, ntf);
261 throw std::runtime_error(
"Processing is stopped!");
264 LOGP(fatal,
"Can not retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin.as<std::string>(), fcnt, ntf);
265 throw std::runtime_error(
"Processing is stopped!");
272 auto timeFrameNumber = didir->getTimeFrameNumber(dh, fcnt, ntf);
273 auto o =
Output(TFNumberHeader);
274 outputs.
make<uint64_t>(o) = timeFrameNumber;
277 if (reportTFFileName) {
280 auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
281 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(fileAndFolder.filesystem());
282 auto*
f =
dynamic_cast<TFile*
>(rootFS->GetFile());
283 std::string currentFilename(
f->GetFile()->GetName());
284 if (strcmp(
f->GetEndpointUrl()->GetProtocol(),
"file") == 0 &&
f->GetEndpointUrl()->GetFile()[0] !=
'/') {
286 static std::string pwd = gSystem->pwd() + std::string(
"/");
287 currentFilename = pwd + std::string(
f->GetName());
289 outputs.
make<std::string>(
o2) = currentFilename;
294 int64_t stopSize = totalSizeCompressed;
295 int64_t bytesDelta = stopSize - startSize;
296 int64_t stopTime = uv_hrtime();
297 float currentDelta = float(stopTime - startTime) / 1000000000;
298 if (ceil(maxRate) > 0.) {
299 float extraTime = (bytesDelta / 1000000 - currentDelta * maxRate) / maxRate;
301 if (extraTime > 0.) {
302 LOGP(info,
"Read {} MB in {} s. Sleeping for {} seconds to stay within {} MB/s limit.", bytesDelta / 1000000, currentDelta, extraTime, maxRate);
303 uv_sleep(extraTime * 1000);
311 monitoring.send(
Metric{(uint64_t)totalDFSent,
"df-sent"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
312 monitoring.send(
Metric{(uint64_t)totalSizeUncompressed / 1000,
"aod-bytes-read-uncompressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
313 monitoring.send(
Metric{(uint64_t)totalSizeCompressed / 1000,
"aod-bytes-read-compressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
323 auto firstRoute = std::ranges::find_if(requestedTables, [&didir,
level](
auto const& route) {
325 return didir->getLevelForOrigin(concrete.origin) ==
level;
328 auto dh =
header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
329 auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
332 if (fileAndFolder.filesystem() ==
nullptr) {
335 if (didir->atEnd(fcnt)) {
336 LOGP(info,
"No input files left to read for reader {}!", device.
inputTimesliceId);
337 didir->closeInputFiles();
338 monitoring.flushBuffer();