AODJAlienReaderHelpers.cxx
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include <memory>
#include "DataInputDirector.h"
#include "Framework/Logger.h"

#if __has_include(<TJAlienFile.h>)
#include <TJAlienFile.h>
#endif
#include <TGrid.h>
#include <TFile.h>
#include <TTreeCache.h>
#include <TSystem.h>
#include <uv.h> // for uv_hrtime() and uv_sleep() used below

#include <arrow/ipc/reader.h>
#include <arrow/ipc/writer.h>
#include <arrow/io/interfaces.h>
#include <arrow/table.h>
#include <arrow/util/key_value_metadata.h>
#include <arrow/dataset/dataset.h>
#include <arrow/dataset/file_base.h>

using namespace o2;
using namespace o2::aod;

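// Helper used by each reader device to keep track of the wall time spent and of
// the number of time frames processed, so that processing can stop gracefully
// before the run time limit given via the time-limit option is exceeded.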
struct RuntimeWatchdog {
  int numberTimeFrames;
  uint64_t startTime;
  uint64_t lastTime;
  double runTime;
  uint64_t runTimeLimit;

  RuntimeWatchdog(Long64_t limit)
  {
    numberTimeFrames = -1;
    startTime = uv_hrtime();
    lastTime = startTime;
    runTime = 0.;
    runTimeLimit = limit;
  }

  bool update()
  {
    numberTimeFrames++;
    if (runTimeLimit <= 0) {
      return true;
    }

    auto nowTime = uv_hrtime();

    // time spent to process the previous time frame
    double time_spent = numberTimeFrames < 1 ? 0. : (double)(nowTime - lastTime) / 1.E9;
    runTime += time_spent;
    lastTime = nowTime;

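    // Allow another time frame only if the wall time elapsed so far plus the
    // average run time per processed time frame still fits within the limit.
    // Example: 25 s elapsed and runTime = 20 s accumulated over 10 updates
    // (2 s per time frame on average) allows a new time frame only if
    // 25 + 2 = 27 s is below runTimeLimit.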
    return ((double)(lastTime - startTime) / 1.E9 + runTime / (numberTimeFrames + 1)) < runTimeLimit;
  }

  void printOut()
  {
    LOGP(info, "RuntimeWatchdog");
    LOGP(info, " run time limit: {}", runTimeLimit);
    LOGP(info, " number of time frames: {}", numberTimeFrames);
    LOGP(info, " estimated run time per time frame: {}", (numberTimeFrames >= 0) ? runTime / (numberTimeFrames + 1) : 0.);
    LOGP(info, " estimated total run time: {}", (double)(lastTime - startTime) / 1.E9 + ((numberTimeFrames >= 0) ? runTime / (numberTimeFrames + 1) : 0.));
  }
};

using o2::monitoring::Metric;
using o2::monitoring::Monitoring;
using o2::monitoring::tags::Key;
using o2::monitoring::tags::Value;

namespace o2::framework::readers
{
AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const& ctx)
{
  // aod-parent-base-path-replacement is now a workflow option, so it needs to be
  // retrieved from the ConfigContext: workflow options are not allowed to change
  // across start-stop-start cycles because they can affect the topology generation.
  std::string parentFileReplacement;
  if (ctx.options().isSet("aod-parent-base-path-replacement")) {
    parentFileReplacement = ctx.options().get<std::string>("aod-parent-base-path-replacement");
  }
  int parentAccessLevel = 0;
  if (ctx.options().isSet("aod-parent-access-level")) {
    parentAccessLevel = ctx.options().get<int>("aod-parent-access-level");
  }
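  // adaptStateful wraps the initialization callback: it runs once when the
  // reader device starts, and the callback it returns (built with
  // adaptStateless further down) is then invoked for every time frame.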
  auto callback = AlgorithmSpec{adaptStateful(
    [parentFileReplacement, parentAccessLevel](
      ConfigParamRegistry const& options,
      DeviceSpec const& spec,
      Monitoring& monitoring,
      DataProcessingStats& stats) {
    // FIXME: not actually needed, since data processing stats can specify that we should
    // send the initial value.
    stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_CREATED), DataProcessingStats::Op::Set, 0});
    stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_CREATED), DataProcessingStats::Op::Set, 0});
    stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, 0});
    stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, 0});
    stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, 0});
    stats.updateStats({static_cast<short>(ProcessingStatsId::CONSUMED_TIMEFRAMES), DataProcessingStats::Op::Set, 0});

    if (!options.isSet("aod-file-private")) {
      LOGP(fatal, "No input file defined!");
      throw std::runtime_error("Processing is stopped!");
    }

    auto filename = options.get<std::string>("aod-file-private");

    auto maxRate = options.get<float>("aod-max-io-rate");

    // create a DataInputDirector
    auto didir = std::make_shared<DataInputDirector>(filename, &monitoring, parentAccessLevel, parentFileReplacement);
    if (options.isSet("aod-reader-json")) {
      auto jsonFile = options.get<std::string>("aod-reader-json");
      if (!didir->readJson(jsonFile)) {
        LOGP(error, "The JSON document could not be parsed properly! Please check it.");
      }
    }

    // create the run time watchdog
    auto* watchdog = new RuntimeWatchdog(options.get<int64_t>("time-limit"));

    // select the TFN and TFF outputs and
    // create the list of requested tables
    bool reportTFN = false;
    bool reportTFFileName = false;
    header::DataHeader TFNumberHeader;
    header::DataHeader TFFileNameHeader;
    std::vector<OutputRoute> requestedTables;
    std::vector<OutputRoute> routes(spec.outputs);
    for (auto route : routes) {
      if (DataSpecUtils::partialMatch(route.matcher, header::DataOrigin("TFN"))) {
        auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
        TFNumberHeader = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
        reportTFN = true;
      } else if (DataSpecUtils::partialMatch(route.matcher, header::DataOrigin("TFF"))) {
        auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
        TFFileNameHeader = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
        reportTFFileName = true;
      } else {
        requestedTables.emplace_back(route);
      }
    }

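    // fileCounter and numTF are shared_ptrs captured by the per-time-frame
    // lambda below, so the current file index and time frame index persist
    // across invocations.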
    auto fileCounter = std::make_shared<int>(0);
    auto numTF = std::make_shared<int>(-1);
    return adaptStateless(
      [TFNumberHeader,
       TFFileNameHeader,
       requestedTables,
       fileCounter,
       numTF,
       watchdog,
       maxRate,
       didir, reportTFN, reportTFFileName](
        Monitoring& monitoring, DataAllocator& outputs, ControlService& control, DeviceSpec const& device, DataProcessingStats& dpstats) {
      // Each parallel reader (device.inputTimesliceId) reads the files with index
      // fileCounter * device.maxInputTimeslices + device.inputTimesliceId;
      // the time frame to read within the current file is numTF.
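      // For example, with 3 parallel readers (maxInputTimeslices = 3), reader 0
      // processes files 0, 3, 6, ... while reader 1 processes files 1, 4, 7, ...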
      assert(device.inputTimesliceId < device.maxInputTimeslices);
      int fcnt = (*fileCounter * device.maxInputTimeslices) + device.inputTimesliceId;
      int ntf = *numTF + 1;
      static int currentFileCounter = -1;
      static int filesProcessed = 0;
      if (currentFileCounter != *fileCounter) {
        currentFileCounter = *fileCounter;
        monitoring.send(Metric{(uint64_t)++filesProcessed, "files-opened"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
      }

      // loop over requested tables
      bool first = true;
      static size_t totalSizeUncompressed = 0;
      static size_t totalSizeCompressed = 0;
      static uint64_t totalDFSent = 0;

      // check if the run time limit has been reached
      if (!watchdog->update()) {
        LOGP(info, "Run time exceeds run time limit of {} seconds. Exiting gracefully...", watchdog->runTimeLimit);
        LOGP(info, "Stopping reader {} after time frame {}.", device.inputTimesliceId, watchdog->numberTimeFrames - 1);
        didir->closeInputFiles();
        monitoring.flushBuffer();
        control.endOfStream();
        control.readyToQuit(QuitRequest::Me);
        return;
      }

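      // Record the compressed bytes and wall time spent on this time frame so
      // that the aod-max-io-rate limit can be enforced further down.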
      int64_t startTime = uv_hrtime();
      int64_t startSize = totalSizeCompressed;
      for (auto& route : requestedTables) {
        if ((device.inputTimesliceId % route.maxTimeslices) != route.timeslice) {
          continue;
        }

        // create header
        auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
        auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);

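        // Read the tree for this table and time frame. If this fails for the
        // first table of the loop, the current file is assumed to be exhausted:
        // move to the next file assigned to this reader and retry with its first
        // folder. A failure for any other table is fatal.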
        if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed)) {
          if (first) {
            // check if there is a next file to read
            fcnt += device.maxInputTimeslices;
            if (didir->atEnd(fcnt)) {
              LOGP(info, "No input files left to read for reader {}!", device.inputTimesliceId);
              didir->closeInputFiles();
              monitoring.flushBuffer();
              control.endOfStream();
              control.readyToQuit(QuitRequest::Me);
              return;
            }
            // get the first folder of the next file
            ntf = 0;
            if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed)) {
              LOGP(fatal, "Cannot retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin.as<std::string>(), fcnt, ntf);
              throw std::runtime_error("Processing is stopped!");
            }
          } else {
            LOGP(fatal, "Cannot retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin.as<std::string>(), fcnt, ntf);
            throw std::runtime_error("Processing is stopped!");
          }
        }

        if (first) {
          if (reportTFN) {
            // TF number
            auto timeFrameNumber = didir->getTimeFrameNumber(dh, fcnt, ntf);
            auto o = Output(TFNumberHeader);
            outputs.make<uint64_t>(o) = timeFrameNumber;
          }

          if (reportTFFileName) {
            // Origin file name for the derived output map
            auto o2 = Output(TFFileNameHeader);
            auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
            auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(fileAndFolder.filesystem());
            auto* f = dynamic_cast<TFile*>(rootFS->GetFile());
            std::string currentFilename(f->GetFile()->GetName());
            if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') {
              // This is not an absolute local path. Make it absolute.
              static std::string pwd = gSystem->pwd() + std::string("/");
              currentFilename = pwd + std::string(f->GetName());
            }
            outputs.make<std::string>(o2) = currentFilename;
          }
        }
        first = false;
      }
      int64_t stopSize = totalSizeCompressed;
      int64_t bytesDelta = stopSize - startSize;
      int64_t stopTime = uv_hrtime();
      float currentDelta = float(stopTime - startTime) / 1000000000; // in s
      if (ceil(maxRate) > 0.) {
        float extraTime = (bytesDelta / 1000000 - currentDelta * maxRate) / maxRate;
        // We only sleep if we read faster than the max-read-rate.
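        // Example: reading 500 MB in 2 s with maxRate = 100 MB/s gives
        // extraTime = (500 - 2 * 100) / 100 = 3 s of sleep, which brings the
        // average rate over the resulting 5 s down to 100 MB/s.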
        if (extraTime > 0.) {
          LOGP(info, "Read {} MB in {} s. Sleeping for {} seconds to stay within {} MB/s limit.", bytesDelta / 1000000, currentDelta, extraTime, maxRate);
          uv_sleep(extraTime * 1000); // in milliseconds
        }
      }
      totalDFSent++;

      // Use the new API for sending TIMESLICE_NUMBER_STARTED
      dpstats.processCommandQueue();
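      // Note: the aod-bytes-read-* metrics below are cumulative over the
      // lifetime of this reader and are sent in units of 1000 bytes.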
      monitoring.send(Metric{(uint64_t)totalDFSent, "df-sent"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
      monitoring.send(Metric{(uint64_t)totalSizeUncompressed / 1000, "aod-bytes-read-uncompressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
      monitoring.send(Metric{(uint64_t)totalSizeCompressed / 1000, "aod-bytes-read-compressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));

      // save file number and time frame
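      // (invert the fcnt = fileCounter * maxInputTimeslices + inputTimesliceId
      // mapping used above to recover the per-reader file counter)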
      *fileCounter = (fcnt - device.inputTimesliceId) / device.maxInputTimeslices;
      *numTF = ntf;

      // Check if the next time frame is available or
      // if there are more files to be processed. If not, simply exit.
      fcnt = (*fileCounter * device.maxInputTimeslices) + device.inputTimesliceId;
      ntf = *numTF + 1;
      auto& firstRoute = requestedTables.front();
      auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher);
      auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
      auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);

      // In case the file source is empty, move on to the next one.
      if (fileAndFolder.path().empty()) {
        fcnt += 1;
        ntf = 0;
        if (didir->atEnd(fcnt)) {
          LOGP(info, "No input files left to read for reader {}!", device.inputTimesliceId);
          didir->closeInputFiles();
          monitoring.flushBuffer();
          control.endOfStream();
          control.readyToQuit(QuitRequest::Me);
          return;
        }
      }
    });
  })};

  return callback;
}

} // namespace o2::framework::readers