Project
Loading...
Searching...
No Matches
AODJAlienReaderHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#include <charconv>
14#include <memory>
15#include <ranges>
16#include <vector>
30#include "DataInputDirector.h"
33#include "Framework/Logger.h"
34
35#if __has_include(<TJAlienFile.h>)
36#include <TJAlienFile.h>
37#endif
38#include <TGrid.h>
39#include <TFile.h>
40#include <TTreeCache.h>
41#include <TSystem.h>
42
43#include <arrow/ipc/reader.h>
44#include <arrow/ipc/writer.h>
45#include <arrow/io/interfaces.h>
46#include <arrow/table.h>
47#include <arrow/util/key_value_metadata.h>
48#include <arrow/dataset/dataset.h>
49#include <arrow/dataset/file_base.h>
50
51using namespace o2;
52using namespace o2::aod;
53
// Data members of the run time watchdog.
// NOTE(review): the embedded listing numbers jump from 53 to 56, so the struct
// header and the declaration of numberTimeFrames (used by the methods below)
// are not visible in this view.
// Timestamp taken from uv_hrtime() when the watchdog is constructed.
56 uint64_t startTime;
// Timestamp of the most recent update() call that got past the limit check.
57 uint64_t lastTime;
// Accumulated processing time in seconds (uv_hrtime() differences divided by 1.E9).
58 double runTime;
// Run time limit, compared against seconds in update(); 0 makes update() always return true.
59 uint64_t runTimeLimit;
60
// Starts the watchdog clock and stores the run time limit.
// limit: run time limit; it is converted from Long64_t to the unsigned
//        runTimeLimit member, so a negative value would wrap to a very large limit.
// NOTE(review): the embedded listing numbers skip 63 and 65 inside this body, so
// statements may be missing from this view (no visible line initialises lastTime
// or numberTimeFrames) - confirm against the real source before editing.
61 RuntimeWatchdog(Long64_t limit)
62 {
64 startTime = uv_hrtime();
66 runTime = 0.;
67 runTimeLimit = limit;
68 }
69
// Updates the timing bookkeeping and reports whether processing may continue.
// Returns true when no limit is set, or while the time elapsed since
// construction plus the average run time per time frame is below runTimeLimit.
// NOTE(review): the embedded listing numbers skip 72, so a statement may be
// missing at the top of this body (no visible line modifies numberTimeFrames).
70 bool update()
71 {
// runTimeLimit is unsigned, so this test is only true for a limit of exactly 0.
73 if (runTimeLimit <= 0) {
74 return true;
75 }
76
77 auto nowTime = uv_hrtime();
78
79 // time spent to process the time frame
// The interval since the previous call is only counted while numberTimeFrames < 1;
// afterwards 0 is added. NOTE(review): confirm that this condition is intended.
80 double time_spent = numberTimeFrames < 1 ? (double)(nowTime - lastTime) / 1.E9 : 0.;
81 runTime += time_spent;
82 lastTime = nowTime;
83
// Elapsed time since construction (in seconds) plus runTime averaged over
// numberTimeFrames + 1 must stay below the limit.
84 return ((double)(lastTime - startTime) / 1.E9 + runTime / (numberTimeFrames + 1)) < runTimeLimit;
85 }
86
87 void printOut()
88 {
89 LOGP(info, "RuntimeWatchdog");
90 LOGP(info, " run time limit: {}", runTimeLimit);
91 LOGP(info, " number of time frames: {}", numberTimeFrames);
92 LOGP(info, " estimated run time per time frame: {}", (numberTimeFrames >= 0) ? runTime / (numberTimeFrames + 1) : 0.);
93 LOGP(info, " estimated total run time: {}", (double)(lastTime - startTime) / 1.E9 + ((numberTimeFrames >= 0) ? runTime / (numberTimeFrames + 1) : 0.));
94 }
95};
96
97using o2::monitoring::Metric;
98using o2::monitoring::Monitoring;
99using o2::monitoring::tags::Key;
100using o2::monitoring::tags::Value;
101
103{
// Builds the AlgorithmSpec of the AOD reader. Workflow-level options are read
// from ctx first; a stateful init lambda then creates the DataInputDirector and
// returns the stateless lambda that reads one time frame per invocation.
// NOTE(review): the embedded listing numbers skip 104, so the signature line of
// this function is not visible here; the body only shows that `ctx` provides options().
105{
106 // aod-parent-base-path-replacement is now a workflow option, so it needs to be
107 // retrieved from the ConfigContext. This is because we do not allow workflow options
108 // to change over start-stop-start because they can affect the topology generation.
109 std::string parentFileReplacement;
110 if (ctx.options().isSet("aod-parent-base-path-replacement")) {
111 parentFileReplacement = ctx.options().get<std::string>("aod-parent-base-path-replacement");
112 }
113 int parentAccessLevel = 0;
114 if (ctx.options().isSet("aod-parent-access-level")) {
115 parentAccessLevel = ctx.options().get<int>("aod-parent-access-level");
116 }
// Parse aod-origin-level-mapping: a comma separated list of <key>:<integer level> entries.
117 std::vector<std::pair<std::string, int>> originLevelMapping;
118 if (ctx.options().isSet("aod-origin-level-mapping")) {
119 auto originLevelMappingStr = ctx.options().get<std::string>("aod-origin-level-mapping");
120 for (auto pairRange : originLevelMappingStr | std::views::split(',')) {
121 std::string_view pair{pairRange.begin(), pairRange.end()};
122 auto colonPos = pair.find(':');
123 if (colonPos == std::string_view::npos) {
// NOTE(review): the `continue` only matters if LOGP(fatal, ...) returns - confirm.
124 LOGP(fatal, "Badly formatted aod-origin-level-mapping entry: \"{}\"", pair);
125 continue;
126 }
127 std::string key(pair.substr(0, colonPos));
128 std::string_view valueStr = pair.substr(colonPos + 1);
129 int value{};
// std::from_chars succeeds as soon as a leading integer is parsed; `ptr` is not
// inspected, so trailing characters after the number are accepted silently.
130 auto [ptr, ec] = std::from_chars(valueStr.data(), valueStr.data() + valueStr.size(), value);
131 if (ec == std::errc{}) {
132 originLevelMapping.emplace_back(std::move(key), value);
133 } else {
134 LOGP(fatal, "Unable to parse level in aod-origin-level-mapping entry: \"{}\"", pair);
135 }
136 }
137 }
// Init lambda: the workflow-level values parsed above are captured by value.
138 auto callback = AlgorithmSpec{adaptStateful([parentFileReplacement, parentAccessLevel, originLevelMapping](ConfigParamRegistry const& options,
139 DeviceSpec const& spec,
140 Monitoring& monitoring,
141 DataProcessingStats& stats) {
142 // FIXME: not actually needed, since data processing stats can specify that we should
143 // send the initial value.
144 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_CREATED), DataProcessingStats::Op::Set, 0});
145 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_CREATED), DataProcessingStats::Op::Set, 0});
146 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, 0});
147 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, 0});
148 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, 0});
149 stats.updateStats({static_cast<short>(ProcessingStatsId::CONSUMED_TIMEFRAMES), DataProcessingStats::Op::Set, 0});
150
// An input file list is mandatory; without it the reader cannot start.
151 if (!options.isSet("aod-file-private")) {
152 LOGP(fatal, "No input file defined!");
153 throw std::runtime_error("Processing is stopped!");
154 }
155
156 auto filename = options.get<std::string>("aod-file-private");
157
// Used below as a MB/s ceiling; values whose ceil() is not positive disable the throttling.
158 auto maxRate = options.get<float>("aod-max-io-rate");
159
160 // create a DataInputDirector
161 auto didir = std::make_shared<DataInputDirector>(std::vector<std::string>{filename}, DataInputDirectorContext{&monitoring, parentAccessLevel, parentFileReplacement, originLevelMapping});
162 if (options.isSet("aod-reader-json")) {
163 auto jsonFile = options.get<std::string>("aod-reader-json");
// A JSON parse failure is only logged as an error; processing continues with didir as it is.
164 if (!didir->readJson(jsonFile)) {
165 LOGP(error, "Check the JSON document! Can not be properly parsed!");
166 }
167 }
168
169 // get the run time watchdog
// NOTE(review): allocated with `new` and captured below as a raw pointer; no
// matching delete is visible anywhere in this function.
170 auto* watchdog = new RuntimeWatchdog(options.get<int64_t>("time-limit"));
171
172 // selected the TFN input and
173 // create list of requested tables
174 bool reportTFN = false;
175 bool reportTFFileName = false;
176 header::DataHeader TFNumberHeader;
177 header::DataHeader TFFileNameHeader;
178 std::vector<OutputRoute> requestedTables;
179 std::vector<OutputRoute> routes(spec.outputs);
// Routes with origin TFN / TFF are answered by this reader itself (time frame
// number / origin file name); every other route is a table to be read.
// The loop variable is taken by value, so each route is copied per iteration.
180 for (auto route : routes) {
181 if (DataSpecUtils::partialMatch(route.matcher, header::DataOrigin("TFN"))) {
182 auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
183 TFNumberHeader = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
184 reportTFN = true;
185 } else if (DataSpecUtils::partialMatch(route.matcher, header::DataOrigin("TFF"))) {
186 auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
187 TFFileNameHeader = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
188 reportTFFileName = true;
189 } else {
190 requestedTables.emplace_back(route);
191 }
192 }
193
// State carried across invocations of the processing lambda: the current file
// round and the index of the last time frame read (-1 means none yet).
194 auto fileCounter = std::make_shared<int>(0);
195 auto numTF = std::make_shared<int>(-1);
196 return adaptStateless([TFNumberHeader,
197 TFFileNameHeader,
198 requestedTables,
199 fileCounter,
200 numTF,
201 watchdog,
202 maxRate,
203 didir, reportTFN, reportTFFileName](Monitoring& monitoring, DataAllocator& outputs, ControlService& control, DeviceSpec const& device, DataProcessingStats& dpstats) {
204 // Each parallel reader device.inputTimesliceId reads the files fileCounter*device.maxInputTimeslices+device.inputTimesliceId
205 // the TF to read is numTF
206 assert(device.inputTimesliceId < device.maxInputTimeslices);
207 int fcnt = (*fileCounter * device.maxInputTimeslices) + device.inputTimesliceId;
208 int ntf = *numTF + 1;
// Function-local statics: they persist across invocations of this lambda.
209 static int currentFileCounter = -1;
210 static int filesProcessed = 0;
// Emit the files-opened metric whenever the file round changed since the last invocation.
211 if (currentFileCounter != *fileCounter) {
212 currentFileCounter = *fileCounter;
213 monitoring.send(Metric{(uint64_t)++filesProcessed, "files-opened"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
214 }
215
216 // loop over requested tables
217 bool first = true;
218 static size_t totalSizeUncompressed = 0;
219 static size_t totalSizeCompressed = 0;
220 static uint64_t totalDFSent = 0;
221
222 // check if RuntimeLimit is reached
223 if (!watchdog->update()) {
224 LOGP(info, "Run time exceeds run time limit of {} seconds. Exiting gracefully...", watchdog->runTimeLimit);
225 LOGP(info, "Stopping reader {} after time frame {}.", device.inputTimesliceId, watchdog->numberTimeFrames - 1);
226 didir->closeInputFiles();
227 monitoring.flushBuffer();
228 control.endOfStream();
// NOTE(review): the embedded listing numbers skip 229 here (and 253 / 336 in the
// two other shutdown paths below), so a statement before `return` may be missing.
230 return;
231 }
232
// Remember the time and the compressed byte count before reading, for the rate limiter below.
233 int64_t startTime = uv_hrtime();
234 int64_t startSize = totalSizeCompressed;
235 for (auto& route : requestedTables) {
// Only serve the routes assigned to this reader's timeslice.
236 if ((device.inputTimesliceId % route.maxTimeslices) != route.timeslice) {
237 continue;
238 }
239
240 // create header
241 auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
242 auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
243
// A failed read on the first table is taken as "current file exhausted": advance
// to this reader's next file and retry once. A failure on any later table is fatal.
244 if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed)) {
245 if (first) {
246 // check if there is a next file to read
247 fcnt += device.maxInputTimeslices;
248 if (didir->atEnd(fcnt)) {
249 LOGP(info, "No input files left to read for reader {}!", device.inputTimesliceId);
250 didir->closeInputFiles();
251 monitoring.flushBuffer();
252 control.endOfStream();
254 return;
255 }
256 // get first folder of next file
257 ntf = 0;
258 if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed)) {
259 LOGP(fatal, "Can not retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin.as<std::string>(), fcnt, ntf);
260 throw std::runtime_error("Processing is stopped!");
261 }
262 } else {
263 LOGP(fatal, "Can not retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin.as<std::string>(), fcnt, ntf);
264 throw std::runtime_error("Processing is stopped!");
265 }
266 }
267
// The time frame number and the origin file name are published once per
// invocation, using the header of the first table that was read.
268 if (first) {
269 if (reportTFN) {
270 // TF number
271 auto timeFrameNumber = didir->getTimeFrameNumber(dh, fcnt, ntf);
272 auto o = Output(TFNumberHeader);
273 outputs.make<uint64_t>(o) = timeFrameNumber;
274 }
275
276 if (reportTFFileName) {
277 // Origin file name for derived output map
278 auto o2 = Output(TFFileNameHeader);
279 auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
// NOTE(review): the results of both casts are dereferenced without a null check.
280 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(fileAndFolder.filesystem());
281 auto* f = dynamic_cast<TFile*>(rootFS->GetFile());
282 std::string currentFilename(f->GetFile()->GetName());
283 if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') {
284 // This is not an absolute local path. Make it absolute.
// pwd is a function-local static, so the working directory is queried only once.
285 static std::string pwd = gSystem->pwd() + std::string("/");
286 currentFilename = pwd + std::string(f->GetName());
287 }
288 outputs.make<std::string>(o2) = currentFilename;
289 }
290 }
291 first = false;
292 }
// Rate limiting: compare the compressed bytes read in this invocation with the
// time it took and sleep for the difference if maxRate (MB/s) was exceeded.
293 int64_t stopSize = totalSizeCompressed;
294 int64_t bytesDelta = stopSize - startSize;
295 int64_t stopTime = uv_hrtime();
296 float currentDelta = float(stopTime - startTime) / 1000000000; // in s
297 if (ceil(maxRate) > 0.) {
// bytesDelta / 1000000 is an integer division, i.e. whole megabytes only.
298 float extraTime = (bytesDelta / 1000000 - currentDelta * maxRate) / maxRate;
299 // We only sleep if we read faster than the max-read-rate.
300 if (extraTime > 0.) {
301 LOGP(info, "Read {} MB in {} s. Sleeping for {} seconds to stay within {} MB/s limit.", bytesDelta / 1000000, currentDelta, extraTime, maxRate);
302 uv_sleep(extraTime * 1000); // in milliseconds
303 }
304 }
305 totalDFSent++;
306
307 // Use the new API for sending TIMESLICE_NUMBER_STARTED
// NOTE(review): the embedded listing numbers skip 308, so the statement the
// comment above refers to is presumably missing from this view.
309 dpstats.processCommandQueue();
310 monitoring.send(Metric{(uint64_t)totalDFSent, "df-sent"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
311 monitoring.send(Metric{(uint64_t)totalSizeUncompressed / 1000, "aod-bytes-read-uncompressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
312 monitoring.send(Metric{(uint64_t)totalSizeCompressed / 1000, "aod-bytes-read-compressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
313
314 // save file number and time frame
315 *fileCounter = (fcnt - device.inputTimesliceId) / device.maxInputTimeslices;
316 *numTF = ntf;
317
318 // Check if the next timeframe is available or
319 // if there are more files to be processed. If not, simply exit.
320 fcnt = (*fileCounter * device.maxInputTimeslices) + device.inputTimesliceId;
321 ntf = *numTF + 1;
// NOTE(review): front() requires requestedTables to be non-empty, i.e. at least
// one route that is neither TFN nor TFF - confirm this is guaranteed by the caller.
322 auto& firstRoute = requestedTables.front();
323 auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher);
324 auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
325 auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
326
327 // In case the filesource is empty, move to the next one.
// fcnt and ntf are locals: the advanced values are only used for the atEnd()
// check and are not written back to fileCounter / numTF.
// NOTE(review): the step here is 1, whereas the read loop above advances by
// device.maxInputTimeslices - confirm the difference is intended.
328 if (fileAndFolder.path().empty()) {
329 fcnt += 1;
330 ntf = 0;
331 if (didir->atEnd(fcnt)) {
332 LOGP(info, "No input files left to read for reader {}!", device.inputTimesliceId);
333 didir->closeInputFiles();
334 monitoring.flushBuffer();
335 control.endOfStream();
337 return;
338 }
339 }
340 });
341 })};
342
343 return callback;
344}
345
346} // namespace o2::framework::readers
std::vector< OutputRoute > routes
o2::monitoring::Metric Metric
TBranch * ptr
o2::monitoring::Monitoring Monitoring
StringRef key
ConfigParamRegistry & options() const
void readyToQuit(bool all)
Compatibility with old API.
void endOfStream()
Signal that we are done with the current stream.
decltype(auto) make(const Output &spec, Args... args)
GLdouble f
Definition glcorearb.h:310
GLsizei const GLfloat * value
Definition glcorearb.h:819
@ Me
Only quit this data processor.
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string filename()
RuntimeWatchdog(Long64_t limit)
Helper struct to hold statistics about the data processing happening.
@ Add
Update the rate of the metric given the amount since the last time.
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
size_t maxInputTimeslices
The maximum number of time pipelining for this device.
Definition DeviceSpec.h:70
size_t inputTimesliceId
The time pipelining id of this particular device.
Definition DeviceSpec.h:68
std::vector< OutputRoute > outputs
Definition DeviceSpec.h:63
static AlgorithmSpec rootFileReaderCallback(ConfigContext const &context)
the main header struct
Definition DataHeader.h:620