Project
Loading...
Searching...
No Matches
RunManager.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14
16#include <iostream>
17#include <sstream>
18#include <regex>
20#include <fairlogger/Logger.h>
21
22using namespace o2::ctp;
28{
29 std::vector<int> clslist = cfg.getTriggerClassList();
30 for (auto const& cls : clslist) {
31 cntslast[cls] = {0, 0, 0, 0, 0, 0};
32 cntslast0[cls] = {0, 0, 0, 0, 0, 0};
33 overflows[cls] = {0, 0, 0, 0, 0, 0};
34 }
35}
36int CTPActiveRun::send2BK(std::unique_ptr<BkpClient>& BKClient, size_t ts, bool start)
37{
38 int runNumber = cfg.getRunNumber();
39 // LOG(info) << "BK Filling run:" << runNumber;
40 // int runOri = runNumber;
41 // runNumber = 123;
42 if (start) {
43 for (auto const& cls : cntslast) {
44 for (int i = 0; i < 6; i++) {
45 cnts0[cls.first][i] = cls.second[i];
46 }
47 }
48 }
49 std::array<uint64_t, 6> cntsbk{0};
50 for (auto const& cls : cntslast) {
51 for (int i = 0; i < 6; i++) {
52 if (cls.second[i] < cntslast0[cls.first][i]) {
53 overflows[cls.first][i]++;
54 }
55 cntslast0[cls.first][i] = cls.second[i];
56 cntsbk[i] = (uint64_t)cls.second[i] + 0xffffffffull * overflows[cls.first][i] - (uint64_t)cnts0[cls.first][i];
57 }
58 std::string clsname = cfg.getClassNameFromHWIndex(cls.first);
59 // clsname = std::to_string(runOri) + "_" + clsname;
60 try {
61 BKClient->ctpTriggerCounters()->createOrUpdateForRun(runNumber, clsname, ts, cntsbk[0], cntsbk[1], cntsbk[2], cntsbk[3], cntsbk[4], cntsbk[5]);
62 } catch (std::runtime_error& error) {
63 std::cerr << "An error occurred: " << error.what() << std::endl;
64 return 1;
65 }
66 LOG(debug) << "Run BK:" << runNumber << " class:" << clsname << " cls.first" << cls.first << " ts:" << ts << " cnts:" << cntsbk[0] << " " << cntsbk[1] << " " << cntsbk[2] << " " << cntsbk[3] << " " << cntsbk[4] << " " << cntsbk[5];
67 }
68 return 0;
69}
74{
75 for (uint32_t i = 0; i < NRUNS; i++) {
76 mActiveRuns[i] = nullptr;
77 }
79 if (mBKHost != "none") {
80 mBKClient = BkpClientFactory::create(mBKHost);
81 LOG(info) << "BK Client created with:" << mBKHost;
82 } else {
83 LOG(info) << "BK not sent";
84 }
86 LOG(info) << "QC host:" << mQCDBHost;
87 LOG(info) << "QCDB writing every:" << mQCWritePeriod << " 10 secs";
88 LOG(info) << "CCDB host:" << mCCDBHost;
89 LOG(info) << "CTP vNew cfg:" << mNew;
90 LOG(info) << "ctp.cfg dir:" << mCtpCfgDir;
91 LOG(info) << "CTPRunManager initialised.";
92}
93int CTPRunManager::loadRun(const std::string& cfg)
94{
95 LOG(info) << "Loading run: " << cfg;
96 auto now = std::chrono::system_clock::now();
97 long timeStamp = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
98 LOG(info) << "Timestamp real time:" << timeStamp;
99 size_t pos = cfg.find(" ");
100 std::string cfgmod = cfg;
101 if (pos == std::string::npos) {
102 LOG(warning) << "Space not found in CTP config";
103 } else {
104 std::string f = cfg.substr(0, pos);
105 if (f.find("run") == std::string::npos) {
106 double_t tt = std::stold(f);
107 timeStamp = (tt * 1000.);
108 LOG(info) << "Timestamp file:" << timeStamp;
109 cfgmod = cfg.substr(pos, cfg.size());
110 LOG(info) << "ctpconfig: using ctp time";
111 }
112 }
113 CTPActiveRun* activerun = new CTPActiveRun;
114 activerun->timeStart = timeStamp;
115 activerun->cfg.loadConfigurationRun3(cfgmod);
116 activerun->cfg.printStream(std::cout);
117 //
118 uint32_t runnumber = activerun->cfg.getRunNumber();
119 activerun->scalers.setRunNumber(runnumber);
120 activerun->scalers.setClassMask(activerun->cfg.getTriggerClassMask());
121 o2::detectors::DetID::mask_t detmask = activerun->cfg.getDetectorMask();
122 activerun->scalers.setDetectorMask(detmask);
123 //
124 mRunsLoaded[runnumber] = activerun;
125 saveRunConfigToCCDB(&activerun->cfg, timeStamp);
126 if (mCtpCfgDir != "none") {
127 saveCtpCfg(runnumber, timeStamp);
128 }
129 return 0;
130}
131int CTPRunManager::setRunConfigBK(uint32_t runNumber, const std::string& cfg)
132{
133 std::cout << "Printing run:" << runNumber << " cfg:" << cfg << std::endl;
134 if (mBKClient) {
135 try {
136 mBKClient->run()->setRawCtpTriggerConfiguration(runNumber, cfg);
137 } catch (std::runtime_error& error) {
138 std::cerr << "An error occurred: " << error.what() << std::endl;
139 return 1;
140 }
141 LOG(info) << "Run BK:" << runNumber << " CFG:" << cfg;
142 }
143 return 0;
144}
145int CTPRunManager::stopRun(uint32_t irun, long timeStamp)
146{
147 LOG(info) << "Stopping run index: " << irun;
148 if (mActiveRuns[irun] == nullptr) {
149 LOG(error) << "No config for run index:" << irun;
150 return 1;
151 }
152 // const auto now = std::chrono::system_clock::now();
153 // const long timeStamp = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
154 mActiveRuns[irun]->timeStop = timeStamp * 1000.;
155 saveRunScalersToCCDB(mActiveRuns[irun]->scalers, mActiveRuns[irun]->timeStart, mActiveRuns[irun]->timeStop);
156 saveRunScalersToQCDB(mActiveRuns[irun]->scalers, mActiveRuns[irun]->timeStart, mActiveRuns[irun]->timeStop);
157 delete mActiveRuns[irun];
158 mActiveRuns[irun] = nullptr;
159 return 0;
160}
161int CTPRunManager::addScalers(uint32_t irun, std::time_t time, bool start)
162{
163 if (mActiveRuns[irun] == nullptr) {
164 LOG(error) << "No config for run index:" << irun;
165 return 1;
166 }
167 std::string orb = "extorb";
168 std::string orbitid = "orbitid";
169 CTPScalerRecordRaw scalrec;
170 scalrec.epochTime = time;
171 std::vector<int> clslist = mActiveRuns[irun]->cfg.getTriggerClassList();
172 for (auto const& cls : clslist) {
173 std::string cmb = "clamb" + std::to_string(cls + 1);
174 std::string cma = "clama" + std::to_string(cls + 1);
175 std::string c0b = "cla0b" + std::to_string(cls + 1);
176 std::string c0a = "cla0a" + std::to_string(cls + 1);
177 std::string c1b = "cla1b" + std::to_string(cls + 1);
178 std::string c1a = "cla1a" + std::to_string(cls + 1);
179 CTPScalerRaw scalraw;
180 scalraw.classIndex = (uint32_t)cls;
181 // std::cout << "cls:" << cls << " " << scalraw.classIndex << std::endl;
182 scalraw.lmBefore = mCounters[mScalerName2Position[cmb]];
183 scalraw.lmAfter = mCounters[mScalerName2Position[cma]];
184 scalraw.l0Before = mCounters[mScalerName2Position[c0b]];
185 scalraw.l0After = mCounters[mScalerName2Position[c0a]];
186 scalraw.l1Before = mCounters[mScalerName2Position[c1b]];
187 scalraw.l1After = mCounters[mScalerName2Position[c1a]];
188 // std::cout << "positions:" << cmb << " " << mScalerName2Position[cmb] << std::endl;
189 // std::cout << "positions:" << cma << " " << mScalerName2Position[cma] << std::endl;
190 scalrec.scalers.push_back(scalraw);
191 // BK scalers to be corrected for overflow
192 if (mBKClient) {
193 CTPActiveRun* ar = mActiveRuns[irun];
194 ar->cntslast[cls][0] = scalraw.lmBefore;
195 ar->cntslast[cls][1] = scalraw.lmAfter;
196 ar->cntslast[cls][2] = scalraw.l0Before;
197 ar->cntslast[cls][3] = scalraw.l0After;
198 ar->cntslast[cls][4] = scalraw.l1Before;
199 ar->cntslast[cls][5] = scalraw.l1After;
200 }
201 }
202 mActiveRuns[irun]->send2BK(mBKClient, time, start);
203 //
204 uint32_t NINPS = 48;
205 int offset = 599;
206 for (uint32_t i = 0; i < NINPS; i++) {
207 uint32_t inpcount = mCounters[offset + i];
208 scalrec.scalersInps.push_back(inpcount);
209 // LOG(info) << "Scaler for input:" << CTPRunScalers::scalerNames[offset+i] << ":" << inpcount;
210 }
211 //
212 if (mNew == 0) {
213 scalrec.intRecord.orbit = mCounters[mScalerName2Position[orb]];
214 } else {
215 scalrec.intRecord.orbit = mCounters[mScalerName2Position[orbitid]];
216 }
217 scalrec.intRecord.bc = 0;
218 mActiveRuns[irun]->scalers.addScalerRacordRaw(scalrec);
219 LOG(info) << "Adding scalers for orbit:" << scalrec.intRecord.orbit;
220 // scalrec.printStream(std::cout);
221 // printCounters();
222 return 0;
223}
224int CTPRunManager::processMessage(std::string& topic, const std::string& message)
225{
226 LOG(info) << "Processing message with topic:" << topic;
227 std::string firstcounters;
228 if (topic.find("clear") != std::string::npos) {
229 mRunsLoaded.clear();
230 LOG(info) << "Loaded runs cleared.";
231 return 0;
232 }
233 if (topic.find("ctpconfig") != std::string::npos) {
234 LOG(info) << "ctpconfig received";
236 return 0;
237 }
238 if (topic.find("soxorbit") != std::string::npos) {
239 std::vector<std::string> tokens = o2::utils::Str::tokenize(message, ' ');
240 int ret = 0;
241 if (tokens.size() == 3) {
242 long timestamp = std::stol(tokens[0]);
243 uint32_t runnumber = std::stoul(tokens[1]);
244 uint32_t orbit = std::stoul(tokens[2]);
245 ret = saveSoxOrbit(runnumber, orbit, timestamp);
246 std::string logmessage;
247 if (ret) {
248 logmessage = "Failed to update CCDB with SOX orbit.";
249 } else {
250 logmessage = "CCDB updated with SOX orbit.";
251 }
252 LOG(important) << logmessage << " run:" << runnumber << " sox orbit:" << orbit << " ts:" << timestamp;
253 } else {
254 LOG(error) << "Topic soxorbit dize !=3: " << message << " token size:" << tokens.size();
255 ret = 1;
256 }
257 return ret;
258 }
259 if (topic.find("orbitreset") != std::string::npos) {
260 std::vector<std::string> tokens = o2::utils::Str::tokenize(message, ' ');
261 int ret = 0;
262 if (tokens.size() == 1) {
263 long timestamp = std::stol(tokens[0]);
264 ret = saveOrbitReset(timestamp);
265 std::string logmessage;
266 if (ret) {
267 logmessage = "Failed to update CCDB with orbitreset. ";
268 } else {
269 logmessage = "CCDB updated with orbitreset. ";
270 }
271 LOG(important) << logmessage << timestamp;
272 } else {
273 LOG(error) << "Topic orbit reset != 2: " << message << " token size:" << tokens.size();
274 ret = 1;
275 }
276 return ret;
277 }
278 if (topic.find("rocnts") != std::string::npos) {
279 return 0;
280 }
281 static int nerror = 0;
282 if (topic.find("sox") != std::string::npos) {
283 // get config
284 size_t irun = message.find("run");
285 if (irun == std::string::npos) {
286 LOG(warning) << "run keyword not found in SOX";
287 irun = message.size();
288 }
289 LOG(info) << "SOX received, Run keyword position:" << irun;
290 std::string cfg = message.substr(irun, message.size() - irun);
291 firstcounters = message.substr(0, irun);
292 } else if (topic.find("eox") != std::string::npos) {
293 LOG(info) << "EOX received";
294 mEOX = 1;
295 } else if (topic.find("cnts") != std::string::npos) {
296 // just continue
297 } else {
298 if (nerror < 10) {
299 LOG(warning) << "Skipping topic:" << topic;
300 nerror++;
301 }
302 return 0;
303 }
304 //
305 std::vector<std::string> tokens;
306 if (firstcounters.size() > 0) {
307 tokens = o2::utils::Str::tokenize(firstcounters, ' ');
308 } else {
309 tokens = o2::utils::Str::tokenize(message, ' ');
310 }
311 if (tokens.size() != (CTPRunScalers::NCOUNTERS + 1)) {
312 if (tokens.size() == (CTPRunScalers::NCOUNTERSv2 + 1)) {
313 mNew = 0;
314 LOG(warning) << "v2 scaler size";
315 } else {
316 LOG(warning) << "Scalers size wrong:" << tokens.size() << " expected:" << CTPRunScalers::NCOUNTERS + 1 << " or " << CTPRunScalers::NCOUNTERSv2 + 1;
317 return 1;
318 }
319 }
320 double timeStamp = std::stold(tokens.at(0));
321 std::time_t tt = timeStamp;
322 LOG(info) << "Processing scalers, all good, time:" << tokens.at(0) << " " << std::asctime(std::localtime(&tt));
323 for (uint32_t i = 1; i < tokens.size(); i++) {
324 mCounters[i - 1] = std::stoull(tokens.at(i));
325 if (i < (NRUNS + 1)) {
326 std::cout << mCounters[i - 1] << " ";
327 }
328 }
329 std::cout << std::endl;
330 LOG(info) << "Counter size:" << tokens.size();
331 //
332 for (uint32_t i = 0; i < NRUNS; i++) {
333 if ((mCounters[i] == 0) && (mActiveRunNumbers[i] == 0)) {
334 // not active
335 } else if ((mCounters[i] != 0) && (mActiveRunNumbers[i] == mCounters[i])) {
336 // active , do scalers
337 LOG(info) << "Run continue:" << mCounters[i];
338 addScalers(i, tt);
339 // LOG(info) << " QC period:" << mActiveRunNumbers[i] << " " << mActiveRuns[i]->qcwpcount << " " << mQCWritePeriod;
340 if (mActiveRuns[i]->qcwpcount > mQCWritePeriod) {
341 saveRunScalersToQCDB(mActiveRuns[i]->scalers, tt * 1000, tt * 1000);
342 mActiveRuns[i]->qcwpcount = 0;
343 }
344 mActiveRuns[i]->qcwpcount++;
345 } else if ((mCounters[i] != 0) && (mActiveRunNumbers[i] == 0)) {
346 LOG(info) << "Run started:" << mCounters[i];
347 auto run = mRunsLoaded.find(mCounters[i]);
348 if (run != mRunsLoaded.end()) {
349 mActiveRunNumbers[i] = mCounters[i];
350 mActiveRuns[i] = run->second;
351 mRunsLoaded.erase(run);
352 setRunConfigBK(mActiveRuns[i]->cfg.getRunNumber(), mActiveRuns[i]->cfg.getConfigString());
353 addScalers(i, tt, 1);
354 saveRunScalersToQCDB(mActiveRuns[i]->scalers, tt * 1000, tt * 1000);
355 } else {
356 LOG(error) << "Trying to start run which is not loaded:" << mCounters[i];
357 }
358 } else if ((mCounters[i] == 0) && (mActiveRunNumbers[i] != 0)) {
359 if (mEOX != 1) {
360 LOG(error) << "Internal error in processMessage: mEOX != 1 expected 0: mEOX:" << mEOX;
361 }
362 LOG(info) << "Run stopped:" << mActiveRunNumbers[i];
363 addScalers(i, tt);
364 mActiveRunNumbers[i] = 0;
365 mEOX = 0;
366 stopRun(i, tt);
367 }
368 }
369 mEOX = 0;
371 return 0;
372}
374{
375 std::cout << "Active runs:";
376 for (auto const& arun : mActiveRunNumbers) {
377 std::cout << arun << " ";
378 }
379 std::cout << " #loaded runs:" << mRunsLoaded.size();
380 for (auto const& lrun : mRunsLoaded) {
381 std::cout << " " << lrun.second->cfg.getRunNumber();
382 }
383 std::cout << std::endl;
384}
386{
388 LOG(fatal) << "NCOUNTERS:" << CTPRunScalers::NCOUNTERS << " different from names vector:" << CTPRunScalers::scalerNames.size();
389 return 1;
390 }
391 // try to open files of no success use default
392 for (uint32_t i = 0; i < CTPRunScalers::scalerNames.size(); i++) {
393 mScalerName2Position[CTPRunScalers::scalerNames[i]] = i;
394 }
395 return 0;
396}
398{
399 int n = 0;
400 for (int i = 0; i < NRUNS; i++) {
401 if (mActiveRuns[i] != nullptr) {
402 n++;
403 }
404 }
405 return n;
406}
408{
409 int NDET = 18;
410 int NINPS = 48;
411 // int NCLKFP = 7;
412 int NLTG_start = NRUNS;
413 int NCLKFP_start = NLTG_start + NDET * 32;
414 int NINPS_start = NCLKFP_start + 7;
415 int NCLS_start = NINPS_start + NINPS;
416 std::cout << "====> CTP counters:" << std::endl;
417 std::cout << "RUNS:" << std::endl;
418 int ipos = 0;
419 for (uint32_t i = 0; i < NRUNS; i++) {
420 std::cout << ipos << ":" << mCounters[i] << " ";
421 ipos++;
422 }
423 std::cout << std::endl;
424 for (int i = 0; i < NDET; i++) {
425 std::cout << "LTG" << i + 1 << std::endl;
426 for (int j = NLTG_start + i * 32; j < NLTG_start + (i + 1) * 32; j++) {
427 std::cout << ipos << ":" << mCounters[j] << " ";
428 ipos++;
429 }
430 std::cout << std::endl;
431 }
432 std::cout << "BC40,BC240,Orbit,pulser, fastlm, busy,spare" << std::endl;
433 for (int i = NCLKFP_start; i < NCLKFP_start + 7; i++) {
434 std::cout << ipos << ":" << mCounters[i] << " ";
435 ipos++;
436 }
437 std::cout << std::endl;
438 std::cout << "INPUTS:" << std::endl;
439 for (int i = NINPS_start; i < NINPS_start + NINPS; i++) {
440 std::cout << ipos << ":" << mCounters[i] << " ";
441 ipos++;
442 }
443 std::cout << std::endl;
444 std::cout << "CLASS M Before" << std::endl;
445 for (int i = NCLS_start; i < NCLS_start + 64; i++) {
446 std::cout << ipos << ":" << mCounters[i] << " ";
447 ipos++;
448 }
449 std::cout << std::endl;
450 std::cout << "CLASS M After" << std::endl;
451 for (int i = NCLS_start + 64; i < NCLS_start + 2 * 64; i++) {
452 std::cout << ipos << ":" << mCounters[i] << " ";
453 ipos++;
454 }
455 std::cout << std::endl;
456 std::cout << "CLASS 0 Before" << std::endl;
457 for (int i = NCLS_start + 2 * 64; i < NCLS_start + 3 * 64; i++) {
458 std::cout << ipos << ":" << mCounters[i] << " ";
459 ipos++;
460 }
461 std::cout << std::endl;
462 std::cout << "CLASS 0 After" << std::endl;
463 for (int i = NCLS_start + 3 * 64; i < NCLS_start + 4 * 64; i++) {
464 std::cout << ipos << ":" << mCounters[i] << " ";
465 ipos++;
466 }
467 std::cout << std::endl;
468 std::cout << "CLASS 1 Before" << std::endl;
469 for (int i = NCLS_start + 4 * 64; i < NCLS_start + 5 * 64; i++) {
470 std::cout << ipos << ":" << mCounters[i] << " ";
471 ipos++;
472 }
473 std::cout << std::endl;
474 std::cout << "CLASS 1 After" << std::endl;
475 for (int i = NCLS_start + 5 * 64; i < NCLS_start + 6 * 64; i++) {
476 std::cout << ipos << ":" << mCounters[i] << " ";
477 ipos++;
478 }
479 std::cout << std::endl;
480 std::cout << " REST:" << std::endl;
481 for (uint32_t i = NCLS_start + 6 * 64; i < mCounters.size(); i++) {
482 if ((ipos % 10) == 0) {
483 std::cout << std::endl;
484 std::cout << ipos << ":";
485 }
486 std::cout << mCounters[i] << " ";
487 }
488}
uint64_t orbit
Definition RawEventData.h:6
int16_t time
Definition RawEventData.h:4
int32_t i
uint16_t pos
Definition RawData.h:3
uint32_t j
Definition RawData.h:0
Managing runs for config and scalers.
std::ostringstream debug
std::vector< int > getTriggerClassList() const
uint64_t getTriggerClassMask() const
std::string getClassNameFromHWIndex(int index)
o2::detectors::DetID::mask_t getDetectorMask() const
void printStream(std::ostream &stream) const
int loadConfigurationRun3(const std::string &ctpconfiguartion)
int stopRun(uint32_t irun, long timeStamp)
int setRunConfigBK(uint32_t runNumber, const std::string &cfg)
void printActiveRuns() const
int processMessage(std::string &topic, const std::string &message)
int loadRun(const std::string &cfg)
int addScalers(uint32_t irun, std::time_t time, bool start=0)
static constexpr uint32_t NCOUNTERS
Definition Scalers.h:96
static constexpr uint32_t NCOUNTERSv2
Definition Scalers.h:95
void setDetectorMask(o2::detectors::DetID::mask_t mask)
Definition Scalers.h:114
static std::vector< std::string > scalerNames
Definition Scalers.h:97
void setRunNumber(uint32_t rnumber)
Definition Scalers.h:115
void setClassMask(std::bitset< CTP_NCLASSES > classMask)
Definition Scalers.h:113
static std::string mCCDBHost
Database constants.
int saveRunScalersToQCDB(CTPRunScalers &scalers, long timeStart, long timeStop)
static std::string mQCDBHost
int saveSoxOrbit(uint32_t runNumber, uint32_t soxOrbit, long timeStart)
static void setQCDBHost(std::string host)
int saveRunScalersToCCDB(CTPRunScalers &scalers, long timeStart, long timeStop)
int saveRunConfigToCCDB(CTPConfiguration *cfg, long timeStart)
int saveCtpCfg(uint32_t runNumber, long timeStamp)
int saveOrbitReset(long timeStamp)
GLdouble n
Definition glcorearb.h:1982
GLsizeiptr size
Definition glcorearb.h:659
GLdouble f
Definition glcorearb.h:310
GLintptr offset
Definition glcorearb.h:660
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLuint start
Definition glcorearb.h:469
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
uint32_t orbit
LHC orbit.
uint16_t bc
bunch crossing ID of interaction
counters64_t overflows
Definition RunManager.h:41
CTPConfiguration cfg
Definition RunManager.h:33
CTPRunScalers scalers
Definition RunManager.h:34
int send2BK(std::unique_ptr< BkpClient > &BKClient, size_t ts, bool start)
uint32_t classIndex
Definition Scalers.h:41
std::vector< CTPScalerRaw > scalers
Definition Scalers.h:70
o2::InteractionRecord intRecord
Definition Scalers.h:68
std::vector< uint32_t > scalersInps
Definition Scalers.h:72
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"