Project
Loading...
Searching...
No Matches
RunManager.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14
16#include <iostream>
17#include <sstream>
18#include <regex>
20#include <fairlogger/Logger.h>
21
22using namespace o2::ctp;
28{
29 std::vector<int> clslist = cfg.getTriggerClassList();
30 for (auto const& cls : clslist) {
31 cntslast[cls] = {0, 0, 0, 0, 0, 0};
32 cntslast0[cls] = {0, 0, 0, 0, 0, 0};
33 overflows[cls] = {0, 0, 0, 0, 0, 0};
34 }
35}
36int CTPActiveRun::send2BK(std::unique_ptr<BkpClient>& BKClient, size_t ts, bool start)
37{
38 int runNumber = cfg.getRunNumber();
39 // LOG(info) << "BK Filling run:" << runNumber;
40 // int runOri = runNumber;
41 // runNumber = 123;
42 if (start) {
43 for (auto const& cls : cntslast) {
44 for (int i = 0; i < 6; i++) {
45 cnts0[cls.first][i] = cls.second[i];
46 }
47 }
48 }
49 std::array<uint64_t, 6> cntsbk{0};
50 for (auto const& cls : cntslast) {
51 for (int i = 0; i < 6; i++) {
52 if (cls.second[i] < cntslast0[cls.first][i]) {
53 overflows[cls.first][i]++;
54 }
55 cntslast0[cls.first][i] = cls.second[i];
56 cntsbk[i] = (uint64_t)cls.second[i] + 0xffffffffull * overflows[cls.first][i] - (uint64_t)cnts0[cls.first][i];
57 }
58 std::string clsname = cfg.getClassNameFromHWIndex(cls.first);
59 // clsname = std::to_string(runOri) + "_" + clsname;
60 try {
61 BKClient->ctpTriggerCounters()->createOrUpdateForRun(runNumber, clsname, ts, cntsbk[0], cntsbk[1], cntsbk[2], cntsbk[3], cntsbk[4], cntsbk[5]);
62 } catch (std::runtime_error& error) {
63 std::cerr << "An error occurred: " << error.what() << std::endl;
64 return 1;
65 }
66 LOG(debug) << "Run BK:" << runNumber << " class:" << clsname << " cls.first" << cls.first << " ts:" << ts << " cnts:" << cntsbk[0] << " " << cntsbk[1] << " " << cntsbk[2] << " " << cntsbk[3] << " " << cntsbk[4] << " " << cntsbk[5];
67 }
68 return 0;
69}
74{
75 for (uint32_t i = 0; i < NRUNS; i++) {
76 mActiveRuns[i] = nullptr;
77 }
79 if (mBKHost != "none") {
80 mBKClient = BkpClientFactory::create(mBKHost);
81 LOG(info) << "BK Client created with:" << mBKHost;
82 } else {
83 LOG(info) << "BK not sent";
84 }
86 LOG(info) << "QC host:" << mQCDBHost;
87 LOG(info) << "QCDB writing every:" << mQCWritePeriod << " 10 secs";
88 LOG(info) << "CCDB host:" << mCCDBHost;
89 LOG(info) << "CTP vNew cfg:" << mNew;
90 LOG(info) << "CTPRunManager initialised.";
91}
92int CTPRunManager::loadRun(const std::string& cfg)
93{
94 LOG(info) << "Loading run: " << cfg;
95 auto now = std::chrono::system_clock::now();
96 long timeStamp = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
97 LOG(info) << "Timestamp real time:" << timeStamp;
98 size_t pos = cfg.find(" ");
99 std::string cfgmod = cfg;
100 if (pos == std::string::npos) {
101 LOG(warning) << "Space not found in CTP config";
102 } else {
103 std::string f = cfg.substr(0, pos);
104 if (f.find("run") == std::string::npos) {
105 double_t tt = std::stold(f);
106 timeStamp = (tt * 1000.);
107 LOG(info) << "Timestamp file:" << timeStamp;
108 cfgmod = cfg.substr(pos, cfg.size());
109 LOG(info) << "ctpcfg: using ctp time";
110 }
111 }
112 CTPActiveRun* activerun = new CTPActiveRun;
113 activerun->timeStart = timeStamp;
114 activerun->cfg.loadConfigurationRun3(cfgmod);
115 activerun->cfg.printStream(std::cout);
116 //
117 uint32_t runnumber = activerun->cfg.getRunNumber();
118 activerun->scalers.setRunNumber(runnumber);
119 activerun->scalers.setClassMask(activerun->cfg.getTriggerClassMask());
120 o2::detectors::DetID::mask_t detmask = activerun->cfg.getDetectorMask();
121 activerun->scalers.setDetectorMask(detmask);
122 //
123 mRunsLoaded[runnumber] = activerun;
124 saveRunConfigToCCDB(&activerun->cfg, timeStamp);
125
126 return 0;
127}
128int CTPRunManager::setRunConfigBK(uint32_t runNumber, const std::string& cfg)
129{
130 std::cout << "Printing run:" << runNumber << " cfg:" << cfg << std::endl;
131 if (mBKClient) {
132 try {
133 mBKClient->run()->setRawCtpTriggerConfiguration(runNumber, cfg);
134 } catch (std::runtime_error& error) {
135 std::cerr << "An error occurred: " << error.what() << std::endl;
136 return 1;
137 }
138 LOG(info) << "Run BK:" << runNumber << " CFG:" << cfg;
139 }
140 return 0;
141}
142int CTPRunManager::stopRun(uint32_t irun, long timeStamp)
143{
144 LOG(info) << "Stopping run index: " << irun;
145 if (mActiveRuns[irun] == nullptr) {
146 LOG(error) << "No config for run index:" << irun;
147 return 1;
148 }
149 // const auto now = std::chrono::system_clock::now();
150 // const long timeStamp = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
151 mActiveRuns[irun]->timeStop = timeStamp * 1000.;
152 saveRunScalersToCCDB(mActiveRuns[irun]->scalers, mActiveRuns[irun]->timeStart, mActiveRuns[irun]->timeStop);
153 saveRunScalersToQCDB(mActiveRuns[irun]->scalers, mActiveRuns[irun]->timeStart, mActiveRuns[irun]->timeStop);
154 delete mActiveRuns[irun];
155 mActiveRuns[irun] = nullptr;
156 return 0;
157}
158int CTPRunManager::addScalers(uint32_t irun, std::time_t time, bool start)
159{
160 if (mActiveRuns[irun] == nullptr) {
161 LOG(error) << "No config for run index:" << irun;
162 return 1;
163 }
164 std::string orb = "extorb";
165 std::string orbitid = "orbitid";
166 CTPScalerRecordRaw scalrec;
167 scalrec.epochTime = time;
168 std::vector<int> clslist = mActiveRuns[irun]->cfg.getTriggerClassList();
169 for (auto const& cls : clslist) {
170 std::string cmb = "clamb" + std::to_string(cls + 1);
171 std::string cma = "clama" + std::to_string(cls + 1);
172 std::string c0b = "cla0b" + std::to_string(cls + 1);
173 std::string c0a = "cla0a" + std::to_string(cls + 1);
174 std::string c1b = "cla1b" + std::to_string(cls + 1);
175 std::string c1a = "cla1a" + std::to_string(cls + 1);
176 CTPScalerRaw scalraw;
177 scalraw.classIndex = (uint32_t)cls;
178 // std::cout << "cls:" << cls << " " << scalraw.classIndex << std::endl;
179 scalraw.lmBefore = mCounters[mScalerName2Position[cmb]];
180 scalraw.lmAfter = mCounters[mScalerName2Position[cma]];
181 scalraw.l0Before = mCounters[mScalerName2Position[c0b]];
182 scalraw.l0After = mCounters[mScalerName2Position[c0a]];
183 scalraw.l1Before = mCounters[mScalerName2Position[c1b]];
184 scalraw.l1After = mCounters[mScalerName2Position[c1a]];
185 // std::cout << "positions:" << cmb << " " << mScalerName2Position[cmb] << std::endl;
186 // std::cout << "positions:" << cma << " " << mScalerName2Position[cma] << std::endl;
187 scalrec.scalers.push_back(scalraw);
188 // BK scalers to be corrected for overflow
189 if (mBKClient) {
190 CTPActiveRun* ar = mActiveRuns[irun];
191 ar->cntslast[cls][0] = scalraw.lmBefore;
192 ar->cntslast[cls][1] = scalraw.lmAfter;
193 ar->cntslast[cls][2] = scalraw.l0Before;
194 ar->cntslast[cls][3] = scalraw.l0After;
195 ar->cntslast[cls][4] = scalraw.l1Before;
196 ar->cntslast[cls][5] = scalraw.l1After;
197 }
198 }
199 mActiveRuns[irun]->send2BK(mBKClient, time, start);
200 //
201 uint32_t NINPS = 48;
202 int offset = 599;
203 for (uint32_t i = 0; i < NINPS; i++) {
204 uint32_t inpcount = mCounters[offset + i];
205 scalrec.scalersInps.push_back(inpcount);
206 // LOG(info) << "Scaler for input:" << CTPRunScalers::scalerNames[offset+i] << ":" << inpcount;
207 }
208 //
209 if (mNew == 0) {
210 scalrec.intRecord.orbit = mCounters[mScalerName2Position[orb]];
211 } else {
212 scalrec.intRecord.orbit = mCounters[mScalerName2Position[orbitid]];
213 }
214 scalrec.intRecord.bc = 0;
215 mActiveRuns[irun]->scalers.addScalerRacordRaw(scalrec);
216 LOG(info) << "Adding scalers for orbit:" << scalrec.intRecord.orbit;
217 // scalrec.printStream(std::cout);
218 // printCounters();
219 return 0;
220}
// Process one message published by the CTP and dispatch on its topic.
//
// Topics are matched with std::string::find, in this order:
//   "clear"      - empties mRunsLoaded.
//   "ctpconfig"  - logs that a configuration was received.
//   "soxorbit"   - message must hold 3 space-separated tokens: timestamp, run number, orbit;
//                  they are passed to saveSoxOrbit().
//   "orbitreset" - message must hold 1 token, the timestamp passed to saveOrbitReset().
//   "sox", "eox", "cnts" - the message carries a timestamp followed by the counters; these
//                  are copied into mCounters and the NRUNS run slots are updated from them.
//   any other topic is skipped, with a warning for the first 10 occurrences only.
//
// Returns 0 on success or when the topic is skipped, 1 when the number of tokens is not the
// expected one, and the result of saveSoxOrbit()/saveOrbitReset() for those two topics.
221int CTPRunManager::processMessage(std::string& topic, const std::string& message)
222{
223 LOG(info) << "Processing message with topic:" << topic;
224 std::string firstcounters;
225 if (topic.find("clear") != std::string::npos) {
// NOTE(review): the map values are pointers that loadRun() creates with new; clear() drops
// them without delete - confirm whether this is intended.
226 mRunsLoaded.clear();
227 LOG(info) << "Loaded runs cleared.";
228 return 0;
229 }
230 if (topic.find("ctpconfig") != std::string::npos) {
231 LOG(info) << "ctpconfig received";
233 return 0;
234 }
// "soxorbit" has to be tested before "sox" further down, because find("sox") matches it too.
235 if (topic.find("soxorbit") != std::string::npos) {
236 std::vector<std::string> tokens = o2::utils::Str::tokenize(message, ' ');
237 int ret = 0;
238 if (tokens.size() == 3) {
239 long timestamp = std::stol(tokens[0]);
240 uint32_t runnumber = std::stoul(tokens[1]);
241 uint32_t orbit = std::stoul(tokens[2]);
242 ret = saveSoxOrbit(runnumber, orbit, timestamp);
243 std::string logmessage;
244 if (ret) {
245 logmessage = "Failed to update CCDB with SOX orbit.";
246 } else {
247 logmessage = "CCDB updated with SOX orbit.";
248 }
249 LOG(important) << logmessage << " run:" << runnumber << " sox orbit:" << orbit << " ts:" << timestamp;
250 } else {
251 LOG(error) << "Topic soxorbit dize !=3: " << message << " token size:" << tokens.size();
252 ret = 1;
253 }
254 return ret;
255 }
256 if (topic.find("orbitreset") != std::string::npos) {
257 std::vector<std::string> tokens = o2::utils::Str::tokenize(message, ' ');
258 int ret = 0;
259 if (tokens.size() == 1) {
260 long timestamp = std::stol(tokens[0]);
261 ret = saveOrbitReset(timestamp);
262 std::string logmessage;
263 if (ret) {
264 logmessage = "Failed to update CCDB with orbitreset. ";
265 } else {
266 logmessage = "CCDB updated with orbitreset. ";
267 }
268 LOG(important) << logmessage << timestamp;
269 } else {
// NOTE(review): the text below says "!= 2" although exactly 1 token is required above.
270 LOG(error) << "Topic orbit reset != 2: " << message << " token size:" << tokens.size();
271 ret = 1;
272 }
273 return ret;
274 }
// Number of "Skipping topic" warnings issued so far; function-local static, so it is kept
// between calls.
275 static int nerror = 0;
276 if (topic.find("sox") != std::string::npos) {
277 // get config
278 size_t irun = message.find("run");
279 if (irun == std::string::npos) {
280 LOG(warning) << "run keyword not found in SOX";
// Without the keyword the whole message is treated as counters and cfg becomes empty.
281 irun = message.size();
282 }
283 LOG(info) << "SOX received, Run keyword position:" << irun;
// NOTE(review): cfg is not used again inside this branch.
284 std::string cfg = message.substr(irun, message.size() - irun);
// Everything in front of the "run" keyword holds the timestamp and the counters.
285 firstcounters = message.substr(0, irun);
286 } else if (topic.find("eox") != std::string::npos) {
287 LOG(info) << "EOX received";
// Remembered so that the run-slot loop below can check that a stopping run was announced.
288 mEOX = 1;
289 } else if (topic.find("cnts") != std::string::npos) {
290 // just continue
291 } else {
292 if (nerror < 10) {
293 LOG(warning) << "Skipping topic:" << topic;
294 nerror++;
295 }
296 return 0;
297 }
298 //
299 std::vector<std::string> tokens;
300 if (firstcounters.size() > 0) {
301 tokens = o2::utils::Str::tokenize(firstcounters, ' ');
302 } else {
303 tokens = o2::utils::Str::tokenize(message, ' ');
304 }
// One token more than the number of counters is expected: the first token is the timestamp.
305 if (tokens.size() != (CTPRunScalers::NCOUNTERS + 1)) {
306 if (tokens.size() == (CTPRunScalers::NCOUNTERSv2 + 1)) {
// Token count matches the v2 number of counters.
307 mNew = 0;
308 LOG(warning) << "v2 scaler size";
309 } else {
310 LOG(warning) << "Scalers size wrong:" << tokens.size() << " expected:" << CTPRunScalers::NCOUNTERS + 1 << " or " << CTPRunScalers::NCOUNTERSv2 + 1;
311 return 1;
312 }
313 }
// presumably seconds since the epoch (it is converted to time_t and multiplied by 1000 for
// the QCDB calls below) - TODO confirm
314 double timeStamp = std::stold(tokens.at(0));
315 std::time_t tt = timeStamp;
316 LOG(info) << "Processing scalers, all good, time:" << tokens.at(0) << " " << std::asctime(std::localtime(&tt));
// Store all counters; the first NRUNS of them are also echoed to stdout.
317 for (uint32_t i = 1; i < tokens.size(); i++) {
318 mCounters[i - 1] = std::stoull(tokens.at(i));
319 if (i < (NRUNS + 1)) {
320 std::cout << mCounters[i - 1] << " ";
321 }
322 }
323 std::cout << std::endl;
324 LOG(info) << "Counter size:" << tokens.size();
325 //
// For every run slot compare the value in the counters (mCounters[i]) with the run number
// recorded as active (mActiveRunNumbers[i]): both zero = idle, equal = run continues,
// only counter set = run starts, only active number set = run stops.
326 for (uint32_t i = 0; i < NRUNS; i++) {
327 if ((mCounters[i] == 0) && (mActiveRunNumbers[i] == 0)) {
328 // not active
329 } else if ((mCounters[i] != 0) && (mActiveRunNumbers[i] == mCounters[i])) {
330 // active , do scalers
331 LOG(info) << "Run continue:" << mCounters[i];
332 addScalers(i, tt);
333 // LOG(info) << " QC period:" << mActiveRunNumbers[i] << " " << mActiveRuns[i]->qcwpcount << " " << mQCWritePeriod;
// Write the scalers to QCDB once qcwpcount exceeds mQCWritePeriod, then restart the count.
334 if (mActiveRuns[i]->qcwpcount > mQCWritePeriod) {
335 saveRunScalersToQCDB(mActiveRuns[i]->scalers, tt * 1000, tt * 1000);
336 mActiveRuns[i]->qcwpcount = 0;
337 }
338 mActiveRuns[i]->qcwpcount++;
339 } else if ((mCounters[i] != 0) && (mActiveRunNumbers[i] == 0)) {
340 LOG(info) << "Run started:" << mCounters[i];
341 auto run = mRunsLoaded.find(mCounters[i]);
342 if (run != mRunsLoaded.end()) {
// The run object is handed over from mRunsLoaded to the active slot.
343 mActiveRunNumbers[i] = mCounters[i];
344 mActiveRuns[i] = run->second;
345 mRunsLoaded.erase(run);
346 setRunConfigBK(mActiveRuns[i]->cfg.getRunNumber(), mActiveRuns[i]->cfg.getConfigString());
// Third argument 1 marks the first scaler record of the run (start = true).
347 addScalers(i, tt, 1);
348 saveRunScalersToQCDB(mActiveRuns[i]->scalers, tt * 1000, tt * 1000);
349 } else {
350 LOG(error) << "Trying to start run which is not loaded:" << mCounters[i];
351 }
352 } else if ((mCounters[i] == 0) && (mActiveRunNumbers[i] != 0)) {
// A run disappearing from the counters without a preceding "eox" topic is reported.
353 if (mEOX != 1) {
354 LOG(error) << "Internal error in processMessage: mEOX != 1 expected 0: mEOX:" << mEOX;
355 }
356 LOG(info) << "Run stopped:" << mActiveRunNumbers[i];
// Take a last scaler record before the run object is saved and deleted by stopRun().
357 addScalers(i, tt);
358 mActiveRunNumbers[i] = 0;
359 mEOX = 0;
360 stopRun(i, tt);
361 }
362 }
363 mEOX = 0;
365 return 0;
366}
368{
369 std::cout << "Active runs:";
370 for (auto const& arun : mActiveRunNumbers) {
371 std::cout << arun << " ";
372 }
373 std::cout << " #loaded runs:" << mRunsLoaded.size();
374 for (auto const& lrun : mRunsLoaded) {
375 std::cout << " " << lrun.second->cfg.getRunNumber();
376 }
377 std::cout << std::endl;
378}
380{
382 LOG(fatal) << "NCOUNTERS:" << CTPRunScalers::NCOUNTERS << " different from names vector:" << CTPRunScalers::scalerNames.size();
383 return 1;
384 }
385 // try to open files of no success use default
386 for (uint32_t i = 0; i < CTPRunScalers::scalerNames.size(); i++) {
387 mScalerName2Position[CTPRunScalers::scalerNames[i]] = i;
388 }
389 return 0;
390}
392{
393 int n = 0;
394 for (int i = 0; i < NRUNS; i++) {
395 if (mActiveRuns[i] != nullptr) {
396 n++;
397 }
398 }
399 return n;
400}
402{
403 int NDET = 18;
404 int NINPS = 48;
405 // int NCLKFP = 7;
406 int NLTG_start = NRUNS;
407 int NCLKFP_start = NLTG_start + NDET * 32;
408 int NINPS_start = NCLKFP_start + 7;
409 int NCLS_start = NINPS_start + NINPS;
410 std::cout << "====> CTP counters:" << std::endl;
411 std::cout << "RUNS:" << std::endl;
412 int ipos = 0;
413 for (uint32_t i = 0; i < NRUNS; i++) {
414 std::cout << ipos << ":" << mCounters[i] << " ";
415 ipos++;
416 }
417 std::cout << std::endl;
418 for (int i = 0; i < NDET; i++) {
419 std::cout << "LTG" << i + 1 << std::endl;
420 for (int j = NLTG_start + i * 32; j < NLTG_start + (i + 1) * 32; j++) {
421 std::cout << ipos << ":" << mCounters[j] << " ";
422 ipos++;
423 }
424 std::cout << std::endl;
425 }
426 std::cout << "BC40,BC240,Orbit,pulser, fastlm, busy,spare" << std::endl;
427 for (int i = NCLKFP_start; i < NCLKFP_start + 7; i++) {
428 std::cout << ipos << ":" << mCounters[i] << " ";
429 ipos++;
430 }
431 std::cout << std::endl;
432 std::cout << "INPUTS:" << std::endl;
433 for (int i = NINPS_start; i < NINPS_start + NINPS; i++) {
434 std::cout << ipos << ":" << mCounters[i] << " ";
435 ipos++;
436 }
437 std::cout << std::endl;
438 std::cout << "CLASS M Before" << std::endl;
439 for (int i = NCLS_start; i < NCLS_start + 64; i++) {
440 std::cout << ipos << ":" << mCounters[i] << " ";
441 ipos++;
442 }
443 std::cout << std::endl;
444 std::cout << "CLASS M After" << std::endl;
445 for (int i = NCLS_start + 64; i < NCLS_start + 2 * 64; i++) {
446 std::cout << ipos << ":" << mCounters[i] << " ";
447 ipos++;
448 }
449 std::cout << std::endl;
450 std::cout << "CLASS 0 Before" << std::endl;
451 for (int i = NCLS_start + 2 * 64; i < NCLS_start + 3 * 64; i++) {
452 std::cout << ipos << ":" << mCounters[i] << " ";
453 ipos++;
454 }
455 std::cout << std::endl;
456 std::cout << "CLASS 0 After" << std::endl;
457 for (int i = NCLS_start + 3 * 64; i < NCLS_start + 4 * 64; i++) {
458 std::cout << ipos << ":" << mCounters[i] << " ";
459 ipos++;
460 }
461 std::cout << std::endl;
462 std::cout << "CLASS 1 Before" << std::endl;
463 for (int i = NCLS_start + 4 * 64; i < NCLS_start + 5 * 64; i++) {
464 std::cout << ipos << ":" << mCounters[i] << " ";
465 ipos++;
466 }
467 std::cout << std::endl;
468 std::cout << "CLASS 1 After" << std::endl;
469 for (int i = NCLS_start + 5 * 64; i < NCLS_start + 6 * 64; i++) {
470 std::cout << ipos << ":" << mCounters[i] << " ";
471 ipos++;
472 }
473 std::cout << std::endl;
474 std::cout << " REST:" << std::endl;
475 for (uint32_t i = NCLS_start + 6 * 64; i < mCounters.size(); i++) {
476 if ((ipos % 10) == 0) {
477 std::cout << std::endl;
478 std::cout << ipos << ":";
479 }
480 std::cout << mCounters[i] << " ";
481 }
482}
uint64_t orbit
Definition RawEventData.h:6
int16_t time
Definition RawEventData.h:4
int32_t i
uint16_t pos
Definition RawData.h:3
uint32_t j
Definition RawData.h:0
Managing runs for config and scalers.
std::ostringstream debug
std::vector< int > getTriggerClassList() const
uint64_t getTriggerClassMask() const
std::string getClassNameFromHWIndex(int index)
o2::detectors::DetID::mask_t getDetectorMask() const
void printStream(std::ostream &stream) const
int loadConfigurationRun3(const std::string &ctpconfiguartion)
int stopRun(uint32_t irun, long timeStamp)
int setRunConfigBK(uint32_t runNumber, const std::string &cfg)
void printActiveRuns() const
int processMessage(std::string &topic, const std::string &message)
int loadRun(const std::string &cfg)
int addScalers(uint32_t irun, std::time_t time, bool start=0)
static constexpr uint32_t NCOUNTERS
Definition Scalers.h:96
static constexpr uint32_t NCOUNTERSv2
Definition Scalers.h:95
void setDetectorMask(o2::detectors::DetID::mask_t mask)
Definition Scalers.h:114
static std::vector< std::string > scalerNames
Definition Scalers.h:97
void setRunNumber(uint32_t rnumber)
Definition Scalers.h:115
void setClassMask(std::bitset< CTP_NCLASSES > classMask)
Definition Scalers.h:113
static std::string mCCDBHost
Database constants.
int saveRunScalersToQCDB(CTPRunScalers &scalers, long timeStart, long timeStop)
static std::string mQCDBHost
int saveSoxOrbit(uint32_t runNumber, uint32_t soxOrbit, long timeStart)
static void setQCDBHost(std::string host)
int saveRunScalersToCCDB(CTPRunScalers &scalers, long timeStart, long timeStop)
int saveRunConfigToCCDB(CTPConfiguration *cfg, long timeStart)
int saveOrbitReset(long timeStamp)
GLdouble n
Definition glcorearb.h:1982
GLsizeiptr size
Definition glcorearb.h:659
GLdouble f
Definition glcorearb.h:310
GLintptr offset
Definition glcorearb.h:660
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLuint start
Definition glcorearb.h:469
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
uint32_t orbit
LHC orbit.
uint16_t bc
bunch crossing ID of interaction
counters64_t overflows
Definition RunManager.h:41
CTPConfiguration cfg
Definition RunManager.h:33
CTPRunScalers scalers
Definition RunManager.h:34
int send2BK(std::unique_ptr< BkpClient > &BKClient, size_t ts, bool start)
uint32_t classIndex
Definition Scalers.h:41
std::vector< CTPScalerRaw > scalers
Definition Scalers.h:70
o2::InteractionRecord intRecord
Definition Scalers.h:68
std::vector< uint32_t > scalersInps
Definition Scalers.h:72
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"