Project
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages Concepts
AODJAlienReaderHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#include <memory>
27#include "DataInputDirector.h"
30#include "Framework/Logger.h"
31
32#if __has_include(<TJAlienFile.h>)
33#include <TJAlienFile.h>
34#endif
35#include <TGrid.h>
36#include <TFile.h>
37#include <TTreeCache.h>
38#include <TSystem.h>
39
40#include <arrow/ipc/reader.h>
41#include <arrow/ipc/writer.h>
42#include <arrow/io/interfaces.h>
43#include <arrow/table.h>
44#include <arrow/util/key_value_metadata.h>
45#include <arrow/dataset/dataset.h>
46#include <arrow/dataset/file_base.h>
47
48using namespace o2;
49using namespace o2::aod;
50
// ---- RuntimeWatchdog (the `struct RuntimeWatchdog {` line, internal line 52,
// is elided in this listing) ----
// Tracks the wall-clock run time of the reader device and lets the caller ask
// whether processing one more time frame would exceed a configured limit.
// Start-of-run timestamp in nanoseconds (set from uv_hrtime() in the ctor).
53 uint64_t startTime;
// Timestamp of the previous update() call, nanoseconds.
54 uint64_t lastTime;
// Accumulated time spent processing time frames, seconds.
55 double runTime;
// Configured limit in seconds; 0 disables the watchdog (see update()).
56 uint64_t runTimeLimit;
57
// Constructor: record the start time and store the limit.
// NOTE(review): `limit` is a signed Long64_t stored into an unsigned
// uint64_t — a negative limit wraps to a huge value; presumably callers only
// pass >= 0. TODO confirm against the option's documented range.
58 RuntimeWatchdog(Long64_t limit)
59 {
// Internal lines 60 and 62 are elided by the doc generator — they presumably
// initialize `lastTime` and the `numberTimeFrames` counter used by update()
// and printOut(); confirm against the repository source.
61 startTime = uv_hrtime();
63 runTime = 0.;
64 runTimeLimit = limit;
65 }
66
// Returns true while the estimated total run time (elapsed wall clock plus
// one more average time frame) stays below runTimeLimit; false signals the
// caller to stop gracefully.
// Internal line 69 is elided in this listing — presumably the increment of
// `numberTimeFrames`; confirm against the repository source.
67 bool update()
68 {
// A limit of 0 disables the watchdog entirely.
// NOTE(review): runTimeLimit is uint64_t, so `<= 0` can only match 0; a
// negative Long64_t limit passed to the ctor wraps and will NOT disable the
// watchdog here — confirm intended behavior.
70 if (runTimeLimit <= 0) {
71 return true;
72 }
73
// uv_hrtime() is a monotonic clock in nanoseconds; the 1.E9 divisions below
// convert to seconds.
74 auto nowTime = uv_hrtime();
75
76 // time spent to process the time frame
// NOTE(review): the condition accumulates only when numberTimeFrames < 1;
// with the incrementing line elided from this view I cannot verify the
// intended branch sense — check against the repository source.
77 double time_spent = numberTimeFrames < 1 ? (double)(nowTime - lastTime) / 1.E9 : 0.;
78 runTime += time_spent;
79 lastTime = nowTime;
80
// elapsed seconds so far + average seconds per time frame < limit?
81 return ((double)(lastTime - startTime) / 1.E9 + runTime / (numberTimeFrames + 1)) < runTimeLimit;
82 }
83
// Log a human-readable summary of the watchdog state via the O2 logger.
// The `numberTimeFrames >= 0` guards avoid computing an average before any
// time frame was processed (the counter presumably starts at -1; the
// initializer line is elided from this listing).
84 void printOut()
85 {
86 LOGP(info, "RuntimeWatchdog");
87 LOGP(info, " run time limit: {}", runTimeLimit);
88 LOGP(info, " number of time frames: {}", numberTimeFrames);
89 LOGP(info, " estimated run time per time frame: {}", (numberTimeFrames >= 0) ? runTime / (numberTimeFrames + 1) : 0.);
90 LOGP(info, " estimated total run time: {}", (double)(lastTime - startTime) / 1.E9 + ((numberTimeFrames >= 0) ? runTime / (numberTimeFrames + 1) : 0.));
91 }
92};
93
94using o2::monitoring::Metric;
95using o2::monitoring::Monitoring;
96using o2::monitoring::tags::Key;
97using o2::monitoring::tags::Value;
98
// Opening brace of namespace o2::framework::readers (the `namespace` line,
// internal 99, is elided; the matching close is at the bottom of the file).
100{
// Body of setEOSCallback(InitContext& ic) — the signature line (internal 101)
// is elided here; see the symbol index: `auto setEOSCallback(InitContext &ic)`.
// Registers an EndOfStream callback that forwards the end-of-stream signal
// downstream and then asks to quit only this data processor (QuitRequest::Me).
102{
103 ic.services().get<CallbackService>().set<CallbackService::Id::EndOfStream>(
104 [](EndOfStreamContext& eosc) {
105 auto& control = eosc.services().get<ControlService>();
106 control.endOfStream();
107 control.readyToQuit(QuitRequest::Me);
108 });
109}
110
111template <typename O>
112static inline auto extractTypedOriginal(ProcessingContext& pc)
113{
115 return O{pc.inputs().get<TableConsumer>(aod::MetadataTrait<O>::metadata::tableLabel())->asArrowTable()};
116}
117
118template <typename... Os>
119static inline auto extractOriginalsTuple(framework::pack<Os...>, ProcessingContext& pc)
120{
121 return std::make_tuple(extractTypedOriginal<Os>(pc)...);
122}
123
// Body of rootFileReaderCallback — the signature line (internal 124) is elided
// by the doc generator; see the symbol index:
//   static AlgorithmSpec rootFileReaderCallback(ConfigContext const& ctx)
// Builds the AlgorithmSpec of the AOD reader device: an init-time (stateful)
// lambda that parses options and sets up the DataInputDirector, returning a
// per-invocation (stateless) lambda that publishes one time frame per call.
125{
126 // aod-parent-base-path-replacement is now a workflow option, so it needs to be
127 // retrieved from the ConfigContext. This is because we do not allow workflow options
128 // to change over start-stop-start because they can affect the topology generation.
129 std::string parentFileReplacement;
130 if (ctx.options().isSet("aod-parent-base-path-replacement")) {
131 parentFileReplacement = ctx.options().get<std::string>("aod-parent-base-path-replacement");
132 }
133 int parentAccessLevel = 0;
134 if (ctx.options().isSet("aod-parent-access-level")) {
135 parentAccessLevel = ctx.options().get<int>("aod-parent-access-level");
136 }
// Init-time part: runs once per device start; the workflow-level settings are
// captured by value into the stateful lambda.
137 auto callback = AlgorithmSpec{adaptStateful([parentFileReplacement, parentAccessLevel](ConfigParamRegistry const& options,
138 DeviceSpec const& spec,
139 Monitoring& monitoring,
140 DataProcessingStats& stats) {
141 // FIXME: not actually needed, since data processing stats can specify that we should
142 // send the initial value.
143 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_CREATED), DataProcessingStats::Op::Set, 0});
144 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_CREATED), DataProcessingStats::Op::Set, 0});
145 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, 0});
146 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, 0});
147 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, 0});
148
// An input file list is mandatory; abort the device otherwise.
149 if (!options.isSet("aod-file-private")) {
150 LOGP(fatal, "No input file defined!");
151 throw std::runtime_error("Processing is stopped!");
152 }
153
154 auto filename = options.get<std::string>("aod-file-private");
155
// Optional I/O throttle, MB/s (see the rate-limiting code near the end).
156 auto maxRate = options.get<float>("aod-max-io-rate");
157
158 // create a DataInputDirector
159 auto didir = std::make_shared<DataInputDirector>(filename, &monitoring, parentAccessLevel, parentFileReplacement);
// An optional JSON document can override/extend the reader configuration;
// a parse failure is only logged, processing continues with the defaults.
160 if (options.isSet("aod-reader-json")) {
161 auto jsonFile = options.get<std::string>("aod-reader-json");
162 if (!didir->readJson(jsonFile)) {
163 LOGP(error, "Check the JSON document! Can not be properly parsed!");
164 }
165 }
166
167 // get the run time watchdog
// NOTE(review): raw `new` with no matching delete — the watchdog is captured
// by pointer into the stateless lambda below and lives for the whole device
// run; presumably the leak-at-exit is accepted. Confirm this is intentional.
168 auto* watchdog = new RuntimeWatchdog(options.get<int64_t>("time-limit"));
169
170 // selected the TFN input and
171 // create list of requested tables
// Classify the device's output routes: the TFN (time-frame number) and TFF
// (time-frame file name) service outputs are remembered separately; all other
// routes are data tables to be read from the input trees.
172 bool reportTFN = false;
173 bool reportTFFileName = false;
174 header::DataHeader TFNumberHeader;
175 header::DataHeader TFFileNameHeader;
176 std::vector<OutputRoute> requestedTables;
177 std::vector<OutputRoute> routes(spec.outputs);
178 for (auto route : routes) {
179 if (DataSpecUtils::partialMatch(route.matcher, header::DataOrigin("TFN"))) {
180 auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
181 TFNumberHeader = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
182 reportTFN = true;
183 } else if (DataSpecUtils::partialMatch(route.matcher, header::DataOrigin("TFF"))) {
184 auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
185 TFFileNameHeader = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
186 reportTFFileName = true;
187 } else {
188 requestedTables.emplace_back(route);
189 }
190 }
191
// Mutable cursor state shared across invocations of the stateless lambda
// (shared_ptr so the by-value capture still refers to the same counters).
192 auto fileCounter = std::make_shared<int>(0);
193 auto numTF = std::make_shared<int>(-1);
194 return adaptStateless([TFNumberHeader,
195 TFFileNameHeader,
196 requestedTables,
197 fileCounter,
198 numTF,
199 watchdog,
200 maxRate,
201 didir, reportTFN, reportTFFileName](Monitoring& monitoring, DataAllocator& outputs, ControlService& control, DeviceSpec const& device) {
202 // Each parallel reader device.inputTimesliceId reads the files fileCounter*device.maxInputTimeslices+device.inputTimesliceId
203 // the TF to read is numTF
204 assert(device.inputTimesliceId < device.maxInputTimeslices);
205 int fcnt = (*fileCounter * device.maxInputTimeslices) + device.inputTimesliceId;
206 int ntf = *numTF + 1;
// NOTE(review): function-local statics persist across invocations and are
// shared process-wide — this assumes a single reader callback instance per
// process; confirm for setups running several reader devices in one process.
207 static int currentFileCounter = -1;
208 static int filesProcessed = 0;
209 if (currentFileCounter != *fileCounter) {
210 currentFileCounter = *fileCounter;
211 monitoring.send(Metric{(uint64_t)++filesProcessed, "files-opened"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
212 }
213
214 // loop over requested tables
215 bool first = true;
216 static size_t totalSizeUncompressed = 0;
217 static size_t totalSizeCompressed = 0;
218 static uint64_t totalDFSent = 0;
219
220 // check if RuntimeLimit is reached
// Graceful shutdown path: close files, flush metrics, signal end-of-stream.
221 if (!watchdog->update()) {
222 LOGP(info, "Run time exceeds run time limit of {} seconds. Exiting gracefully...", watchdog->runTimeLimit);
223 LOGP(info, "Stopping reader {} after time frame {}.", device.inputTimesliceId, watchdog->numberTimeFrames - 1);
224 didir->closeInputFiles();
225 monitoring.flushBuffer();
226 control.endOfStream();
// NOTE(review): internal line 227 is elided by the doc generator (the same
// elision recurs after each endOfStream() below, lines 251 and 330) — likely
// a readyToQuit call; confirm against the repository source.
228 return;
229 }
230
// Timestamps/sizes bracket the read loop for the I/O-rate throttle below.
231 int64_t startTime = uv_hrtime();
232 int64_t startSize = totalSizeCompressed;
233 for (auto& route : requestedTables) {
// Honour per-route time-pipelining: only serve routes assigned to this lane.
234 if ((device.inputTimesliceId % route.maxTimeslices) != route.timeslice) {
235 continue;
236 }
237
238 // create header
239 auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
240 auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
241
// Read the tree for this table; on failure of the FIRST table, try to roll
// over to the next input file. A failure on any later table is fatal because
// part of the time frame has already been published.
242 if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed)) {
243 if (first) {
244 // check if there is a next file to read
245 fcnt += device.maxInputTimeslices;
246 if (didir->atEnd(fcnt)) {
247 LOGP(info, "No input files left to read for reader {}!", device.inputTimesliceId);
248 didir->closeInputFiles();
249 monitoring.flushBuffer();
250 control.endOfStream();
252 return;
253 }
254 // get first folder of next file
255 ntf = 0;
256 if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed)) {
257 LOGP(fatal, "Can not retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin.as<std::string>(), fcnt, ntf);
258 throw std::runtime_error("Processing is stopped!");
259 }
260 } else {
261 LOGP(fatal, "Can not retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin.as<std::string>(), fcnt, ntf);
262 throw std::runtime_error("Processing is stopped!");
263 }
264 }
265
// Once per time frame (on the first successfully read table), publish the
// TFN / TFF service outputs if they were requested.
266 if (first) {
267 if (reportTFN) {
268 // TF number
269 auto timeFrameNumber = didir->getTimeFrameNumber(dh, fcnt, ntf);
270 auto o = Output(TFNumberHeader);
271 outputs.make<uint64_t>(o) = timeFrameNumber;
272 }
273
274 if (reportTFFileName) {
275 // Origin file name for derived output map
276 auto o2 = Output(TFFileNameHeader);
277 auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
278 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(fileAndFolder.filesystem());
279 auto* f = dynamic_cast<TFile*>(rootFS->GetFile());
280 std::string currentFilename(f->GetFile()->GetName());
281 if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') {
282 // This is not an absolute local path. Make it absolute.
// NOTE(review): pwd is cached in a static on first use — assumes the working
// directory does not change afterwards; confirm.
283 static std::string pwd = gSystem->pwd() + std::string("/");
284 currentFilename = pwd + std::string(f->GetName());
285 }
286 outputs.make<std::string>(o2) = currentFilename;
287 }
288 }
289 first = false;
290 }
// I/O rate throttling: uv_hrtime is ns (hence /1e9), sizes are bytes (hence
// /1e6 for MB) and maxRate is MB/s per the log message below.
291 int64_t stopSize = totalSizeCompressed;
292 int64_t bytesDelta = stopSize - startSize;
293 int64_t stopTime = uv_hrtime();
294 float currentDelta = float(stopTime - startTime) / 1000000000; // in s
295 if (ceil(maxRate) > 0.) {
296 float extraTime = (bytesDelta / 1000000 - currentDelta * maxRate) / maxRate;
297 // We only sleep if we read faster than the max-read-rate.
298 if (extraTime > 0.) {
299 LOGP(info, "Read {} MB in {} s. Sleeping for {} seconds to stay within {} MB/s limit.", bytesDelta / 1000000, currentDelta, extraTime, maxRate);
300 uv_sleep(extraTime * 1000); // in milliseconds
301 }
302 }
303 totalDFSent++;
304 monitoring.send(Metric{(uint64_t)totalDFSent, "df-sent"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
305 monitoring.send(Metric{(uint64_t)totalSizeUncompressed / 1000, "aod-bytes-read-uncompressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
306 monitoring.send(Metric{(uint64_t)totalSizeCompressed / 1000, "aod-bytes-read-compressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
307
308 // save file number and time frame
// Persist the cursor (fcnt may have advanced during file roll-over above).
309 *fileCounter = (fcnt - device.inputTimesliceId) / device.maxInputTimeslices;
310 *numTF = ntf;
311
312 // Check if the next timeframe is available or
313 // if there are more files to be processed. If not, simply exit.
314 fcnt = (*fileCounter * device.maxInputTimeslices) + device.inputTimesliceId;
315 ntf = *numTF + 1;
316 auto& firstRoute = requestedTables.front();
317 auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher);
318 auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
319 auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
320
321 // In case the filesource is empty, move to the next one.
322 if (fileAndFolder.path().empty()) {
323 fcnt += 1;
324 ntf = 0;
325 if (didir->atEnd(fcnt)) {
326 LOGP(info, "No input files left to read for reader {}!", device.inputTimesliceId);
327 didir->closeInputFiles();
328 monitoring.flushBuffer();
329 control.endOfStream();
331 return;
332 }
333 }
334 });
335 })};
336
337 return callback;
338}
339
340} // namespace o2::framework::readers
o2::monitoring::Metric Metric
bool o
o2::monitoring::Monitoring Monitoring
ConfigParamRegistry & options() const
void readyToQuit(bool all)
Compatibility with old API.
void endOfStream()
Signal that we are done with the current stream.
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
Definition InitContext.h:34
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
GLdouble f
Definition glcorearb.h:310
auto setEOSCallback(InitContext &ic)
@ Me
Only quit this data processor.
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string filename()
RuntimeWatchdog(Long64_t limit)
Helper struct to hold statistics about the data processing happening.
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
size_t maxInputTimeslices
The maximum number of time pipelining for this device.
Definition DeviceSpec.h:70
size_t inputTimesliceId
The time pipelining id of this particular device.
Definition DeviceSpec.h:68
std::vector< OutputRoute > outputs
Definition DeviceSpec.h:63
static AlgorithmSpec rootFileReaderCallback(ConfigContext const &context)
the main header struct
Definition DataHeader.h:618