Project
Loading...
Searching...
No Matches
AODJAlienReaderHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
26#include "DataInputDirector.h"
29#include "Framework/Logger.h"
30
31#if __has_include(<TJAlienFile.h>)
32#include <TJAlienFile.h>
33#endif
34#include <TGrid.h>
35#include <TFile.h>
36#include <TTreeCache.h>
37#include <TSystem.h>
38
39#include <arrow/ipc/reader.h>
40#include <arrow/ipc/writer.h>
41#include <arrow/io/interfaces.h>
42#include <arrow/table.h>
43#include <arrow/util/key_value_metadata.h>
44
45using namespace o2;
46using namespace o2::aod;
47
// --- RuntimeWatchdog state (the `struct RuntimeWatchdog {` header line was
// elided by the documentation extractor; see closing brace further below) ---
// Monotonic reference points in nanoseconds, as returned by uv_hrtime().
50 uint64_t startTime;
51 uint64_t lastTime;
// Accumulated per-time-frame processing time, in seconds.
52 double runTime;
// Run-time limit in seconds. NOTE(review): declared unsigned but compared
// with `<= 0` in update() and fed from a signed Long64_t in the constructor —
// a negative limit wraps to a huge value and effectively disables the limit
// check's intent. TODO confirm against the original file.
53 uint64_t runTimeLimit;
54
// Arm the watchdog with the given limit (seconds) and record the start time.
// NOTE(review): original lines 57/59 were dropped by the extractor; presumably
// they initialize lastTime (to startTime) and numberTimeFrames (to -1, given
// the `numberTimeFrames + 1` averaging in update()) — verify in the repository.
55 RuntimeWatchdog(Long64_t limit)
56 {
58 startTime = uv_hrtime();
60 runTime = 0.;
61 runTimeLimit = limit;
62 }
63
// Account the time spent on the last time frame and decide whether another
// one still fits within the run-time limit.
// Returns true when processing may continue, false when the projected total
// (elapsed wall time + estimated cost of one more frame) would exceed
// runTimeLimit.
// NOTE(review): original line 66 was dropped by the extractor — presumably
// `numberTimeFrames++;`, which the averaging below relies on. Confirm.
64 bool update()
65 {
// A limit of 0 disables the watchdog entirely (runTimeLimit is unsigned,
// so `<= 0` can only match 0; negative ctor inputs wrap — see member note).
67 if (runTimeLimit <= 0) {
68 return true;
69 }
70
71 auto nowTime = uv_hrtime();
72
73 // time spent to process the time frame
// NOTE(review): with the elided increment unknown, the `< 1` guard here may
// be meant to skip accumulating on the very first call (no prior frame) —
// verify the condition's direction against the original source.
74 double time_spent = numberTimeFrames < 1 ? (double)(nowTime - lastTime) / 1.E9 : 0.;
75 runTime += time_spent;
76 lastTime = nowTime;
77
// elapsed seconds so far plus the average per-frame cost must stay under
// the limit for processing to continue.
78 return ((double)(lastTime - startTime) / 1.E9 + runTime / (numberTimeFrames + 1)) < runTimeLimit;
79 }
80
81 void printOut()
82 {
83 LOGP(info, "RuntimeWatchdog");
84 LOGP(info, " run time limit: {}", runTimeLimit);
85 LOGP(info, " number of time frames: {}", numberTimeFrames);
86 LOGP(info, " estimated run time per time frame: {}", (numberTimeFrames >= 0) ? runTime / (numberTimeFrames + 1) : 0.);
87 LOGP(info, " estimated total run time: {}", (double)(lastTime - startTime) / 1.E9 + ((numberTimeFrames >= 0) ? runTime / (numberTimeFrames + 1) : 0.));
88 }
89};
90
91using o2::monitoring::Metric;
92using o2::monitoring::Monitoring;
93using o2::monitoring::tags::Key;
94using o2::monitoring::tags::Value;
95
97{
// Register an end-of-stream handler on the device's CallbackService.
// NOTE(review): the signature line was elided by the extractor; per the
// Doxygen tooltip it is `auto setEOSCallback(InitContext& ic)` — confirm.
// When the stream ends, the handler propagates end-of-stream downstream and
// quits only this data processor (QuitRequest::Me), not the whole workflow.
99{
100 ic.services().get<CallbackService>().set<CallbackService::Id::EndOfStream>(
101 [](EndOfStreamContext& eosc) {
102 auto& control = eosc.services().get<ControlService>();
103 control.endOfStream();
104 control.readyToQuit(QuitRequest::Me);
105 });
106}
107
108template <typename O>
109static inline auto extractTypedOriginal(ProcessingContext& pc)
110{
112 return O{pc.inputs().get<TableConsumer>(aod::MetadataTrait<O>::metadata::tableLabel())->asArrowTable()};
113}
114
115template <typename... Os>
116static inline auto extractOriginalsTuple(framework::pack<Os...>, ProcessingContext& pc)
117{
118 return std::make_tuple(extractTypedOriginal<Os>(pc)...);
119}
120
// Build the AlgorithmSpec for the AOD ROOT-file reader device.
// NOTE(review): the signature line (original 121) was elided by the extractor;
// per the Doxygen tooltip it is
//   static AlgorithmSpec rootFileReaderCallback(ConfigContext const& ctx)
// — confirm against the repository. Also, original lines 224, 248 and 323
// (after each `control.endOfStream();` below) were elided — presumably
// `control.readyToQuit(QuitRequest::Me);`, mirroring setEOSCallback. Confirm.
122{
123 // aod-parent-base-path-replacement is now a workflow option, so it needs to be
124 // retrieved from the ConfigContext. This is because we do not allow workflow options
125 // to change over start-stop-start because they can affect the topology generation.
126 std::string parentFileReplacement;
127 if (ctx.options().isSet("aod-parent-base-path-replacement")) {
128 parentFileReplacement = ctx.options().get<std::string>("aod-parent-base-path-replacement");
129 }
130 int parentAccessLevel = 0;
131 if (ctx.options().isSet("aod-parent-access-level")) {
132 parentAccessLevel = ctx.options().get<int>("aod-parent-access-level");
133 }
// Stateful init callback: runs once at device start, captures the parent-file
// options by value, and returns the per-invocation processing lambda.
134 auto callback = AlgorithmSpec{adaptStateful([parentFileReplacement, parentAccessLevel](ConfigParamRegistry const& options,
135 DeviceSpec const& spec,
136 Monitoring& monitoring,
137 DataProcessingStats& stats) {
138 // FIXME: not actually needed, since data processing stats can specify that we should
139 // send the initial value.
140 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_CREATED), DataProcessingStats::Op::Set, 0});
141 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_CREATED), DataProcessingStats::Op::Set, 0});
142 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, 0});
143 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, 0});
144 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_EXPIRED), DataProcessingStats::Op::Set, 0});
145
// An input file is mandatory; abort device initialization without one.
146 if (!options.isSet("aod-file-private")) {
147 LOGP(fatal, "No input file defined!");
148 throw std::runtime_error("Processing is stopped!");
149 }
150
151 auto filename = options.get<std::string>("aod-file-private");
152
// Maximum compressed-read rate in MB/s; enforced by sleeping below.
153 auto maxRate = options.get<float>("aod-max-io-rate");
154
155 // create a DataInputDirector
156 auto didir = std::make_shared<DataInputDirector>(filename, &monitoring, parentAccessLevel, parentFileReplacement);
157 if (options.isSet("aod-reader-json")) {
158 auto jsonFile = options.get<std::string>("aod-reader-json");
159 if (!didir->readJson(jsonFile)) {
160 LOGP(error, "Check the JSON document! Can not be properly parsed!");
161 }
162 }
163
164 // get the run time watchdog
// NOTE(review): raw `new`, captured by raw pointer in the lambda below and
// never deleted — it lives for the device lifetime. TODO confirm intended.
165 auto* watchdog = new RuntimeWatchdog(options.get<int64_t>("time-limit"));
166
167 // selected the TFN input and
168 // create list of requested tables
// Split the device outputs: TFN (time-frame number) and TFF (time-frame file
// name) are bookkeeping outputs; everything else is a requested table.
169 bool reportTFN = false;
170 bool reportTFFileName = false;
171 header::DataHeader TFNumberHeader;
172 header::DataHeader TFFileNameHeader;
173 std::vector<OutputRoute> requestedTables;
174 std::vector<OutputRoute> routes(spec.outputs);
175 for (auto route : routes) {
176 if (DataSpecUtils::partialMatch(route.matcher, header::DataOrigin("TFN"))) {
177 auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
178 TFNumberHeader = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
179 reportTFN = true;
180 } else if (DataSpecUtils::partialMatch(route.matcher, header::DataOrigin("TFF"))) {
181 auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
182 TFFileNameHeader = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
183 reportTFFileName = true;
184 } else {
185 requestedTables.emplace_back(route);
186 }
187 }
188
// Shared counters that persist across invocations of the stateless lambda:
// which file and which time frame (folder) to read next.
189 auto fileCounter = std::make_shared<int>(0);
190 auto numTF = std::make_shared<int>(-1);
191 return adaptStateless([TFNumberHeader,
192 TFFileNameHeader,
193 requestedTables,
194 fileCounter,
195 numTF,
196 watchdog,
197 maxRate,
198 didir, reportTFN, reportTFFileName](Monitoring& monitoring, DataAllocator& outputs, ControlService& control, DeviceSpec const& device) {
199 // Each parallel reader device.inputTimesliceId reads the files fileCounter*device.maxInputTimeslices+device.inputTimesliceId
200 // the TF to read is numTF
201 assert(device.inputTimesliceId < device.maxInputTimeslices);
202 int fcnt = (*fileCounter * device.maxInputTimeslices) + device.inputTimesliceId;
203 int ntf = *numTF + 1;
// Emit a "files-opened" metric each time this reader advances to a new file.
204 static int currentFileCounter = -1;
205 static int filesProcessed = 0;
206 if (currentFileCounter != *fileCounter) {
207 currentFileCounter = *fileCounter;
208 monitoring.send(Metric{(uint64_t)++filesProcessed, "files-opened"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
209 }
210
211 // loop over requested tables
212 bool first = true;
213 static size_t totalSizeUncompressed = 0;
214 static size_t totalSizeCompressed = 0;
215 static uint64_t totalDFSent = 0;
216
217 // check if RuntimeLimit is reached
218 if (!watchdog->update()) {
219 LOGP(info, "Run time exceeds run time limit of {} seconds. Exiting gracefully...", watchdog->runTimeLimit);
220 LOGP(info, "Stopping reader {} after time frame {}.", device.inputTimesliceId, watchdog->numberTimeFrames - 1);
221 didir->closeInputFiles();
222 monitoring.flushBuffer();
223 control.endOfStream();
// NOTE(review): original line 224 elided here — likely readyToQuit(...).
225 return;
226 }
227
// Measure this invocation's read volume/time for the IO-rate throttle below.
228 int64_t startTime = uv_hrtime();
229 int64_t startSize = totalSizeCompressed;
230 for (auto& route : requestedTables) {
// Skip tables owned by a different parallel reader of this route.
231 if ((device.inputTimesliceId % route.maxTimeslices) != route.timeslice) {
232 continue;
233 }
234
235 // create header
236 auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
237 auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
238
// Read the tree for this table; on failure for the FIRST table, try to roll
// over to the next file — a failure on a later table is fatal (partial TF).
239 if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed)) {
240 if (first) {
241 // check if there is a next file to read
242 fcnt += device.maxInputTimeslices;
243 if (didir->atEnd(fcnt)) {
244 LOGP(info, "No input files left to read for reader {}!", device.inputTimesliceId);
245 didir->closeInputFiles();
246 monitoring.flushBuffer();
247 control.endOfStream();
// NOTE(review): original line 248 elided here — likely readyToQuit(...).
249 return;
250 }
251 // get first folder of next file
252 ntf = 0;
253 if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed)) {
254 LOGP(fatal, "Can not retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin.as<std::string>(), fcnt, ntf);
255 throw std::runtime_error("Processing is stopped!");
256 }
257 } else {
258 LOGP(fatal, "Can not retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin.as<std::string>(), fcnt, ntf);
259 throw std::runtime_error("Processing is stopped!");
260 }
261 }
262
// Bookkeeping outputs are produced once per time frame, on the first table.
263 if (first) {
264 if (reportTFN) {
265 // TF number
266 auto timeFrameNumber = didir->getTimeFrameNumber(dh, fcnt, ntf);
267 auto o = Output(TFNumberHeader);
268 outputs.make<uint64_t>(o) = timeFrameNumber;
269 }
270
271 if (reportTFFileName) {
272 // Origin file name for derived output map
273 auto o2 = Output(TFFileNameHeader);
274 auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
275 std::string currentFilename(fileAndFolder.file->GetName());
276 if (strcmp(fileAndFolder.file->GetEndpointUrl()->GetProtocol(), "file") == 0 && fileAndFolder.file->GetEndpointUrl()->GetFile()[0] != '/') {
277 // This is not an absolute local path. Make it absolute.
278 static std::string pwd = gSystem->pwd() + std::string("/");
279 currentFilename = pwd + std::string(fileAndFolder.file->GetName());
280 }
281 outputs.make<std::string>(o2) = currentFilename;
282 }
283 }
284 first = false;
285 }
// Throttle: if we read faster than maxRate (MB/s), sleep off the difference.
286 int64_t stopSize = totalSizeCompressed;
287 int64_t bytesDelta = stopSize - startSize;
288 int64_t stopTime = uv_hrtime();
289 float currentDelta = float(stopTime - startTime) / 1000000000; // in s
290 if (ceil(maxRate) > 0.) {
291 float extraTime = (bytesDelta / 1000000 - currentDelta * maxRate) / maxRate;
292 // We only sleep if we read faster than the max-read-rate.
293 if (extraTime > 0.) {
294 LOGP(info, "Read {} MB in {} s. Sleeping for {} seconds to stay within {} MB/s limit.", bytesDelta / 1000000, currentDelta, extraTime, maxRate);
295 uv_sleep(extraTime * 1000); // in milliseconds
296 }
297 }
298 totalDFSent++;
299 monitoring.send(Metric{(uint64_t)totalDFSent, "df-sent"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
300 monitoring.send(Metric{(uint64_t)totalSizeUncompressed / 1000, "aod-bytes-read-uncompressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
301 monitoring.send(Metric{(uint64_t)totalSizeCompressed / 1000, "aod-bytes-read-compressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
302
303 // save file number and time frame
304 *fileCounter = (fcnt - device.inputTimesliceId) / device.maxInputTimeslices;
305 *numTF = ntf;
306
307 // Check if the next timeframe is available or
308 // if there are more files to be processed. If not, simply exit.
309 fcnt = (*fileCounter * device.maxInputTimeslices) + device.inputTimesliceId;
310 ntf = *numTF + 1;
// NOTE(review): front() on an empty requestedTables would be UB — presumably
// a reader device always has at least one table output; confirm upstream.
311 auto& firstRoute = requestedTables.front();
312 auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher);
313 auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
314 auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
315 if (!fileAndFolder.file) {
316 fcnt += 1;
317 ntf = 0;
318 if (didir->atEnd(fcnt)) {
319 LOGP(info, "No input files left to read for reader {}!", device.inputTimesliceId);
320 didir->closeInputFiles();
321 monitoring.flushBuffer();
322 control.endOfStream();
// NOTE(review): original line 323 elided here — likely readyToQuit(...).
324 return;
325 }
326 }
327 });
328 })};
329
330 return callback;
331}
332
333} // namespace o2::framework::readers
o2::monitoring::Metric Metric
bool o
o2::monitoring::Monitoring Monitoring
ConfigParamRegistry & options() const
void readyToQuit(bool all)
Compatibility with old API.
void endOfStream()
Signal that we are done with the current stream.
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
Definition InitContext.h:34
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
auto setEOSCallback(InitContext &ic)
@ Me
Only quit this data processor.
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string filename()
RuntimeWatchdog(Long64_t limit)
Helper struct to hold statistics about the data processing happening.
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
size_t maxInputTimeslices
The maximum number of time pipelining for this device.
Definition DeviceSpec.h:70
size_t inputTimesliceId
The time pipelining id of this particular device.
Definition DeviceSpec.h:68
std::vector< OutputRoute > outputs
Definition DeviceSpec.h:63
static AlgorithmSpec rootFileReaderCallback(ConfigContext const &context)
the main header struct
Definition DataHeader.h:618