20#include <fairlogger/Logger.h>
30 for (
auto const& cls : clslist) {
44 for (
int i = 0;
i < 6;
i++) {
45 cnts0[cls.first][
i] = cls.second[
i];
49 std::array<uint64_t, 6> cntsbk{0};
51 for (
int i = 0;
i < 6;
i++) {
56 cntsbk[
i] = (uint64_t)cls.second[
i] + 0xffffffffull *
overflows[cls.first][
i] - (uint64_t)
cnts0[cls.first][
i];
61 BKClient->ctpTriggerCounters()->createOrUpdateForRun(runNumber, clsname, ts, cntsbk[0], cntsbk[1], cntsbk[2], cntsbk[3], cntsbk[4], cntsbk[5]);
62 }
catch (std::runtime_error& error) {
63 std::cerr <<
"An error occurred: " << error.what() << std::endl;
66 LOG(
debug) <<
"Run BK:" << runNumber <<
" class:" << clsname <<
" cls.first" << cls.first <<
" ts:" << ts <<
" cnts:" << cntsbk[0] <<
" " << cntsbk[1] <<
" " << cntsbk[2] <<
" " << cntsbk[3] <<
" " << cntsbk[4] <<
" " << cntsbk[5];
75 for (uint32_t
i = 0;
i < NRUNS;
i++) {
76 mActiveRuns[
i] =
nullptr;
79 if (mBKHost !=
"none") {
80 mBKClient = BkpClientFactory::create(mBKHost);
81 LOG(info) <<
"BK Client created with:" << mBKHost;
83 LOG(info) <<
"BK not sent";
87 LOG(info) <<
"QCDB writing every:" << mQCWritePeriod <<
" 10 secs";
89 LOG(info) <<
"CTP vNew cfg:" << mNew;
90 LOG(info) <<
"CTPRunManager initialised.";
94 LOG(info) <<
"Loading run: " << cfg;
95 auto now = std::chrono::system_clock::now();
96 long timeStamp = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
97 LOG(info) <<
"Timestamp real time:" << timeStamp;
98 size_t pos = cfg.find(
" ");
99 std::string cfgmod = cfg;
100 if (
pos == std::string::npos) {
101 LOG(warning) <<
"Space not found in CTP config";
103 std::string
f = cfg.substr(0,
pos);
104 if (
f.find(
"run") == std::string::npos) {
105 double_t
tt = std::stold(
f);
106 timeStamp = (
tt * 1000.);
107 LOG(info) <<
"Timestamp file:" << timeStamp;
108 cfgmod = cfg.substr(
pos, cfg.size());
109 LOG(info) <<
"ctpcfg: using ctp time";
123 mRunsLoaded[runnumber] = activerun;
130 std::cout <<
"Printing run:" << runNumber <<
" cfg:" << cfg << std::endl;
133 mBKClient->run()->setRawCtpTriggerConfiguration(runNumber, cfg);
134 }
catch (std::runtime_error& error) {
135 std::cerr <<
"An error occurred: " << error.what() << std::endl;
138 LOG(info) <<
"Run BK:" << runNumber <<
" CFG:" << cfg;
144 LOG(info) <<
"Stopping run index: " << irun;
145 if (mActiveRuns[irun] ==
nullptr) {
146 LOG(error) <<
"No config for run index:" << irun;
151 mActiveRuns[irun]->timeStop = timeStamp * 1000.;
152 saveRunScalersToCCDB(mActiveRuns[irun]->scalers, mActiveRuns[irun]->timeStart, mActiveRuns[irun]->timeStop);
153 saveRunScalersToQCDB(mActiveRuns[irun]->scalers, mActiveRuns[irun]->timeStart, mActiveRuns[irun]->timeStop);
154 delete mActiveRuns[irun];
155 mActiveRuns[irun] =
nullptr;
160 if (mActiveRuns[irun] ==
nullptr) {
161 LOG(error) <<
"No config for run index:" << irun;
164 std::string orb =
"extorb";
165 std::string orbitid =
"orbitid";
168 std::vector<int> clslist = mActiveRuns[irun]->cfg.getTriggerClassList();
169 for (
auto const& cls : clslist) {
179 scalraw.
lmBefore = mCounters[mScalerName2Position[cmb]];
180 scalraw.
lmAfter = mCounters[mScalerName2Position[cma]];
181 scalraw.
l0Before = mCounters[mScalerName2Position[c0b]];
182 scalraw.
l0After = mCounters[mScalerName2Position[c0a]];
183 scalraw.
l1Before = mCounters[mScalerName2Position[c1b]];
184 scalraw.
l1After = mCounters[mScalerName2Position[c1a]];
187 scalrec.
scalers.push_back(scalraw);
199 mActiveRuns[irun]->send2BK(mBKClient,
time,
start);
203 for (uint32_t
i = 0;
i < NINPS;
i++) {
204 uint32_t inpcount = mCounters[
offset +
i];
212 scalrec.
intRecord.
orbit = mCounters[mScalerName2Position[orbitid]];
215 mActiveRuns[irun]->scalers.addScalerRacordRaw(scalrec);
223 LOG(info) <<
"Processing message with topic:" << topic;
224 std::string firstcounters;
225 if (topic.find(
"clear") != std::string::npos) {
227 LOG(info) <<
"Loaded runs cleared.";
230 if (topic.find(
"ctpconfig") != std::string::npos) {
231 LOG(info) <<
"ctpconfig received";
235 if (topic.find(
"soxorbit") != std::string::npos) {
238 if (tokens.size() == 3) {
239 long timestamp = std::stol(tokens[0]);
240 uint32_t runnumber = std::stoul(tokens[1]);
241 uint32_t
orbit = std::stoul(tokens[2]);
243 std::string logmessage;
245 logmessage =
"Failed to update CCDB with SOX orbit.";
247 logmessage =
"CCDB updated with SOX orbit.";
249 LOG(important) << logmessage <<
" run:" << runnumber <<
" sox orbit:" <<
orbit <<
" ts:" << timestamp;
251 LOG(error) <<
"Topic soxorbit dize !=3: " <<
message <<
" token size:" << tokens.size();
256 if (topic.find(
"orbitreset") != std::string::npos) {
259 if (tokens.size() == 1) {
260 long timestamp = std::stol(tokens[0]);
262 std::string logmessage;
264 logmessage =
"Failed to update CCDB with orbitreset. ";
266 logmessage =
"CCDB updated with orbitreset. ";
268 LOG(important) << logmessage << timestamp;
270 LOG(error) <<
"Topic orbit reset != 2: " <<
message <<
" token size:" << tokens.size();
275 static int nerror = 0;
276 if (topic.find(
"sox") != std::string::npos) {
278 size_t irun =
message.find(
"run");
279 if (irun == std::string::npos) {
280 LOG(warning) <<
"run keyword not found in SOX";
283 LOG(info) <<
"SOX received, Run keyword position:" << irun;
285 firstcounters =
message.substr(0, irun);
286 }
else if (topic.find(
"eox") != std::string::npos) {
287 LOG(info) <<
"EOX received";
289 }
else if (topic.find(
"cnts") != std::string::npos) {
293 LOG(warning) <<
"Skipping topic:" << topic;
299 std::vector<std::string> tokens;
300 if (firstcounters.size() > 0) {
308 LOG(warning) <<
"v2 scaler size";
314 double timeStamp = std::stold(tokens.at(0));
315 std::time_t
tt = timeStamp;
316 LOG(info) <<
"Processing scalers, all good, time:" << tokens.at(0) <<
" " << std::asctime(std::localtime(&
tt));
317 for (uint32_t
i = 1;
i < tokens.size();
i++) {
318 mCounters[
i - 1] = std::stoull(tokens.at(
i));
319 if (
i < (NRUNS + 1)) {
320 std::cout << mCounters[
i - 1] <<
" ";
323 std::cout << std::endl;
324 LOG(info) <<
"Counter size:" << tokens.size();
326 for (uint32_t
i = 0;
i < NRUNS;
i++) {
327 if ((mCounters[
i] == 0) && (mActiveRunNumbers[
i] == 0)) {
329 }
else if ((mCounters[
i] != 0) && (mActiveRunNumbers[
i] == mCounters[
i])) {
331 LOG(info) <<
"Run continue:" << mCounters[
i];
334 if (mActiveRuns[
i]->qcwpcount > mQCWritePeriod) {
336 mActiveRuns[
i]->qcwpcount = 0;
338 mActiveRuns[
i]->qcwpcount++;
339 }
else if ((mCounters[
i] != 0) && (mActiveRunNumbers[
i] == 0)) {
340 LOG(info) <<
"Run started:" << mCounters[
i];
341 auto run = mRunsLoaded.find(mCounters[
i]);
342 if (run != mRunsLoaded.end()) {
343 mActiveRunNumbers[
i] = mCounters[
i];
344 mActiveRuns[
i] = run->second;
345 mRunsLoaded.erase(run);
346 setRunConfigBK(mActiveRuns[
i]->cfg.getRunNumber(), mActiveRuns[
i]->cfg.getConfigString());
350 LOG(error) <<
"Trying to start run which is not loaded:" << mCounters[
i];
352 }
else if ((mCounters[
i] == 0) && (mActiveRunNumbers[
i] != 0)) {
354 LOG(error) <<
"Internal error in processMessage: mEOX != 1 expected 0: mEOX:" << mEOX;
356 LOG(info) <<
"Run stopped:" << mActiveRunNumbers[
i];
358 mActiveRunNumbers[
i] = 0;
369 std::cout <<
"Active runs:";
370 for (
auto const& arun : mActiveRunNumbers) {
371 std::cout << arun <<
" ";
373 std::cout <<
" #loaded runs:" << mRunsLoaded.size();
374 for (
auto const& lrun : mRunsLoaded) {
375 std::cout <<
" " << lrun.second->cfg.getRunNumber();
377 std::cout << std::endl;
394 for (
int i = 0;
i < NRUNS;
i++) {
395 if (mActiveRuns[
i] !=
nullptr) {
406 int NLTG_start = NRUNS;
407 int NCLKFP_start = NLTG_start + NDET * 32;
408 int NINPS_start = NCLKFP_start + 7;
409 int NCLS_start = NINPS_start + NINPS;
410 std::cout <<
"====> CTP counters:" << std::endl;
411 std::cout <<
"RUNS:" << std::endl;
413 for (uint32_t
i = 0;
i < NRUNS;
i++) {
414 std::cout << ipos <<
":" << mCounters[
i] <<
" ";
417 std::cout << std::endl;
418 for (
int i = 0;
i < NDET;
i++) {
419 std::cout <<
"LTG" <<
i + 1 << std::endl;
420 for (
int j = NLTG_start +
i * 32;
j < NLTG_start + (
i + 1) * 32;
j++) {
421 std::cout << ipos <<
":" << mCounters[
j] <<
" ";
424 std::cout << std::endl;
426 std::cout <<
"BC40,BC240,Orbit,pulser, fastlm, busy,spare" << std::endl;
427 for (
int i = NCLKFP_start;
i < NCLKFP_start + 7;
i++) {
428 std::cout << ipos <<
":" << mCounters[
i] <<
" ";
431 std::cout << std::endl;
432 std::cout <<
"INPUTS:" << std::endl;
433 for (
int i = NINPS_start;
i < NINPS_start + NINPS;
i++) {
434 std::cout << ipos <<
":" << mCounters[
i] <<
" ";
437 std::cout << std::endl;
438 std::cout <<
"CLASS M Before" << std::endl;
439 for (
int i = NCLS_start;
i < NCLS_start + 64;
i++) {
440 std::cout << ipos <<
":" << mCounters[
i] <<
" ";
443 std::cout << std::endl;
444 std::cout <<
"CLASS M After" << std::endl;
445 for (
int i = NCLS_start + 64;
i < NCLS_start + 2 * 64;
i++) {
446 std::cout << ipos <<
":" << mCounters[
i] <<
" ";
449 std::cout << std::endl;
450 std::cout <<
"CLASS 0 Before" << std::endl;
451 for (
int i = NCLS_start + 2 * 64;
i < NCLS_start + 3 * 64;
i++) {
452 std::cout << ipos <<
":" << mCounters[
i] <<
" ";
455 std::cout << std::endl;
456 std::cout <<
"CLASS 0 After" << std::endl;
457 for (
int i = NCLS_start + 3 * 64;
i < NCLS_start + 4 * 64;
i++) {
458 std::cout << ipos <<
":" << mCounters[
i] <<
" ";
461 std::cout << std::endl;
462 std::cout <<
"CLASS 1 Before" << std::endl;
463 for (
int i = NCLS_start + 4 * 64;
i < NCLS_start + 5 * 64;
i++) {
464 std::cout << ipos <<
":" << mCounters[
i] <<
" ";
467 std::cout << std::endl;
468 std::cout <<
"CLASS 1 After" << std::endl;
469 for (
int i = NCLS_start + 5 * 64;
i < NCLS_start + 6 * 64;
i++) {
470 std::cout << ipos <<
":" << mCounters[
i] <<
" ";
473 std::cout << std::endl;
474 std::cout <<
" REST:" << std::endl;
475 for (uint32_t
i = NCLS_start + 6 * 64;
i < mCounters.size();
i++) {
476 if ((ipos % 10) == 0) {
477 std::cout << std::endl;
478 std::cout << ipos <<
":";
480 std::cout << mCounters[
i] <<
" ";
Managing runs for config and scalers.
std::vector< int > getTriggerClassList() const
uint64_t getTriggerClassMask() const
std::string getClassNameFromHWIndex(int index)
o2::detectors::DetID::mask_t getDetectorMask() const
void printStream(std::ostream &stream) const
int loadConfigurationRun3(const std::string &ctpconfiguartion)
int stopRun(uint32_t irun, long timeStamp)
int setRunConfigBK(uint32_t runNumber, const std::string &cfg)
void printActiveRuns() const
int processMessage(std::string &topic, const std::string &message)
int loadRun(const std::string &cfg)
int addScalers(uint32_t irun, std::time_t time, bool start=0)
static constexpr uint32_t NCOUNTERS
static constexpr uint32_t NCOUNTERSv2
void setDetectorMask(o2::detectors::DetID::mask_t mask)
static std::vector< std::string > scalerNames
void setRunNumber(uint32_t rnumber)
void setClassMask(std::bitset< CTP_NCLASSES > classMask)
static std::string mCCDBHost
Database constants.
int saveRunScalersToQCDB(CTPRunScalers &scalers, long timeStart, long timeStop)
static std::string mQCDBHost
int saveSoxOrbit(uint32_t runNumber, uint32_t soxOrbit, long timeStart)
static void setQCDBHost(std::string host)
int saveRunScalersToCCDB(CTPRunScalers &scalers, long timeStart, long timeStop)
int saveRunConfigToCCDB(CTPConfiguration *cfg, long timeStart)
int saveOrbitReset(long timeStamp)
GLuint GLsizei const GLchar * message
std::string to_string(gsl::span< T, Size > span)
uint16_t bc
bunch crossing ID of interaction
int send2BK(std::unique_ptr< BkpClient > &BKClient, size_t ts, bool start)
std::vector< CTPScalerRaw > scalers
o2::InteractionRecord intRecord
std::vector< uint32_t > scalersInps
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"