Project
Loading...
Searching...
No Matches
FileFetcher.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14
18#include "Framework/Logger.h"
19#include <filesystem>
20#include <fstream>
21#include <memory>
22#include <thread>
23#include <chrono>
24#include <cstdlib>
25#include <locale>
26#include <TGrid.h>
27#include <TSystem.h>
28
29using namespace o2::utils;
30using namespace std::chrono_literals;
31namespace fs = std::filesystem;
32
33//____________________________________________________________
34FileFetcher::FileFetcher(const std::string& input, const std::string& selRegex, const std::string& remRegex,
35 const std::string& copyCmd, const std::string& copyDir)
36 : mCopyCmd(copyCmd), mCopyDirName(copyDir)
37{
38 if (!selRegex.empty()) {
39 mSelRegex = std::make_unique<std::regex>(selRegex.c_str());
40 }
41 if (!remRegex.empty()) {
42 mRemRegex = std::make_unique<std::regex>(remRegex);
43 }
44 mNoRemoteCopy = mCopyCmd == "no-copy";
45
46 // parse input list
47 mCopyDirName = o2::utils::Str::create_unique_path(mCopyDirName, 8);
48 processInput(input);
49 LOGP(info, "Input contains {} files, {} remote", getNFiles(), mNRemote);
50 if (mNRemote) {
51 if (mNoRemoteCopy) { // make sure the copy command is provided, unless copy was explicitly forbidden
52 LOGP(info, "... but their local copying is explicitly forbidden");
53 } else {
54 if (mCopyCmd.find("?src") == std::string::npos || mCopyCmd.find("?dst") == std::string::npos) {
55 throw std::runtime_error(fmt::format("remote files asked but copy cmd \"{}\" is not valid", mCopyCmd));
56 }
57 try {
59 } catch (...) {
60 throw std::runtime_error(fmt::format("failed to create scratch directory {}", mCopyDirName));
61 }
62 mCopyCmdLogFile = fmt::format("{}/{}", mCopyDirName, "copy-cmd.log");
63 LOGP(info, "FileFetcher tmp scratch directory is set to {}", mCopyDirName);
64 }
65 }
66}
67
68//____________________________________________________________
74
75//____________________________________________________________
76void FileFetcher::processInput(const std::string& input)
77{
78 auto ftokens = o2::utils::Str::tokenize(input, ',', true); // in case multiple inputs are provided
79 processInput(ftokens);
80}
81
82//____________________________________________________________
83void FileFetcher::processInput(const std::vector<std::string>& input)
84{
85 for (auto inp : input) {
87 if (fs::is_directory(inp)) {
88 processDirectory(inp);
89 } else if (mSelRegex && !std::regex_match(inp, *mSelRegex.get())) { // provided selector does not match, treat as a txt file with list
90 // Avoid reading a multigiB data file as a list of inputs
91 // bringing down the system.
92 std::filesystem::path p(inp);
93 if (std::filesystem::file_size(p) > 10000000) {
94 LOGP(error, "file list {} larger than 10MB. Is this a data file?", inp);
95 continue;
96 }
97
98 std::ifstream listFile(inp);
99 if (!listFile.good()) {
100 LOGP(error, "file {} pretends to be a list of inputs but does not exist", inp);
101 continue;
102 }
103 std::string line;
104 std::vector<std::string> newInput;
105 while (getline(listFile, line)) {
107 if (line[0] == '#' || line.empty()) { // ignore commented file or empty line
108 continue;
109 }
110 newInput.push_back(line);
111 }
112 processInput(newInput);
113 } else { // should be local or remote data file
114 addInputFile(inp);
115 }
116 }
117}
118
119//____________________________________________________________
120void FileFetcher::processDirectory(const std::string& name)
121{
122 std::vector<std::string> vs;
123 for (auto const& entry : fs::directory_iterator{name}) {
124 const auto& fnm = entry.path().native();
125 if (fs::is_regular_file(fnm) && (!mSelRegex || std::regex_match(fnm, *mSelRegex.get()))) {
126 vs.push_back(fnm);
127 }
128 }
129 std::sort(vs.begin(), vs.end());
130 for (const auto& s : vs) {
131 addInputFile(s); // local files only
132 }
133}
134
135//____________________________________________________________
136bool FileFetcher::addInputFile(const std::string& fname)
137{
138 static bool alienErrorPrinted = false;
139 if (mRemRegex && std::regex_match(fname, *mRemRegex.get())) {
140 mInputFiles.emplace_back(FileRef{fname, mNoRemoteCopy ? fname : createCopyName(fname), true, false});
141 if (fname.find("alien:") == 0) {
142 if (!gGrid && !TGrid::Connect("alien://") && !alienErrorPrinted) {
143 LOG(error) << "File name starts with alien but connection to Grid failed";
144 alienErrorPrinted = true;
145 }
146 }
147 mNRemote++;
148 } else if (fs::exists(fname)) { // local file
149 mInputFiles.emplace_back(FileRef{fname, "", false, false});
150 } else {
151 LOGP(error, "file {} pretends to be local but does not exist", fname);
152 return false;
153 }
154 return true;
155}
156
157//____________________________________________________________
158std::string FileFetcher::createCopyName(const std::string& fname) const
159{
160 std::string cpnam{}, cpnamP = fname;
161 for (auto& c : cpnamP) {
162 if (!std::isalnum(c) && c != '.' && c != '-') {
163 c = '_';
164 }
165 }
166 while (1) {
167 cpnam = fmt::format("{}/{}_{}", mCopyDirName, o2::utils::Str::getRandomString(12), cpnamP);
168 if (!fs::exists(cpnam)) {
169 break;
170 }
171 }
172 return cpnam;
173}
174
175//____________________________________________________________
176size_t FileFetcher::popFromQueue(bool discard)
177{
178 // remove file from the queue, if requested and if it was copied, remove copy
179 std::lock_guard<std::mutex> lock(mMtx);
180 const auto* ptr = mQueue.frontPtr();
181 if (mQueue.empty()) {
182 return -1ul;
183 }
184 auto id = mQueue.front();
185 mQueue.pop();
186 if (discard) {
187 discardFile(mInputFiles[id].getLocalName());
188 }
189 return id;
190}
191
192//____________________________________________________________
193size_t FileFetcher::nextInQueue() const
194{
195 return mQueue.empty() ? -1ul : mQueue.front();
196}
197
198//____________________________________________________________
200{
201 if (mQueue.empty()) {
202 return {};
203 }
204 return mQueue.empty() ? "" : mInputFiles[mQueue.front()].getLocalName();
205}
206
207//____________________________________________________________
209{
210 if (mRunning) {
211 return;
212 }
213 mRunning = true;
214 mFetcherThread = std::thread(&FileFetcher::fetcher, this);
215}
216
217//____________________________________________________________
219{
220 mRunning = false;
221 std::lock_guard<std::mutex> lock(mMtxStop);
222 if (mFetcherThread.joinable()) {
223 mFetcherThread.join();
224 }
225 if (mFailure) {
226 LOGP(fatal, "too many failures in file fetching: {} in {} attempts for {} files in {} loops, abort", mNFilesProc - mNFilesProcOK, mNFilesProc, getNFiles(), mNLoops);
227 }
228}
229
230//____________________________________________________________
232{
233 if (mRunning) {
234 throw std::runtime_error("FileFetcher thread is still active, cannot cleanup");
235 }
236 if (mNRemote && o2::utils::Str::pathExists(mCopyDirName)) {
237 try {
238 fs::remove_all(mCopyDirName);
239 } catch (...) {
240 LOGP(error, "FileFetcher failed to remove sctrach directory {}", mCopyDirName);
241 }
242 }
243}
244
245//____________________________________________________________
246void FileFetcher::fetcher()
247{
248 // data fetching/copying thread
249 size_t fileEntry = -1ul;
250
251 if (!getNFiles()) {
252 mRunning = false;
253 return;
254 }
255
256 // BOOST requires a locale set
257 try {
258 std::locale loc("");
259 } catch (const std::exception& e) {
260 setenv("LC_ALL", "C", 1);
261 try {
262 std::locale loc("");
263 LOG(info) << "Setting locale";
264 } catch (const std::exception& e) {
265 LOG(info) << "Setting locale failed: " << e.what();
266 return;
267 }
268 }
269
270 while (mRunning) {
271 mNLoops = mNFilesProc / getNFiles();
272 if (mNLoops > mMaxLoops) {
273 LOGP(info, "Finished file fetching: {} of {} files fetched successfully in {} iterations", mNFilesProcOK, mNFilesProc, mMaxLoops);
274 mRunning = false;
275 break;
276 }
277 if (getQueueSize() >= mMaxInQueue) {
278 std::this_thread::sleep_for(5ms);
279 continue;
280 }
281 fileEntry = (fileEntry + 1) % getNFiles();
282 if (fileEntry == 0 && mNLoops > 0) {
283 LOG(info) << "Fetcher starts new iteration " << mNLoops;
284 }
285 mNFilesProc++;
286 auto& fileRef = mInputFiles[fileEntry];
287 if (fileRef.copied || !fileRef.remote || mNoRemoteCopy) {
288 mQueue.push(fileEntry);
289 mNFilesProcOK++;
290 } else { // need to copy
291 if (copyFile(fileEntry)) {
292 fileRef.copied = true;
293 mQueue.push(fileEntry);
294 mNFilesProcOK++;
295 } else {
296 if (mFailThreshold < 0.f) { // cut on abs number of failures
297 if (mNFilesProc - mNFilesProcOK > -mNFilesProcOK) {
298 mFailure = true;
299 }
300 } else if (mFailThreshold > 0.f) {
301 float fracFail = mNLoops ? (mNFilesProc - mNFilesProcOK) / float(mNFilesProc) : (mNFilesProc - mNFilesProcOK) / float(getNFiles());
302 mFailure = fracFail > mFailThreshold;
303 }
304 if (mFailure) {
305 mRunning = false;
306 break;
307 }
308 }
309 }
310 }
311}
312
313//____________________________________________________________
314void FileFetcher::discardFile(const std::string& fname)
315{
316 // delete file if it is copied.
317 auto ent = mCopied.find(fname);
318 if (ent != mCopied.end()) {
319 mInputFiles[ent->second - 1].copied = false;
320 fs::remove(fname);
321 mCopied.erase(fname);
322 }
323}
324
325//____________________________________________________________
326bool FileFetcher::copyFile(size_t id)
327{
328 // copy remote file to local setCopyDirName. Adaptation for Gvozden's code from SubTimeFrameFileSource::DataFetcherThread()
329 bool aliencpMode = false;
330 std::string uuid{};
331 std::vector<std::string> logsToClean;
332 std::string dbgset{};
333 if (mCopyCmd.find("alien") != std::string::npos) {
334 if (!gGrid && !TGrid::Connect("alien://")) {
335 LOG(error) << "Copy command refers to alien but connection to Grid failed";
336 }
337 uuid = mInputFiles[id].getOrigName();
338 for (auto& c : uuid) {
339 if (!std::isalnum(c) && c != '-') {
340 c = '_';
341 }
342 }
343 if (!(getenv("ALIENPY_DEBUG") && std::stoi(getenv("ALIENPY_DEBUG")) == 1)) {
344 logsToClean.push_back(fmt::format("log_alienpy_{}.txt", uuid));
345 dbgset += fmt::format("ALIENPY_DEBUG=1 ALIENPY_DEBUG_FILE={} ", logsToClean.back());
346 }
347 if (!(getenv("XRD_LOGLEVEL") && strcmp(getenv("XRD_LOGLEVEL"), "Dump") == 0)) {
348 logsToClean.push_back(fmt::format("log_xrd_{}.txt", uuid));
349 dbgset += fmt::format("XRD_LOGLEVEL=Dump XRD_LOGFILE={} ", logsToClean.back());
350 }
351 LOGP(debug, "debug setting for for {}: {}", mInputFiles[id].getOrigName(), dbgset);
352 }
353 auto realCmd = std::regex_replace(std::regex_replace(mCopyCmd, std::regex(R"(\?src)"), mInputFiles[id].getOrigName()), std::regex(R"(\?dst)"), mInputFiles[id].getLocalName());
354 auto fullCmd = fmt::format(R"(sh -c "{}{}" >> {} 2>&1)", dbgset, realCmd, mCopyCmdLogFile);
355 LOG(info) << "Executing " << fullCmd;
356 const auto sysRet = gSystem->Exec(fullCmd.c_str());
357 if (sysRet != 0) {
358 LOGP(warning, "FileFetcher: non-zero exit code {} for cmd={}", sysRet, realCmd);
359 std::string logCmd = fmt::format(R"(sh -c "cp {} log_aliencp_{}.txt")", mCopyCmdLogFile, uuid);
360 gSystem->Exec(logCmd.c_str());
361 } else { // on success cleanup debug log files
362 for (const auto& log : logsToClean) {
363 if (fs::exists(log)) {
364 fs::remove(log);
365 }
366 }
367 }
368 if (!fs::is_regular_file(mInputFiles[id].getLocalName()) || fs::is_empty(mInputFiles[id].getLocalName()) || sysRet != 0) {
369 LOGP(alarm, "FileFetcher: failed for copy command {}", realCmd);
370 return false;
371 }
372 mCopied[mInputFiles[id].getLocalName()] = id + 1;
373 return true;
374}
uint32_t c
Definition RawData.h:2
TBranch * ptr
std::ostringstream debug
const T * frontPtr() const
Definition FIFO.h:82
void pop()
Definition FIFO.h:56
const T & front() const
Definition FIFO.h:64
void push(Args &&... args)
Definition FIFO.h:50
bool empty() const
Definition FIFO.h:43
void discardFile(const std::string &fname)
size_t getQueueSize() const
Definition FileFetcher.h:77
size_t getNFiles() const
Definition FileFetcher.h:75
size_t popFromQueue(bool discard=false)
std::string getNextFileInQueue() const
FileFetcher(const std::string &input, const std::string &selRegex="", const std::string &remRegex="", const std::string &copyCmd="", const std::string &copyDir="/tmp")
GLuint entry
Definition glcorearb.h:5735
GLuint const GLchar * name
Definition glcorearb.h:781
GLuint id
Definition glcorearb.h:650
void createDirectoriesIfAbsent(std::string const &path)
static void trim(std::string &s)
Definition StringUtils.h:71
static bool pathExists(const std::string_view p)
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
static std::string create_unique_path(const std::string_view prefix="", int length=16)
static std::string getRandomString(int length)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"