Project
Loading...
Searching...
No Matches
AODJAlienReaderHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#include <memory>
27#include "DataInputDirector.h"
30#include "Framework/Logger.h"
31
32#if __has_include(<TJAlienFile.h>)
33#include <TJAlienFile.h>
34#endif
35#include <TGrid.h>
36#include <TFile.h>
37#include <TTreeCache.h>
38#include <TSystem.h>
39
40#include <arrow/ipc/reader.h>
41#include <arrow/ipc/writer.h>
42#include <arrow/io/interfaces.h>
43#include <arrow/table.h>
44#include <arrow/util/key_value_metadata.h>
45#include <arrow/dataset/dataset.h>
46#include <arrow/dataset/file_base.h>
47
48using namespace o2;
49using namespace o2::aod;
50
  uint64_t startTime;    // uv_hrtime() stamp taken at construction (nanoseconds)
  uint64_t lastTime;     // uv_hrtime() stamp of the previous update() call (nanoseconds)
  double runTime;        // accumulated time-frame processing time, in seconds
  uint64_t runTimeLimit; // run-time budget in seconds; 0 disables the watchdog check in update()
57
  /// Start the watchdog clock and record the run-time budget.
  /// @param limit maximum allowed run time in seconds; 0 disables the check.
  /// NOTE(review): `limit` is a signed Long64_t stored into an unsigned
  /// uint64_t member — a negative limit wraps to a huge value; confirm
  /// callers never pass a negative "unlimited" sentinel.
  RuntimeWatchdog(Long64_t limit)
  {
    startTime = uv_hrtime();
    runTime = 0.;
    runTimeLimit = limit;
  }
66
  /// Decide whether another time frame may be processed.
  /// @return true while the elapsed wall-clock time plus the estimated
  ///         per-frame processing time still fits within runTimeLimit.
  bool update()
  {
    // A zero limit disables the watchdog entirely.
    // NOTE(review): runTimeLimit is uint64_t, so `<= 0` can only be true for
    // exactly 0 — verify a negative limit is not also meant to disable it.
    if (runTimeLimit <= 0) {
      return true;
    }

    auto nowTime = uv_hrtime(); // nanoseconds since an arbitrary fixed point

    // time spent to process the time frame
    // NOTE(review): as written, time is only accumulated while
    // numberTimeFrames < 1 — confirm the ternary branches are not inverted.
    double time_spent = numberTimeFrames < 1 ? (double)(nowTime - lastTime) / 1.E9 : 0.;
    runTime += time_spent;
    lastTime = nowTime;

    // elapsed seconds + average seconds per frame must stay below the budget
    return ((double)(lastTime - startTime) / 1.E9 + runTime / (numberTimeFrames + 1)) < runTimeLimit;
  }
83
84 void printOut()
85 {
86 LOGP(info, "RuntimeWatchdog");
87 LOGP(info, " run time limit: {}", runTimeLimit);
88 LOGP(info, " number of time frames: {}", numberTimeFrames);
89 LOGP(info, " estimated run time per time frame: {}", (numberTimeFrames >= 0) ? runTime / (numberTimeFrames + 1) : 0.);
90 LOGP(info, " estimated total run time: {}", (double)(lastTime - startTime) / 1.E9 + ((numberTimeFrames >= 0) ? runTime / (numberTimeFrames + 1) : 0.));
91 }
92};
93
94using o2::monitoring::Metric;
95using o2::monitoring::Monitoring;
96using o2::monitoring::tags::Key;
97using o2::monitoring::tags::Value;
98
100{
{
  // Register an end-of-stream handler for this device: once the stream ends,
  // propagate endOfStream downstream and ask to quit only this data processor
  // (QuitRequest::Me), leaving the rest of the workflow running.
  // NOTE(review): the function signature line is not visible in this extract;
  // the index suggests `auto setEOSCallback(InitContext& ic)` — confirm.
  ic.services().get<CallbackService>().set<CallbackService::Id::EndOfStream>(
    [](EndOfStreamContext& eosc) {
      auto& control = eosc.services().get<ControlService>();
      control.endOfStream();
      control.readyToQuit(QuitRequest::Me);
    });
}
110
/// Fetch the input whose label is O's table label and wrap the resulting
/// Arrow table in an instance of the original table type O.
template <typename O>
static inline auto extractTypedOriginal(ProcessingContext& pc)
{
  return O{pc.inputs().get<TableConsumer>(aod::MetadataTrait<O>::metadata::tableLabel())->asArrowTable()};
}
117
118template <typename... Os>
119static inline auto extractOriginalsTuple(framework::pack<Os...>, ProcessingContext& pc)
120{
121 return std::make_tuple(extractTypedOriginal<Os>(pc)...);
122}
123
{
  // NOTE(review): the function signature line is not visible in this extract;
  // the index suggests `static AlgorithmSpec rootFileReaderCallback(ConfigContext const& ctx)`.
  //
  // aod-parent-base-path-replacement is now a workflow option, so it needs to be
  // retrieved from the ConfigContext. This is because we do not allow workflow options
  // to change over start-stop-start because they can affect the topology generation.
  std::string parentFileReplacement;
  if (ctx.options().isSet("aod-parent-base-path-replacement")) {
    parentFileReplacement = ctx.options().get<std::string>("aod-parent-base-path-replacement");
  }
  int parentAccessLevel = 0;
  if (ctx.options().isSet("aod-parent-access-level")) {
    parentAccessLevel = ctx.options().get<int>("aod-parent-access-level");
  }
  // Stateful init callback: everything set up here is shared by all
  // invocations of the stateless processing lambda returned at the bottom.
  auto callback = AlgorithmSpec{adaptStateful([parentFileReplacement, parentAccessLevel](ConfigParamRegistry const& options,
                                                                                        DeviceSpec const& spec,
                                                                                        Monitoring& monitoring,
                                                                                        DataProcessingStats& stats) {
    // FIXME: not actually needed, since data processing stats can specify that we should
    // send the initial value.
    stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_CREATED), DataProcessingStats::Op::Set, 0});
    stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_CREATED), DataProcessingStats::Op::Set, 0});
    stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, 0});
    stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, 0});
    stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, 0});
    stats.updateStats({static_cast<short>(ProcessingStatsId::CONSUMED_TIMEFRAMES), DataProcessingStats::Op::Set, 0});

    // An input file is mandatory for this reader device.
    if (!options.isSet("aod-file-private")) {
      LOGP(fatal, "No input file defined!");
      throw std::runtime_error("Processing is stopped!");
    }

    auto filename = options.get<std::string>("aod-file-private");

    // Optional I/O throttle, in MB/s (applied after each time frame below).
    auto maxRate = options.get<float>("aod-max-io-rate");

    // create a DataInputDirector
    auto didir = std::make_shared<DataInputDirector>(filename, &monitoring, parentAccessLevel, parentFileReplacement);
    if (options.isSet("aod-reader-json")) {
      auto jsonFile = options.get<std::string>("aod-reader-json");
      if (!didir->readJson(jsonFile)) {
        LOGP(error, "Check the JSON document! Can not be properly parsed!");
      }
    }

    // get the run time watchdog
    // NOTE(review): raw `new` with no matching delete visible — the watchdog
    // presumably lives for the duration of the device; confirm intentional.
    auto* watchdog = new RuntimeWatchdog(options.get<int64_t>("time-limit"));

    // selected the TFN input and
    // create list of requested tables
    bool reportTFN = false;
    bool reportTFFileName = false;
    header::DataHeader TFNumberHeader;
    header::DataHeader TFFileNameHeader;
    std::vector<OutputRoute> requestedTables;
    std::vector<OutputRoute> routes(spec.outputs);
    // Split the device outputs: "TFN" (time-frame number) and "TFF" (time-frame
    // file name) are bookkeeping outputs; everything else is a table to read.
    for (auto route : routes) {
      if (DataSpecUtils::partialMatch(route.matcher, header::DataOrigin("TFN"))) {
        auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
        TFNumberHeader = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
        reportTFN = true;
      } else if (DataSpecUtils::partialMatch(route.matcher, header::DataOrigin("TFF"))) {
        auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
        TFFileNameHeader = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
        reportTFFileName = true;
      } else {
        requestedTables.emplace_back(route);
      }
    }

    // Cursor state shared across invocations of the stateless lambda:
    // which file and which time frame inside it to read next.
    auto fileCounter = std::make_shared<int>(0);
    auto numTF = std::make_shared<int>(-1);
    return adaptStateless([TFNumberHeader,
                           TFFileNameHeader,
                           requestedTables,
                           fileCounter,
                           numTF,
                           watchdog,
                           maxRate,
                           didir, reportTFN, reportTFFileName](Monitoring& monitoring, DataAllocator& outputs, ControlService& control, DeviceSpec const& device) {
      // Each parallel reader device.inputTimesliceId reads the files fileCounter*device.maxInputTimeslices+device.inputTimesliceId
      // the TF to read is numTF
      assert(device.inputTimesliceId < device.maxInputTimeslices);
      int fcnt = (*fileCounter * device.maxInputTimeslices) + device.inputTimesliceId;
      int ntf = *numTF + 1;
      static int currentFileCounter = -1;
      static int filesProcessed = 0;
      // Count a newly opened file only when the file cursor has advanced.
      if (currentFileCounter != *fileCounter) {
        currentFileCounter = *fileCounter;
        monitoring.send(Metric{(uint64_t)++filesProcessed, "files-opened"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
      }

      // loop over requested tables
      bool first = true;
      static size_t totalSizeUncompressed = 0;
      static size_t totalSizeCompressed = 0;
      static uint64_t totalDFSent = 0;

      // check if RuntimeLimit is reached
      if (!watchdog->update()) {
        LOGP(info, "Run time exceeds run time limit of {} seconds. Exiting gracefully...", watchdog->runTimeLimit);
        LOGP(info, "Stopping reader {} after time frame {}.", device.inputTimesliceId, watchdog->numberTimeFrames - 1);
        didir->closeInputFiles();
        monitoring.flushBuffer();
        control.endOfStream();
        // NOTE(review): one source line is not visible in this extract here —
        // presumably control.readyToQuit(...); confirm against the full file.
        return;
      }

      // Track time and compressed bytes for the I/O-rate throttle below.
      int64_t startTime = uv_hrtime();
      int64_t startSize = totalSizeCompressed;
      for (auto& route : requestedTables) {
        // Skip routes that belong to another parallel timeslice pipeline.
        if ((device.inputTimesliceId % route.maxTimeslices) != route.timeslice) {
          continue;
        }

        // create header
        auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
        auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);

        if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed)) {
          if (first) {
            // check if there is a next file to read
            fcnt += device.maxInputTimeslices;
            if (didir->atEnd(fcnt)) {
              LOGP(info, "No input files left to read for reader {}!", device.inputTimesliceId);
              didir->closeInputFiles();
              monitoring.flushBuffer();
              control.endOfStream();
              // NOTE(review): one source line not visible in this extract here
              // — presumably control.readyToQuit(...); confirm.
              return;
            }
            // get first folder of next file
            ntf = 0;
            if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed)) {
              LOGP(fatal, "Can not retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin.as<std::string>(), fcnt, ntf);
              throw std::runtime_error("Processing is stopped!");
            }
          } else {
            // A table failed mid-time-frame: cannot fall back to the next file
            // because earlier tables of this frame were already emitted.
            LOGP(fatal, "Can not retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin.as<std::string>(), fcnt, ntf);
            throw std::runtime_error("Processing is stopped!");
          }
        }

        // Emit the bookkeeping outputs once per time frame, together with the
        // first successfully read table.
        if (first) {
          if (reportTFN) {
            // TF number
            auto timeFrameNumber = didir->getTimeFrameNumber(dh, fcnt, ntf);
            auto o = Output(TFNumberHeader);
            outputs.make<uint64_t>(o) = timeFrameNumber;
          }

          if (reportTFFileName) {
            // Origin file name for derived output map
            auto o2 = Output(TFFileNameHeader);
            auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
            auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(fileAndFolder.filesystem());
            auto* f = dynamic_cast<TFile*>(rootFS->GetFile());
            std::string currentFilename(f->GetFile()->GetName());
            if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') {
              // This is not an absolute local path. Make it absolute.
              static std::string pwd = gSystem->pwd() + std::string("/");
              currentFilename = pwd + std::string(f->GetName());
            }
            outputs.make<std::string>(o2) = currentFilename;
          }
        }
        first = false;
      }
      // Throttle: if we read faster than maxRate (MB/s), sleep off the excess.
      int64_t stopSize = totalSizeCompressed;
      int64_t bytesDelta = stopSize - startSize;
      int64_t stopTime = uv_hrtime();
      float currentDelta = float(stopTime - startTime) / 1000000000; // in s
      if (ceil(maxRate) > 0.) {
        float extraTime = (bytesDelta / 1000000 - currentDelta * maxRate) / maxRate;
        // We only sleep if we read faster than the max-read-rate.
        if (extraTime > 0.) {
          LOGP(info, "Read {} MB in {} s. Sleeping for {} seconds to stay within {} MB/s limit.", bytesDelta / 1000000, currentDelta, extraTime, maxRate);
          uv_sleep(extraTime * 1000); // in milliseconds
        }
      }
      totalDFSent++;
      monitoring.send(Metric{(uint64_t)totalDFSent, "df-sent"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
      monitoring.send(Metric{(uint64_t)totalSizeUncompressed / 1000, "aod-bytes-read-uncompressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
      monitoring.send(Metric{(uint64_t)totalSizeCompressed / 1000, "aod-bytes-read-compressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));

      // save file number and time frame
      *fileCounter = (fcnt - device.inputTimesliceId) / device.maxInputTimeslices;
      *numTF = ntf;

      // Check if the next timeframe is available or
      // if there are more files to be processed. If not, simply exit.
      fcnt = (*fileCounter * device.maxInputTimeslices) + device.inputTimesliceId;
      ntf = *numTF + 1;
      auto& firstRoute = requestedTables.front();
      auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher);
      auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
      auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);

      // In case the filesource is empty, move to the next one.
      if (fileAndFolder.path().empty()) {
        fcnt += 1;
        ntf = 0;
        if (didir->atEnd(fcnt)) {
          LOGP(info, "No input files left to read for reader {}!", device.inputTimesliceId);
          didir->closeInputFiles();
          monitoring.flushBuffer();
          control.endOfStream();
          // NOTE(review): one source line not visible in this extract here —
          // presumably control.readyToQuit(...); confirm.
          return;
        }
      }
    });
  })};

  return callback;
}
340
341} // namespace o2::framework::readers
std::vector< OutputRoute > routes
o2::monitoring::Metric Metric
o2::monitoring::Monitoring Monitoring
ConfigParamRegistry & options() const
void readyToQuit(bool all)
Compatibility with old API.
void endOfStream()
Signal that we are done with the current stream.
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
Definition InitContext.h:34
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
GLdouble f
Definition glcorearb.h:310
auto setEOSCallback(InitContext &ic)
@ Me
Only quit this data processor.
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string filename()
RuntimeWatchdog(Long64_t limit)
Helper struct to hold statistics about the data processing happening.
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
size_t maxInputTimeslices
The maximum number of time pipelining for this device.
Definition DeviceSpec.h:70
size_t inputTimesliceId
The time pipelining id of this particular device.
Definition DeviceSpec.h:68
std::vector< OutputRoute > outputs
Definition DeviceSpec.h:63
static AlgorithmSpec rootFileReaderCallback(ConfigContext const &context)
the main header struct
Definition DataHeader.h:619