20#include <fairlogger/Logger.h>
30 for (
auto const& cls : clslist) {
44 for (
int i = 0;
i < 6;
i++) {
45 cnts0[cls.first][
i] = cls.second[
i];
49 std::array<uint64_t, 6> cntsbk{0};
51 for (
int i = 0;
i < 6;
i++) {
56 cntsbk[
i] = (uint64_t)cls.second[
i] + 0xffffffffull *
overflows[cls.first][
i] - (uint64_t)
cnts0[cls.first][
i];
61 BKClient->ctpTriggerCounters()->createOrUpdateForRun(runNumber, clsname, ts, cntsbk[0], cntsbk[1], cntsbk[2], cntsbk[3], cntsbk[4], cntsbk[5]);
62 }
catch (std::runtime_error& error) {
63 std::cerr <<
"An error occurred: " << error.what() << std::endl;
66 LOG(
debug) <<
"Run BK:" << runNumber <<
" class:" << clsname <<
" cls.first" << cls.first <<
" ts:" << ts <<
" cnts:" << cntsbk[0] <<
" " << cntsbk[1] <<
" " << cntsbk[2] <<
" " << cntsbk[3] <<
" " << cntsbk[4] <<
" " << cntsbk[5];
75 for (uint32_t
i = 0;
i < NRUNS;
i++) {
76 mActiveRuns[
i] =
nullptr;
79 if (mBKHost !=
"none") {
80 mBKClient = BkpClientFactory::create(mBKHost);
81 LOG(info) <<
"BK Client created with:" << mBKHost;
83 LOG(info) <<
"BK not sent";
87 LOG(info) <<
"QCDB writing every:" << mQCWritePeriod <<
" 10 secs";
89 LOG(info) <<
"CTP vNew cfg:" << mNew;
91 LOG(info) <<
"CTPRunManager initialised.";
95 LOG(info) <<
"Loading run: " << cfg;
96 auto now = std::chrono::system_clock::now();
97 long timeStamp = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
98 LOG(info) <<
"Timestamp real time:" << timeStamp;
99 size_t pos = cfg.find(
" ");
100 std::string cfgmod = cfg;
101 if (
pos == std::string::npos) {
102 LOG(warning) <<
"Space not found in CTP config";
104 std::string
f = cfg.substr(0,
pos);
105 if (
f.find(
"run") == std::string::npos) {
106 double_t
tt = std::stold(
f);
107 timeStamp = (
tt * 1000.);
108 LOG(info) <<
"Timestamp file:" << timeStamp;
109 cfgmod = cfg.substr(
pos, cfg.size());
110 LOG(info) <<
"ctpconfig: using ctp time";
124 mRunsLoaded[runnumber] = activerun;
133 std::cout <<
"Printing run:" << runNumber <<
" cfg:" << cfg << std::endl;
136 mBKClient->run()->setRawCtpTriggerConfiguration(runNumber, cfg);
137 }
catch (std::runtime_error& error) {
138 std::cerr <<
"An error occurred: " << error.what() << std::endl;
141 LOG(info) <<
"Run BK:" << runNumber <<
" CFG:" << cfg;
147 LOG(info) <<
"Stopping run index: " << irun;
148 if (mActiveRuns[irun] ==
nullptr) {
149 LOG(error) <<
"No config for run index:" << irun;
154 mActiveRuns[irun]->timeStop = timeStamp * 1000.;
155 saveRunScalersToCCDB(mActiveRuns[irun]->scalers, mActiveRuns[irun]->timeStart, mActiveRuns[irun]->timeStop);
156 saveRunScalersToQCDB(mActiveRuns[irun]->scalers, mActiveRuns[irun]->timeStart, mActiveRuns[irun]->timeStop);
157 delete mActiveRuns[irun];
158 mActiveRuns[irun] =
nullptr;
163 if (mActiveRuns[irun] ==
nullptr) {
164 LOG(error) <<
"No config for run index:" << irun;
167 std::string orb =
"extorb";
168 std::string orbitid =
"orbitid";
171 std::vector<int> clslist = mActiveRuns[irun]->cfg.getTriggerClassList();
172 for (
auto const& cls : clslist) {
182 scalraw.
lmBefore = mCounters[mScalerName2Position[cmb]];
183 scalraw.
lmAfter = mCounters[mScalerName2Position[cma]];
184 scalraw.
l0Before = mCounters[mScalerName2Position[c0b]];
185 scalraw.
l0After = mCounters[mScalerName2Position[c0a]];
186 scalraw.
l1Before = mCounters[mScalerName2Position[c1b]];
187 scalraw.
l1After = mCounters[mScalerName2Position[c1a]];
190 scalrec.
scalers.push_back(scalraw);
202 mActiveRuns[irun]->send2BK(mBKClient,
time,
start);
206 for (uint32_t
i = 0;
i < NINPS;
i++) {
207 uint32_t inpcount = mCounters[
offset +
i];
215 scalrec.
intRecord.
orbit = mCounters[mScalerName2Position[orbitid]];
218 mActiveRuns[irun]->scalers.addScalerRacordRaw(scalrec);
226 LOG(info) <<
"Processing message with topic:" << topic;
227 std::string firstcounters;
228 if (topic.find(
"clear") != std::string::npos) {
230 LOG(info) <<
"Loaded runs cleared.";
233 if (topic.find(
"ctpconfig") != std::string::npos) {
234 LOG(info) <<
"ctpconfig received";
238 if (topic.find(
"soxorbit") != std::string::npos) {
241 if (tokens.size() == 3) {
242 long timestamp = std::stol(tokens[0]);
243 uint32_t runnumber = std::stoul(tokens[1]);
244 uint32_t
orbit = std::stoul(tokens[2]);
246 std::string logmessage;
248 logmessage =
"Failed to update CCDB with SOX orbit.";
250 logmessage =
"CCDB updated with SOX orbit.";
252 LOG(important) << logmessage <<
" run:" << runnumber <<
" sox orbit:" <<
orbit <<
" ts:" << timestamp;
254 LOG(error) <<
"Topic soxorbit dize !=3: " <<
message <<
" token size:" << tokens.size();
259 if (topic.find(
"orbitreset") != std::string::npos) {
262 if (tokens.size() == 1) {
263 long timestamp = std::stol(tokens[0]);
265 std::string logmessage;
267 logmessage =
"Failed to update CCDB with orbitreset. ";
269 logmessage =
"CCDB updated with orbitreset. ";
271 LOG(important) << logmessage << timestamp;
273 LOG(error) <<
"Topic orbit reset != 2: " <<
message <<
" token size:" << tokens.size();
278 if (topic.find(
"rocnts") != std::string::npos) {
281 static int nerror = 0;
282 if (topic.find(
"sox") != std::string::npos) {
284 size_t irun =
message.find(
"run");
285 if (irun == std::string::npos) {
286 LOG(warning) <<
"run keyword not found in SOX";
289 LOG(info) <<
"SOX received, Run keyword position:" << irun;
291 firstcounters =
message.substr(0, irun);
292 }
else if (topic.find(
"eox") != std::string::npos) {
293 LOG(info) <<
"EOX received";
295 }
else if (topic.find(
"cnts") != std::string::npos) {
299 LOG(warning) <<
"Skipping topic:" << topic;
305 std::vector<std::string> tokens;
306 if (firstcounters.size() > 0) {
314 LOG(warning) <<
"v2 scaler size";
320 double timeStamp = std::stold(tokens.at(0));
321 std::time_t
tt = timeStamp;
322 LOG(info) <<
"Processing scalers, all good, time:" << tokens.at(0) <<
" " << std::asctime(std::localtime(&
tt));
323 for (uint32_t
i = 1;
i < tokens.size();
i++) {
324 mCounters[
i - 1] = std::stoull(tokens.at(
i));
325 if (
i < (NRUNS + 1)) {
326 std::cout << mCounters[
i - 1] <<
" ";
329 std::cout << std::endl;
330 LOG(info) <<
"Counter size:" << tokens.size();
332 for (uint32_t
i = 0;
i < NRUNS;
i++) {
333 if ((mCounters[
i] == 0) && (mActiveRunNumbers[
i] == 0)) {
335 }
else if ((mCounters[
i] != 0) && (mActiveRunNumbers[
i] == mCounters[
i])) {
337 LOG(info) <<
"Run continue:" << mCounters[
i];
340 if (mActiveRuns[
i]->qcwpcount > mQCWritePeriod) {
342 mActiveRuns[
i]->qcwpcount = 0;
344 mActiveRuns[
i]->qcwpcount++;
345 }
else if ((mCounters[
i] != 0) && (mActiveRunNumbers[
i] == 0)) {
346 LOG(info) <<
"Run started:" << mCounters[
i];
347 auto run = mRunsLoaded.find(mCounters[
i]);
348 if (run != mRunsLoaded.end()) {
349 mActiveRunNumbers[
i] = mCounters[
i];
350 mActiveRuns[
i] = run->second;
351 mRunsLoaded.erase(run);
352 setRunConfigBK(mActiveRuns[
i]->cfg.getRunNumber(), mActiveRuns[
i]->cfg.getConfigString());
356 LOG(error) <<
"Trying to start run which is not loaded:" << mCounters[
i];
358 }
else if ((mCounters[
i] == 0) && (mActiveRunNumbers[
i] != 0)) {
360 LOG(error) <<
"Internal error in processMessage: mEOX != 1 expected 0: mEOX:" << mEOX;
362 LOG(info) <<
"Run stopped:" << mActiveRunNumbers[
i];
364 mActiveRunNumbers[
i] = 0;
375 std::cout <<
"Active runs:";
376 for (
auto const& arun : mActiveRunNumbers) {
377 std::cout << arun <<
" ";
379 std::cout <<
" #loaded runs:" << mRunsLoaded.size();
380 for (
auto const& lrun : mRunsLoaded) {
381 std::cout <<
" " << lrun.second->cfg.getRunNumber();
383 std::cout << std::endl;
400 for (
int i = 0;
i < NRUNS;
i++) {
401 if (mActiveRuns[
i] !=
nullptr) {
412 int NLTG_start = NRUNS;
413 int NCLKFP_start = NLTG_start + NDET * 32;
414 int NINPS_start = NCLKFP_start + 7;
415 int NCLS_start = NINPS_start + NINPS;
416 std::cout <<
"====> CTP counters:" << std::endl;
417 std::cout <<
"RUNS:" << std::endl;
419 for (uint32_t
i = 0;
i < NRUNS;
i++) {
420 std::cout << ipos <<
":" << mCounters[
i] <<
" ";
423 std::cout << std::endl;
424 for (
int i = 0;
i < NDET;
i++) {
425 std::cout <<
"LTG" <<
i + 1 << std::endl;
426 for (
int j = NLTG_start +
i * 32;
j < NLTG_start + (
i + 1) * 32;
j++) {
427 std::cout << ipos <<
":" << mCounters[
j] <<
" ";
430 std::cout << std::endl;
432 std::cout <<
"BC40,BC240,Orbit,pulser, fastlm, busy,spare" << std::endl;
433 for (
int i = NCLKFP_start;
i < NCLKFP_start + 7;
i++) {
434 std::cout << ipos <<
":" << mCounters[
i] <<
" ";
437 std::cout << std::endl;
438 std::cout <<
"INPUTS:" << std::endl;
439 for (
int i = NINPS_start;
i < NINPS_start + NINPS;
i++) {
440 std::cout << ipos <<
":" << mCounters[
i] <<
" ";
443 std::cout << std::endl;
444 std::cout <<
"CLASS M Before" << std::endl;
445 for (
int i = NCLS_start;
i < NCLS_start + 64;
i++) {
446 std::cout << ipos <<
":" << mCounters[
i] <<
" ";
449 std::cout << std::endl;
450 std::cout <<
"CLASS M After" << std::endl;
451 for (
int i = NCLS_start + 64;
i < NCLS_start + 2 * 64;
i++) {
452 std::cout << ipos <<
":" << mCounters[
i] <<
" ";
455 std::cout << std::endl;
456 std::cout <<
"CLASS 0 Before" << std::endl;
457 for (
int i = NCLS_start + 2 * 64;
i < NCLS_start + 3 * 64;
i++) {
458 std::cout << ipos <<
":" << mCounters[
i] <<
" ";
461 std::cout << std::endl;
462 std::cout <<
"CLASS 0 After" << std::endl;
463 for (
int i = NCLS_start + 3 * 64;
i < NCLS_start + 4 * 64;
i++) {
464 std::cout << ipos <<
":" << mCounters[
i] <<
" ";
467 std::cout << std::endl;
468 std::cout <<
"CLASS 1 Before" << std::endl;
469 for (
int i = NCLS_start + 4 * 64;
i < NCLS_start + 5 * 64;
i++) {
470 std::cout << ipos <<
":" << mCounters[
i] <<
" ";
473 std::cout << std::endl;
474 std::cout <<
"CLASS 1 After" << std::endl;
475 for (
int i = NCLS_start + 5 * 64;
i < NCLS_start + 6 * 64;
i++) {
476 std::cout << ipos <<
":" << mCounters[
i] <<
" ";
479 std::cout << std::endl;
480 std::cout <<
" REST:" << std::endl;
481 for (uint32_t
i = NCLS_start + 6 * 64;
i < mCounters.size();
i++) {
482 if ((ipos % 10) == 0) {
483 std::cout << std::endl;
484 std::cout << ipos <<
":";
486 std::cout << mCounters[
i] <<
" ";
Managing runs for config and scalers.
std::vector< int > getTriggerClassList() const
uint64_t getTriggerClassMask() const
std::string getClassNameFromHWIndex(int index)
o2::detectors::DetID::mask_t getDetectorMask() const
void printStream(std::ostream &stream) const
int loadConfigurationRun3(const std::string &ctpconfiguartion)
int stopRun(uint32_t irun, long timeStamp)
int setRunConfigBK(uint32_t runNumber, const std::string &cfg)
void printActiveRuns() const
int processMessage(std::string &topic, const std::string &message)
int loadRun(const std::string &cfg)
int addScalers(uint32_t irun, std::time_t time, bool start=0)
static constexpr uint32_t NCOUNTERS
static constexpr uint32_t NCOUNTERSv2
void setDetectorMask(o2::detectors::DetID::mask_t mask)
static std::vector< std::string > scalerNames
void setRunNumber(uint32_t rnumber)
void setClassMask(std::bitset< CTP_NCLASSES > classMask)
static std::string mCCDBHost
Database constants.
int saveRunScalersToQCDB(CTPRunScalers &scalers, long timeStart, long timeStop)
static std::string mQCDBHost
int saveSoxOrbit(uint32_t runNumber, uint32_t soxOrbit, long timeStart)
static void setQCDBHost(std::string host)
int saveRunScalersToCCDB(CTPRunScalers &scalers, long timeStart, long timeStop)
int saveRunConfigToCCDB(CTPConfiguration *cfg, long timeStart)
int saveCtpCfg(uint32_t runNumber, long timeStamp)
int saveOrbitReset(long timeStamp)
GLuint GLsizei const GLchar * message
std::string to_string(gsl::span< T, Size > span)
uint16_t bc
bunch crossing ID of interaction
int send2BK(std::unique_ptr< BkpClient > &BKClient, size_t ts, bool start)
std::vector< CTPScalerRaw > scalers
o2::InteractionRecord intRecord
std::vector< uint32_t > scalersInps
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"