Project
Loading...
Searching...
No Matches
EPNstderrMonitor.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14
15#include <fairmq/Device.h>
16#include <fairmq/runDevice.h>
17
18#include "InfoLogger/InfoLogger.hxx"
19
20#include <string>
21#include <thread>
22#include <vector>
23#include <unordered_map>
24#include <regex>
25#include <filesystem>
26#include <chrono>
27#include <fstream>
28
29#include <unistd.h>
30#include <sys/inotify.h>
31#include <poll.h>
32
33using namespace AliceO2;
34
// Reporting budgets used by the monitoring loop.
// Per file: reporting for a file stops once it has produced this many lines
// or this many bytes (budgeted as 512 bytes per allowed line).
static constexpr size_t MAX_LINES_FILE{30};
static constexpr size_t MAX_BYTES_FILE{MAX_LINES_FILE * 512};
// Per node: the monitor stops forwarding altogether once all files together
// have produced this many lines or bytes (budgeted as 256 bytes per allowed line).
static constexpr size_t MAX_LINES_TOTAL{1000};
static constexpr size_t MAX_BYTES_TOTAL{MAX_LINES_TOTAL * 256};
39
/// Book-keeping for one monitored log file.
struct fileMon {
  std::ifstream file;      ///< stream the monitoring loop reads lines from
  std::string name;        ///< label used when reporting lines of this file
  unsigned int nLines{0};  ///< number of reported lines counted for this file
  unsigned int nBytes{0};  ///< number of reported bytes counted for this file
  bool stopped{false};     ///< true once the per-file limit was hit; the file is skipped afterwards

  /// Opens the file called filename inside directory path; filename becomes the label.
  fileMon(const std::string& path, const std::string& filename);
  /// Takes over an already opened stream f; filename becomes the label.
  fileMon(const std::string& filename, std::ifstream&& f);
};
50
51fileMon::fileMon(const std::string& path, const std::string& filename) : name(filename)
52{
53 printf("Monitoring file %s\n", filename.c_str());
54 file.open(path + "/" + filename, std::ifstream::in);
55}
56
57fileMon::fileMon(const std::string& filename, std::ifstream&& f) : file(std::move(f)), name(filename)
58{
59}
60
62{
63 public:
64 EPNMonitor(std::string path, bool infoLogger, int runNumber, std::string partition);
66 void setRunNr(int nr) { mRunNumber = nr; }
67
68 private:
69 void thread();
70 void check_add_file(const std::string& filename);
71 void sendLog(const std::string& file, const std::string& message,
72 const InfoLogger::InfoLogger::Severity severity = InfoLogger::InfoLogger::Severity::Error, int level = 3);
73
74 bool mInfoLoggerActive;
75 volatile bool mTerminate = false;
76 std::thread mThread;
77 std::unordered_map<std::string, fileMon> mFiles;
78 std::string mPath;
79 std::vector<std::regex> mFilters;
80 std::unordered_map<std::string, std::pair<InfoLogger::InfoLogger::Severity, int>> mMapLogTypes;
81 volatile unsigned int mRunNumber;
82 std::string mPartition;
83 unsigned int nLines = 0;
84 unsigned int nBytes = 0;
85 std::unique_ptr<InfoLogger::InfoLogger> mLogger;
86 std::unique_ptr<InfoLogger::InfoLoggerContext> mLoggerContext;
87};
88
89EPNMonitor::EPNMonitor(std::string path, bool infoLogger, int runNumber, std::string partition)
90{
91 mFilters.emplace_back("^Info in <");
92 mFilters.emplace_back("^Print in <");
93 mFilters.emplace_back("^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{6}");
94 mFilters.emplace_back("^Warning in <Fit");
95 mFilters.emplace_back("^Warning in <TGraph");
96 mFilters.emplace_back("^Warning in <TInterpreter");
97 mFilters.emplace_back("Dividing histograms with different labels");
98 mMapLogTypes.emplace("(core dumped)", std::pair<InfoLogger::InfoLogger::Severity, int>{InfoLogger::InfoLogger::Severity::Error, 1});
99 mMapLogTypes.emplace("Warning in <", std::pair<InfoLogger::InfoLogger::Severity, int>{InfoLogger::InfoLogger::Severity::Warning, 11});
100 mMapLogTypes.emplace("Error in <", std::pair<InfoLogger::InfoLogger::Severity, int>{InfoLogger::InfoLogger::Severity::Error, 2});
101 mMapLogTypes.emplace("Fatal in <", std::pair<InfoLogger::InfoLogger::Severity, int>{InfoLogger::InfoLogger::Severity::Fatal, 1});
102 mMapLogTypes.emplace("*** Break ***", std::pair<InfoLogger::InfoLogger::Severity, int>{InfoLogger::InfoLogger::Severity::Fatal, 1});
103 mInfoLoggerActive = infoLogger;
104 mPath = path;
105 mRunNumber = runNumber;
106 mPartition = partition;
107 if (infoLogger) {
108 mLogger = std::make_unique<InfoLogger::InfoLogger>();
109 mLoggerContext = std::make_unique<InfoLogger::InfoLoggerContext>();
110 mLoggerContext->setField(InfoLogger::InfoLoggerContext::FieldName::Partition, partition != "" ? partition : "unspecified");
111 mLoggerContext->setField(InfoLogger::InfoLoggerContext::FieldName::System, std::string("STDERR"));
112 }
113 mThread = std::thread(&EPNMonitor::thread, this);
114}
115
117{
118 mTerminate = true;
119 mThread.join();
120}
121
122void EPNMonitor::check_add_file(const std::string& filename)
123{
124 //printf("Checking '%s'\n", filename.c_str());
125 static const std::regex match_stderr("_err\\.log$");
126 if (std::regex_search(filename, match_stderr)) {
127 mFiles.try_emplace(filename, mPath, filename);
128 }
129}
130
131void EPNMonitor::sendLog(const std::string& file, const std::string& message, const InfoLogger::InfoLogger::Severity severity, int level)
132{
133 if (mInfoLoggerActive) {
134 mLoggerContext->setField(InfoLogger::InfoLoggerContext::FieldName::Facility, ("stderr/" + file).substr(0, 31));
135 mLoggerContext->setField(InfoLogger::InfoLoggerContext::FieldName::Run, mRunNumber != 0 ? std::to_string(mRunNumber) : "unspecified");
136 static const InfoLogger::InfoLogger::InfoLoggerMessageOption opt = {severity, level, InfoLogger::InfoLogger::undefinedMessageOption.errorCode, InfoLogger::InfoLogger::undefinedMessageOption.sourceFile, InfoLogger::InfoLogger::undefinedMessageOption.sourceLine};
137 mLogger->log(opt, *mLoggerContext, "stderr: %s", file == "SYSLOG" ? (std::string("[GLOBAL SYSLOG]: ") + message).c_str() : message.c_str());
138 } else {
139 printf("stderr: [%c] %s: %s\n", severity, file.c_str(), message.c_str());
140 }
141}
142
143void EPNMonitor::thread()
144{
145 printf("EPN stderr Monitor active\n");
146
147 try {
148 std::string syslogfile = "/var/log/infologger_syslog";
149 std::ifstream file;
150 file.open(syslogfile, std::ifstream::in);
151 file.seekg(0, file.end);
152 mFiles.emplace(std::piecewise_construct, std::forward_as_tuple(syslogfile), std::forward_as_tuple(std::string("SYSLOG"), std::move(file)));
153 } catch (...) {
154 }
155
156 int fd;
157 int wd;
158 static constexpr size_t BUFFER_SIZE = 64 * 1024;
159 std::vector<char> evt_buffer(BUFFER_SIZE);
160 std::vector<char> text_buffer(8192);
161 fd = inotify_init();
162 wd = inotify_add_watch(fd, mPath.c_str(), IN_CREATE);
163 if (fd < 0) {
164 throw std::runtime_error(std::string("Error initializing inotify ") + std::to_string(fd) + " " + std::to_string(wd));
165 }
166 pollfd pfd = {fd, POLLIN, 0};
167
168 for (const auto& entry : std::filesystem::directory_iterator(mPath)) {
169 if (entry.is_regular_file()) {
170 check_add_file(entry.path().filename());
171 }
172 }
173
174 auto lastTime = std::chrono::system_clock::now();
175 while (!mTerminate) {
176 if (poll(&pfd, 1, 50) > 0) {
177 int l = read(fd, evt_buffer.data(), BUFFER_SIZE);
178 if (l < 0) {
179 throw std::runtime_error(std::string("Error waiting for inotify event ") + std::to_string(l));
180 }
181 for (int i = 0; i < l; i += sizeof(inotify_event)) {
182 inotify_event* event = (inotify_event*)&evt_buffer[i];
183 if (event->len && (event->mask & IN_CREATE) && !(event->mask & IN_ISDIR)) {
184 check_add_file(event->name);
185 }
186 i += event->len;
187 }
188 }
189 auto curTime = std::chrono::system_clock::now();
190 if (std::chrono::duration_cast<std::chrono::milliseconds>(curTime - lastTime).count() >= 1000) {
191 char* ptr = text_buffer.data();
192 std::string line;
193 for (auto fit = mFiles.begin(); fit != mFiles.end(); fit++) {
194 auto& f = fit->second;
195 if (f.stopped) {
196 continue;
197 }
198 auto& file = f.file;
199 file.clear();
200 do {
201 std::getline(file, line);
202 if (line.size()) {
203 bool filterLine = false;
204 for (const auto& filter : mFilters) {
205 if (std::regex_search(line, filter)) {
206 filterLine = true;
207 break;
208 }
209 }
210 if (filterLine) {
211 continue;
212 }
213 // assign proper severity / level for remaining ROOT log messages
214 auto severity{InfoLogger::InfoLogger::Severity::Error};
215 int level{3};
216 for (const auto& logType : mMapLogTypes) {
217 if (line.find(logType.first) != std::string::npos) {
218 severity = std::get<InfoLogger::InfoLogger::Severity>(logType.second);
219 level = std::get<int>(logType.second);
220 break;
221 }
222 }
223 f.nLines++;
224 f.nBytes += line.size();
225 nLines++;
226 nBytes += line.size();
227 if (f.nLines >= MAX_LINES_FILE || f.nBytes >= MAX_BYTES_FILE) {
228 sendLog(f.name, "Exceeded log size for process " + f.name + " (" + std::to_string(f.nLines) + " lines, " + std::to_string(f.nBytes) + " bytes), not reporting any more errors from this file...");
229 f.stopped = true;
230 break;
231 }
232 if (nLines >= MAX_LINES_TOTAL || nBytes >= MAX_BYTES_TOTAL) {
233 break;
234 }
235 sendLog(f.name, line, severity, level);
236 }
237 } while (!file.eof());
238 }
239 lastTime = curTime;
240 }
241 if (nLines >= MAX_LINES_TOTAL || nBytes >= MAX_BYTES_TOTAL) {
242 sendLog("", "Max total stderr log size exceeded (" + std::to_string(nLines) + " lines, " + std::to_string(nBytes) + "), not sending any more stderr logs from this node...");
243 break;
244 }
245
246 usleep(50000);
247 }
248
249 inotify_rm_watch(fd, wd);
250 close(fd);
251
252 printf("EPN stderr Monitor terminating\n");
253}
254
255static std::unique_ptr<EPNMonitor> gEPNMonitor;
256
257namespace bpo = boost::program_options;
258
260 void InitTask() override
261 {
262 std::string path = getenv("DDS_LOCATION") ? (std::string(getenv("DDS_LOCATION")) + "/") : std::string(".");
263 bool infoLogger = fConfig->GetProperty<int>("infologger");
264 bool dds = false;
265
266 std::string partition = "";
267 try {
268 partition = fConfig->GetProperty<std::string>("environment_id", "");
269 printf("Got environment_id: %s\n", partition.c_str());
270 } catch (...) {
271 printf("Error getting environment_id\n");
272 }
273
274 gEPNMonitor = std::make_unique<EPNMonitor>(path, infoLogger, 0, partition);
275 }
276 void PreRun() override
277 {
278 int runNumber = 0;
279 try {
280 runNumber = atoi(fConfig->GetProperty<std::string>("runNumber", "").c_str());
281 printf("Got runNumber: %d\n", runNumber);
282 } catch (...) {
283 printf("Error getting runNumber\n");
284 }
285 gEPNMonitor->setRunNr(runNumber);
286 }
287 bool ConditionalRun() override
288 {
289 usleep(100000);
290 return true;
291 }
292};
293
294void addCustomOptions(bpo::options_description& options)
295{
296 options.add_options()("infologger", bpo::value<int>()->default_value(0), "Send via infologger");
297}
298
299std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& config)
300{
301 return std::make_unique<EPNstderrMonitor>();
302}
void addCustomOptions(bpo::options_description &options)
int32_t i
o2::devices::O2SimDevice * getDevice()
TBranch * ptr
EPNMonitor(std::string path, bool infoLogger, int runNumber, std::string partition)
void setRunNr(int nr)
struct _cl_event * event
Definition glcorearb.h:2982
GLint GLsizei count
Definition glcorearb.h:399
GLuint entry
Definition glcorearb.h:5735
GLuint const GLchar * name
Definition glcorearb.h:781
GLdouble f
Definition glcorearb.h:310
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLint level
Definition glcorearb.h:275
GLenum GLenum severity
Definition glcorearb.h:2513
DeliveryType read(const std::string &str)
int32_t const char int32_t line
TFitResultPtr fit(const size_t nBins, const T *arr, const T xMin, const T xMax, TF1 &func, std::string_view option="")
Definition fit.h:59
Polygon< T > close(Polygon< T > polygon)
Definition Polygon.h:126
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::string filename()
bool ConditionalRun() override
void InitTask() override
void PreRun() override
unsigned int nLines
std::ifstream file
std::string name
fileMon(const std::string &path, const std::string &filename)
unsigned int nBytes