129 std::string parentFileReplacement;
130 if (ctx.
options().
isSet(
"aod-parent-base-path-replacement")) {
131 parentFileReplacement = ctx.
options().
get<std::string>(
"aod-parent-base-path-replacement");
133 int parentAccessLevel = 0;
135 parentAccessLevel = ctx.
options().
get<
int>(
"aod-parent-access-level");
149 if (!options.
isSet(
"aod-file-private")) {
150 LOGP(fatal,
"No input file defined!");
151 throw std::runtime_error(
"Processing is stopped!");
154 auto filename = options.
get<std::string>(
"aod-file-private");
156 auto maxRate = options.
get<
float>(
"aod-max-io-rate");
159 auto didir = std::make_shared<DataInputDirector>(
filename, &monitoring, parentAccessLevel, parentFileReplacement);
160 if (options.
isSet(
"aod-reader-json")) {
161 auto jsonFile = options.
get<std::string>(
"aod-reader-json");
162 if (!didir->readJson(jsonFile)) {
163 LOGP(error,
"Check the JSON document! Can not be properly parsed!");
172 bool reportTFN =
false;
173 bool reportTFFileName =
false;
176 std::vector<OutputRoute> requestedTables;
177 std::vector<OutputRoute> routes(spec.
outputs);
178 for (
auto route : routes) {
181 TFNumberHeader =
header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
185 TFFileNameHeader =
header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
186 reportTFFileName =
true;
188 requestedTables.emplace_back(route);
192 auto fileCounter = std::make_shared<int>(0);
193 auto numTF = std::make_shared<int>(-1);
206 int ntf = *numTF + 1;
207 static int currentFileCounter = -1;
208 static int filesProcessed = 0;
209 if (currentFileCounter != *fileCounter) {
210 currentFileCounter = *fileCounter;
211 monitoring.send(
Metric{(uint64_t)++filesProcessed,
"files-opened"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
216 static size_t totalSizeUncompressed = 0;
217 static size_t totalSizeCompressed = 0;
218 static uint64_t totalDFSent = 0;
221 if (!watchdog->update()) {
222 LOGP(info,
"Run time exceeds run time limit of {} seconds. Exiting gracefully...", watchdog->runTimeLimit);
223 LOGP(info,
"Stopping reader {} after time frame {}.", device.
inputTimesliceId, watchdog->numberTimeFrames - 1);
224 didir->closeInputFiles();
225 monitoring.flushBuffer();
231 int64_t startTime = uv_hrtime();
232 int64_t startSize = totalSizeCompressed;
233 for (
auto& route : requestedTables) {
240 auto dh =
header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
242 if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed)) {
246 if (didir->atEnd(fcnt)) {
247 LOGP(info,
"No input files left to read for reader {}!", device.
inputTimesliceId);
248 didir->closeInputFiles();
249 monitoring.flushBuffer();
256 if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed)) {
257 LOGP(fatal,
"Can not retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin.as<std::string>(), fcnt, ntf);
258 throw std::runtime_error(
"Processing is stopped!");
261 LOGP(fatal,
"Can not retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin.as<std::string>(), fcnt, ntf);
262 throw std::runtime_error(
"Processing is stopped!");
269 auto timeFrameNumber = didir->getTimeFrameNumber(dh, fcnt, ntf);
270 auto o =
Output(TFNumberHeader);
271 outputs.
make<uint64_t>(
o) = timeFrameNumber;
274 if (reportTFFileName) {
277 auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
278 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(fileAndFolder.filesystem());
279 auto*
f =
dynamic_cast<TFile*
>(rootFS->GetFile());
280 std::string currentFilename(
f->GetFile()->GetName());
281 if (strcmp(
f->GetEndpointUrl()->GetProtocol(),
"file") == 0 &&
f->GetEndpointUrl()->GetFile()[0] !=
'/') {
283 static std::string pwd = gSystem->pwd() + std::string(
"/");
284 currentFilename = pwd + std::string(
f->GetName());
286 outputs.
make<std::string>(
o2) = currentFilename;
291 int64_t stopSize = totalSizeCompressed;
292 int64_t bytesDelta = stopSize - startSize;
293 int64_t stopTime = uv_hrtime();
294 float currentDelta = float(stopTime - startTime) / 1000000000;
295 if (ceil(maxRate) > 0.) {
296 float extraTime = (bytesDelta / 1000000 - currentDelta * maxRate) / maxRate;
298 if (extraTime > 0.) {
299 LOGP(info,
"Read {} MB in {} s. Sleeping for {} seconds to stay within {} MB/s limit.", bytesDelta / 1000000, currentDelta, extraTime, maxRate);
300 uv_sleep(extraTime * 1000);
304 monitoring.send(
Metric{(uint64_t)totalDFSent,
"df-sent"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
305 monitoring.send(
Metric{(uint64_t)totalSizeUncompressed / 1000,
"aod-bytes-read-uncompressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
306 monitoring.send(
Metric{(uint64_t)totalSizeCompressed / 1000,
"aod-bytes-read-compressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
316 auto& firstRoute = requestedTables.front();
318 auto dh =
header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
319 auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
322 if (fileAndFolder.path().empty()) {
325 if (didir->atEnd(fcnt)) {
326 LOGP(info,
"No input files left to read for reader {}!", device.
inputTimesliceId);
327 didir->closeInputFiles();
328 monitoring.flushBuffer();