30using namespace std::chrono_literals;
31namespace fs = std::filesystem;
35 const std::string& copyCmd,
const std::string& copyDir)
36 : mCopyCmd(copyCmd), mCopyDirName(copyDir)
38 if (!selRegex.empty()) {
39 mSelRegex = std::make_unique<std::regex>(selRegex.c_str());
41 if (!remRegex.empty()) {
42 mRemRegex = std::make_unique<std::regex>(remRegex);
44 mNoRemoteCopy = mCopyCmd ==
"no-copy";
49 LOGP(info,
"Input contains {} files, {} remote",
getNFiles(), mNRemote);
52 LOGP(info,
"... but their local copying is explicitly forbidden");
54 if (mCopyCmd.find(
"?src") == std::string::npos || mCopyCmd.find(
"?dst") == std::string::npos) {
55 throw std::runtime_error(fmt::format(
"remote files asked but copy cmd \"{}\" is not valid", mCopyCmd));
60 throw std::runtime_error(fmt::format(
"failed to create scratch directory {}", mCopyDirName));
62 mCopyCmdLogFile = fmt::format(
"{}/{}", mCopyDirName,
"copy-cmd.log");
63 LOGP(info,
"FileFetcher tmp scratch directory is set to {}", mCopyDirName);
// Single-string overload of processInput(): forwards `ftokens` to the
// vector overload.
// NOTE(review): the lines that build `ftokens` from `input` are omitted from this
// listing (numbering jumps from 76 to 79); presumably `input` is split into tokens
// there - confirm against the full source.
76void FileFetcher::processInput(
const std::string& input)
79 processInput(ftokens);
// Expands every entry of `input`. A directory is handed to processDirectory();
// an entry that fails the selection regex (mSelRegex) is opened as a text file
// whose lines are collected and processed recursively through this same overload.
// NOTE(review): this listing omits several original lines (the numbering jumps),
// so the handling of entries that reach neither branch is not visible here.
83void FileFetcher::processInput(
const std::vector<std::string>& input)
// Each entry is taken by value.
// NOTE(review): presumably the copy is modified in the omitted line between 85 and
// 87 - confirm before replacing it with a const reference.
85 for (
auto inp : input) {
87 if (fs::is_directory(inp)) {
88 processDirectory(inp);
89 }
else if (mSelRegex && !std::regex_match(inp, *mSelRegex.get())) {
// Size guard: anything above 10000000 bytes is reported as a probable data file
// rather than a list.
// NOTE(review): the throwing overload of std::filesystem::file_size raises
// filesystem_error for a missing path, so a nonexistent list file may never reach
// the `!listFile.good()` report below - confirm.
92 std::filesystem::path p(inp);
93 if (std::filesystem::file_size(p) > 10000000) {
94 LOGP(error,
"file list {} larger than 10MB. Is this a data file?", inp);
98 std::ifstream listFile(inp);
99 if (!listFile.good()) {
100 LOGP(error,
"file {} pretends to be a list of inputs but does not exist", inp);
104 std::vector<std::string> newInput;
105 while (getline(listFile, line)) {
// line[0] is read before the empty() test; for std::string this yields '\0' on an
// empty line instead of failing, so the condition holds for both empty lines and
// lines starting with '#'.
// NOTE(review): the branch body is omitted in this listing; presumably such lines
// are skipped.
107 if (line[0] ==
'#' || line.empty()) {
110 newInput.push_back(line);
// Recurse on the collected lines, so a list file may itself name directories or
// further list files.
112 processInput(newInput);
// Scans the directory `name` (non-recursively, via fs::directory_iterator), tests
// each entry against the selection criteria, then sorts the collected names in
// `vs` and iterates over them in sorted order.
// NOTE(review): the statements that fill `vs` and the body of the final loop are
// omitted from this listing; presumably matching paths are stored and then
// registered as input files - confirm.
120void FileFetcher::processDirectory(
const std::string&
name)
122 std::vector<std::string> vs;
123 for (
auto const&
entry : fs::directory_iterator{
name}) {
// The regex is matched against the full native path of the entry, not only its
// file name; with no mSelRegex set every regular file passes.
124 const auto& fnm =
entry.path().native();
125 if (fs::is_regular_file(fnm) && (!mSelRegex || std::regex_match(fnm, *mSelRegex.get()))) {
// Sorting makes the processing order independent of the directory iteration order.
129 std::sort(vs.begin(), vs.end());
130 for (
const auto& s : vs) {
// Registers a single file name in mInputFiles.
//  - A name matching mRemRegex is stored with a local name that is either the
//    name itself (mNoRemoteCopy) or the result of createCopyName().
//  - Otherwise the name is stored with an empty local name if it exists on disk.
//  - Otherwise an error is logged.
// NOTE(review): the return statements are omitted from this listing, so the exact
// meaning of the bool result cannot be stated from here.
// NOTE(review): the last two FileRef initializers are presumably the remote and
// copied flags - confirm against the FileRef declaration.
136bool FileFetcher::addInputFile(
const std::string& fname)
// Function-local static flag so the Grid connection error is logged only once.
138 static bool alienErrorPrinted =
false;
139 if (mRemRegex && std::regex_match(fname, *mRemRegex.get())) {
140 mInputFiles.emplace_back(FileRef{fname, mNoRemoteCopy ? fname : createCopyName(fname),
true,
false});
// find(...) == 0 is a prefix test for "alien:".
141 if (fname.find(
"alien:") == 0) {
// Because the flag is the last operand of the && chain, TGrid::Connect is still
// attempted for every alien file while gGrid is unset; only the log line is
// suppressed after the first failure.
142 if (!gGrid && !TGrid::Connect(
"alien://") && !alienErrorPrinted) {
143 LOG(error) <<
"File name starts with alien but connection to Grid failed";
144 alienErrorPrinted =
true;
148 }
else if (fs::exists(fname)) {
149 mInputFiles.emplace_back(FileRef{fname,
"",
false,
false});
151 LOGP(error,
"file {} pretends to be local but does not exist", fname);
// Derives the name of a local copy for `fname`. Works on a copy of the name
// (cpnamP) and inspects every character that is not alphanumeric, '.' or '-';
// the candidate result (cpnam) is then checked for non-existence on disk.
// NOTE(review): the statement applied to the offending characters, the
// construction of `cpnam` and the return are omitted from this listing;
// presumably such characters are replaced and a fresh, not yet existing path is
// generated - confirm.
158std::string FileFetcher::createCopyName(
const std::string& fname)
const
160 std::string cpnam{}, cpnamP = fname;
161 for (
auto&
c : cpnamP) {
// NOTE(review): std::isalnum receives a plain char; a negative value (non-ASCII
// byte where char is signed) is outside the domain the C library defines for
// this function - consider casting to unsigned char.
162 if (!std::isalnum(
c) &&
c !=
'.' &&
c !=
'-') {
168 if (!fs::exists(cpnam)) {
179 std::lock_guard<std::mutex> lock(mMtx);
181 if (mQueue.
empty()) {
184 auto id = mQueue.
front();
// Returns the entry at the front of mQueue without removing it, or -1ul (all bits
// set) as a sentinel when the queue is empty.
193size_t FileFetcher::nextInQueue()
const
195 return mQueue.
empty() ? -1ul : mQueue.
front();
201 if (mQueue.
empty()) {
204 return mQueue.
empty() ?
"" : mInputFiles[mQueue.
front()].getLocalName();
214 mFetcherThread = std::thread(&FileFetcher::fetcher,
this);
221 std::lock_guard<std::mutex> lock(mMtxStop);
222 if (mFetcherThread.joinable()) {
223 mFetcherThread.join();
226 LOGP(fatal,
"too many failures in file fetching: {} in {} attempts for {} files in {} loops, abort", mNFilesProc - mNFilesProcOK, mNFilesProc,
getNFiles(), mNLoops);
234 throw std::runtime_error(
"FileFetcher thread is still active, cannot cleanup");
238 fs::remove_all(mCopyDirName);
240 LOGP(error,
"FileFetcher failed to remove sctrach directory {}", mCopyDirName);
// Body of the fetching loop: cycles over mInputFiles, pushes entries that need no
// copying straight to mQueue, copies the others with copyFile(), and updates the
// failure state (mFailure) from the configured threshold.
// NOTE(review): this listing omits many original lines (loop header, try blocks,
// counters, break/continue/return statements); the comments below describe only
// the statements that are visible.
246void FileFetcher::fetcher()
// Starts at -1ul so that the first `(fileEntry + 1) % getNFiles()` yields entry 0
// through unsigned wrap-around.
249 size_t fileEntry = -1ul;
// Locale handling: when the preceding (omitted) attempt throws, LC_ALL is forced
// to "C" and the attempt is repeated; a second failure is only logged.
259 }
catch (
const std::exception& e) {
260 setenv(
"LC_ALL",
"C", 1);
263 LOG(info) <<
"Setting locale";
264 }
catch (
const std::exception& e) {
265 LOG(info) <<
"Setting locale failed: " << e.what();
// Termination check; note the message reports the limit mMaxLoops, not mNLoops.
272 if (mNLoops > mMaxLoops) {
273 LOGP(info,
"Finished file fetching: {} of {} files fetched successfully in {} iterations", mNFilesProcOK, mNFilesProc, mMaxLoops);
// Short pause; NOTE(review): the condition that triggers it is omitted here.
278 std::this_thread::sleep_for(5ms);
// Advance cyclically over the input list.
// NOTE(review): divides by getNFiles(); an empty input list must be excluded by
// code not visible in this listing - confirm.
281 fileEntry = (fileEntry + 1) %
getNFiles();
282 if (fileEntry == 0 && mNLoops > 0) {
283 LOG(info) <<
"Fetcher starts new iteration " << mNLoops;
286 auto& fileRef = mInputFiles[fileEntry];
// Already copied, local, or copying disabled: the entry is queued as is.
287 if (fileRef.copied || !fileRef.remote || mNoRemoteCopy) {
288 mQueue.
push(fileEntry);
// Remote entry: queue it only once copyFile() reports success.
291 if (copyFile(fileEntry)) {
292 fileRef.copied =
true;
293 mQueue.
push(fileEntry);
// Failure policy selected by the sign of mFailThreshold: a negative value enters
// the first branch, a positive value is compared with the failure fraction.
296 if (mFailThreshold < 0.f) {
// NOTE(review): the failure count (mNFilesProc - mNFilesProcOK) is compared with
// -mNFilesProcOK, so mFailThreshold itself is never used in this branch; this
// looks unintended (a comparison with -mFailThreshold would be expected) - confirm.
297 if (mNFilesProc - mNFilesProcOK > -mNFilesProcOK) {
300 }
else if (mFailThreshold > 0.f) {
// Failure fraction: relative to the files processed so far once mNLoops is
// non-zero, otherwise relative to the total number of input files.
301 float fracFail = mNLoops ? (mNFilesProc - mNFilesProcOK) /
float(mNFilesProc) : (mNFilesProc - mNFilesProcOK) /
float(
getNFiles());
302 mFailure = fracFail > mFailThreshold;
317 auto ent = mCopied.find(fname);
318 if (ent != mCopied.end()) {
319 mInputFiles[ent->second - 1].copied =
false;
321 mCopied.erase(fname);
// Copies input file `id` to its local name by running the configured copy command
// through gSystem->Exec, then validates the result and records it in mCopied.
// NOTE(review): this listing omits several original lines (declaration of `uuid`,
// some conditions, return statements); the comments below describe only the
// statements that are visible.
326bool FileFetcher::copyFile(
size_t id)
329 bool aliencpMode =
false;
// Extra log files produced by the debug settings below.
331 std::vector<std::string> logsToClean;
// Environment assignments that get prepended to the copy command.
332 std::string dbgset{};
// Copy commands mentioning "alien" need a Grid connection.
333 if (mCopyCmd.find(
"alien") != std::string::npos) {
334 if (!gGrid && !TGrid::Connect(
"alien://")) {
335 LOG(error) <<
"Copy command refers to alien but connection to Grid failed";
// uuid starts as the original file name and is then scanned for characters other
// than alphanumerics and '-'.
// NOTE(review): the statement applied to those characters is omitted; presumably
// they are replaced to obtain a file-name-safe tag - confirm.
337 uuid = mInputFiles[
id].getOrigName();
338 for (
auto&
c : uuid) {
339 if (!std::isalnum(
c) &&
c !=
'-') {
// Enable alien.py debug output into a per-file log unless ALIENPY_DEBUG is
// already set to 1.
// NOTE(review): std::stoi throws std::invalid_argument when the variable holds a
// non-numeric value - confirm this is acceptable.
343 if (!(getenv(
"ALIENPY_DEBUG") && std::stoi(getenv(
"ALIENPY_DEBUG")) == 1)) {
344 logsToClean.push_back(fmt::format(
"log_alienpy_{}.txt", uuid));
345 dbgset += fmt::format(
"ALIENPY_DEBUG=1 ALIENPY_DEBUG_FILE={} ", logsToClean.back());
// Same for XRootD: request Dump-level logging into a per-file log unless
// XRD_LOGLEVEL is already "Dump".
347 if (!(getenv(
"XRD_LOGLEVEL") && strcmp(getenv(
"XRD_LOGLEVEL"),
"Dump") == 0)) {
348 logsToClean.push_back(fmt::format(
"log_xrd_{}.txt", uuid));
349 dbgset += fmt::format(
"XRD_LOGLEVEL=Dump XRD_LOGFILE={} ", logsToClean.back());
351 LOGP(
debug,
"debug setting for for {}: {}", mInputFiles[
id].getOrigName(), dbgset);
// Build the command: the source and destination placeholders of mCopyCmd are
// substituted with the original and local file names, and the result is run
// through `sh -c` with its output appended to mCopyCmdLogFile.
// NOTE(review): the file names are used as std::regex_replace replacement text,
// where '$' sequences have special meaning, and are interpolated into a shell
// command without quoting - names containing '$', quotes or spaces may misbehave.
353 auto realCmd = std::regex_replace(std::regex_replace(mCopyCmd, std::regex(R
"(\?src)"), mInputFiles[id].getOrigName()), std::regex(R
"(\?dst)"), mInputFiles[id].getLocalName());
354 auto fullCmd = fmt::format(R
"(sh -c "{}{}" >> {} 2>&1)", dbgset, realCmd, mCopyCmdLogFile);
355 LOG(info) << "Executing " << fullCmd;
356 const auto sysRet = gSystem->Exec(fullCmd.c_str());
// Failure diagnostics: warn and keep a copy of the command log under a per-file
// name.
// NOTE(review): the enclosing condition is omitted from this listing; judging by
// the message it is a non-zero sysRet.
358 LOGP(warning,
"FileFetcher: non-zero exit code {} for cmd={}", sysRet, realCmd);
359 std::string logCmd = fmt::format(R
"(sh -c "cp {} log_aliencp_{}.txt")", mCopyCmdLogFile, uuid);
360 gSystem->Exec(logCmd.c_str());
// Walk over the debug logs registered above that exist on disk.
// NOTE(review): the action taken on them is omitted; presumably they are removed.
362 for (
const auto& log : logsToClean) {
363 if (fs::exists(log)) {
// The copy counts as failed unless the local file is a regular, non-empty file
// and the command returned 0.
368 if (!fs::is_regular_file(mInputFiles[
id].getLocalName()) || fs::is_empty(mInputFiles[
id].getLocalName()) || sysRet != 0) {
369 LOGP(alarm,
"FileFetcher: failed for copy command {}", realCmd);
// Remember the copy, keyed by local name; the stored value is id + 1, not id.
372 mCopied[mInputFiles[
id].getLocalName()] =
id + 1;
const T * frontPtr() const
void push(Args &&... args)
void discardFile(const std::string &fname)
size_t getQueueSize() const
size_t popFromQueue(bool discard=false)
std::string getNextFileInQueue() const
FileFetcher(const std::string &input, const std::string &selRegex="", const std::string &remRegex="", const std::string &copyCmd="", const std::string &copyDir="/tmp")
GLuint const GLchar * name
void createDirectoriesIfAbsent(std::string const &path)
static void trim(std::string &s)
static bool pathExists(const std::string_view p)
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
static std::string create_unique_path(const std::string_view prefix="", int length=16)
static std::string getRandomString(int length)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"