Project
Loading...
Searching...
No Matches
RunManager.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14
16#include <iostream>
17#include <sstream>
18#include <regex>
20#include <fairlogger/Logger.h>
21using namespace o2::ctp;
27{
28 std::vector<int> clslist = cfg.getTriggerClassList();
29 for (auto const& cls : clslist) {
30 cntslast[cls] = {0, 0, 0, 0, 0, 0};
31 cntslast0[cls] = {0, 0, 0, 0, 0, 0};
32 overflows[cls] = {0, 0, 0, 0, 0, 0};
33 }
34}
35int CTPActiveRun::send2BK(std::unique_ptr<BkpClient>& BKClient, size_t ts, bool start)
36{
37 int runNumber = cfg.getRunNumber();
38 // LOG(info) << "BK Filling run:" << runNumber;
39 // int runOri = runNumber;
40 // runNumber = 123;
41 if (start) {
42 for (auto const& cls : cntslast) {
43 for (int i = 0; i < 6; i++) {
44 cnts0[cls.first][i] = cls.second[i];
45 }
46 }
47 }
48 std::array<uint64_t, 6> cntsbk{0};
49 for (auto const& cls : cntslast) {
50 for (int i = 0; i < 6; i++) {
51 if (cls.second[i] < cntslast0[cls.first][i]) {
52 overflows[cls.first][i]++;
53 }
54 cntslast0[cls.first][i] = cls.second[i];
55 cntsbk[i] = (uint64_t)cls.second[i] + 0xffffffffull * overflows[cls.first][i] - (uint64_t)cnts0[cls.first][i];
56 }
57 std::string clsname = cfg.getClassNameFromHWIndex(cls.first);
58 // clsname = std::to_string(runOri) + "_" + clsname;
59 try {
60 BKClient->ctpTriggerCounters()->createOrUpdateForRun(runNumber, clsname, ts, cntsbk[0], cntsbk[1], cntsbk[2], cntsbk[3], cntsbk[4], cntsbk[5]);
61 } catch (std::runtime_error& error) {
62 std::cerr << "An error occurred: " << error.what() << std::endl;
63 return 1;
64 }
65 LOG(debug) << "Run BK:" << runNumber << " class:" << clsname << " cls.first" << cls.first << " ts:" << ts << " cnts:" << cntsbk[0] << " " << cntsbk[1] << " " << cntsbk[2] << " " << cntsbk[3] << " " << cntsbk[4] << " " << cntsbk[5];
66 }
67 return 0;
68}
73{
74 for (uint32_t i = 0; i < NRUNS; i++) {
75 mActiveRuns[i] = nullptr;
76 }
78 if (mBKHost != "none") {
79 mBKClient = BkpClientFactory::create(mBKHost);
80 LOG(info) << "BK Client created with:" << mBKHost;
81 } else {
82 LOG(info) << "BK not sent";
83 }
85 LOG(info) << "QC host:" << mQCDBHost;
86 LOG(info) << "QCDB writing every:" << mQCWritePeriod << " 10 secs";
87 LOG(info) << "CCDB host:" << mCCDBHost;
88 LOG(info) << "CTP vNew cfg:" << mNew;
89 LOG(info) << "CTPRunManager initialised.";
90}
91int CTPRunManager::loadRun(const std::string& cfg)
92{
93 LOG(info) << "Loading run: " << cfg;
94 auto now = std::chrono::system_clock::now();
95 long timeStamp = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
96 LOG(info) << "Timestamp real time:" << timeStamp;
97 size_t pos = cfg.find(" ");
98 std::string cfgmod = cfg;
99 if (pos == std::string::npos) {
100 LOG(warning) << "Space not found in CTP config";
101 } else {
102 std::string f = cfg.substr(0, pos);
103 if (f.find("run") == std::string::npos) {
104 double_t tt = std::stold(f);
105 timeStamp = (tt * 1000.);
106 LOG(info) << "Timestamp file:" << timeStamp;
107 cfgmod = cfg.substr(pos, cfg.size());
108 LOG(info) << "ctpcfg: using ctp time";
109 }
110 }
111 CTPActiveRun* activerun = new CTPActiveRun;
112 activerun->timeStart = timeStamp;
113 activerun->cfg.loadConfigurationRun3(cfgmod);
114 activerun->cfg.printStream(std::cout);
115 //
116 uint32_t runnumber = activerun->cfg.getRunNumber();
117 activerun->scalers.setRunNumber(runnumber);
118 activerun->scalers.setClassMask(activerun->cfg.getTriggerClassMask());
119 o2::detectors::DetID::mask_t detmask = activerun->cfg.getDetectorMask();
120 activerun->scalers.setDetectorMask(detmask);
121 //
122 mRunsLoaded[runnumber] = activerun;
123 saveRunConfigToCCDB(&activerun->cfg, timeStamp);
124
125 return 0;
126}
127int CTPRunManager::setRunConfigBK(uint32_t runNumber, const std::string& cfg)
128{
129 std::cout << "Printing run:" << runNumber << " cfg:" << cfg << std::endl;
130 if (mBKClient) {
131 try {
132 mBKClient->run()->setRawCtpTriggerConfiguration(runNumber, cfg);
133 } catch (std::runtime_error& error) {
134 std::cerr << "An error occurred: " << error.what() << std::endl;
135 return 1;
136 }
137 LOG(info) << "Run BK:" << runNumber << " CFG:" << cfg;
138 }
139 return 0;
140}
141int CTPRunManager::stopRun(uint32_t irun, long timeStamp)
142{
143 LOG(info) << "Stopping run index: " << irun;
144 if (mActiveRuns[irun] == nullptr) {
145 LOG(error) << "No config for run index:" << irun;
146 return 1;
147 }
148 // const auto now = std::chrono::system_clock::now();
149 // const long timeStamp = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
150 mActiveRuns[irun]->timeStop = timeStamp * 1000.;
151 saveRunScalersToCCDB(mActiveRuns[irun]->scalers, mActiveRuns[irun]->timeStart, mActiveRuns[irun]->timeStop);
152 saveRunScalersToQCDB(mActiveRuns[irun]->scalers, mActiveRuns[irun]->timeStart, mActiveRuns[irun]->timeStop);
153 delete mActiveRuns[irun];
154 mActiveRuns[irun] = nullptr;
155 return 0;
156}
157int CTPRunManager::addScalers(uint32_t irun, std::time_t time, bool start)
158{
159 if (mActiveRuns[irun] == nullptr) {
160 LOG(error) << "No config for run index:" << irun;
161 return 1;
162 }
163 std::string orb = "extorb";
164 std::string orbitid = "orbitid";
165 CTPScalerRecordRaw scalrec;
166 scalrec.epochTime = time;
167 std::vector<int> clslist = mActiveRuns[irun]->cfg.getTriggerClassList();
168 for (auto const& cls : clslist) {
169 std::string cmb = "clamb" + std::to_string(cls + 1);
170 std::string cma = "clama" + std::to_string(cls + 1);
171 std::string c0b = "cla0b" + std::to_string(cls + 1);
172 std::string c0a = "cla0a" + std::to_string(cls + 1);
173 std::string c1b = "cla1b" + std::to_string(cls + 1);
174 std::string c1a = "cla1a" + std::to_string(cls + 1);
175 CTPScalerRaw scalraw;
176 scalraw.classIndex = (uint32_t)cls;
177 // std::cout << "cls:" << cls << " " << scalraw.classIndex << std::endl;
178 scalraw.lmBefore = mCounters[mScalerName2Position[cmb]];
179 scalraw.lmAfter = mCounters[mScalerName2Position[cma]];
180 scalraw.l0Before = mCounters[mScalerName2Position[c0b]];
181 scalraw.l0After = mCounters[mScalerName2Position[c0a]];
182 scalraw.l1Before = mCounters[mScalerName2Position[c1b]];
183 scalraw.l1After = mCounters[mScalerName2Position[c1a]];
184 // std::cout << "positions:" << cmb << " " << mScalerName2Position[cmb] << std::endl;
185 // std::cout << "positions:" << cma << " " << mScalerName2Position[cma] << std::endl;
186 scalrec.scalers.push_back(scalraw);
187 // BK scalers to be corrected for overflow
188 if (mBKClient) {
189 CTPActiveRun* ar = mActiveRuns[irun];
190 ar->cntslast[cls][0] = scalraw.lmBefore;
191 ar->cntslast[cls][1] = scalraw.lmAfter;
192 ar->cntslast[cls][2] = scalraw.l0Before;
193 ar->cntslast[cls][3] = scalraw.l0After;
194 ar->cntslast[cls][4] = scalraw.l1Before;
195 ar->cntslast[cls][5] = scalraw.l1After;
196 }
197 }
198 mActiveRuns[irun]->send2BK(mBKClient, time, start);
199 //
200 uint32_t NINPS = 48;
201 int offset = 599;
202 for (uint32_t i = 0; i < NINPS; i++) {
203 uint32_t inpcount = mCounters[offset + i];
204 scalrec.scalersInps.push_back(inpcount);
205 // LOG(info) << "Scaler for input:" << CTPRunScalers::scalerNames[offset+i] << ":" << inpcount;
206 }
207 //
208 if (mNew == 0) {
209 scalrec.intRecord.orbit = mCounters[mScalerName2Position[orb]];
210 } else {
211 scalrec.intRecord.orbit = mCounters[mScalerName2Position[orbitid]];
212 }
213 scalrec.intRecord.bc = 0;
214 mActiveRuns[irun]->scalers.addScalerRacordRaw(scalrec);
215 LOG(info) << "Adding scalers for orbit:" << scalrec.intRecord.orbit;
216 // scalrec.printStream(std::cout);
217 // printCounters();
218 return 0;
219}
// Handle one incoming CTP message, dispatching on the text of `topic`.
//
// Topics containing "clear", "ctpconfig", "soxorbit" or "orbitreset" are handled
// up front and return 0. Topics containing "sox", "eox" or "cnts" carry a counter
// record; any other topic is skipped (with at most 10 warnings) and returns 0.
// A counter record is split on spaces into a time stamp followed by the counters,
// which are copied into mCounters. The first NRUNS counters are then compared
// with mActiveRunNumbers to detect runs that continue, start or stop.
//
// topic:   message topic; only searched for the keywords above.
// message: message payload.
// Returns 0 normally, 1 when the number of tokens matches neither
// NCOUNTERS + 1 nor NCOUNTERSv2 + 1.
//
// NOTE(review): this listing has no lines numbered 231 and 329, so statements
// may be missing from this view - confirm against the repository.
220int CTPRunManager::processMessage(std::string& topic, const std::string& message)
221{
222 LOG(info) << "Processing message with topic:" << topic;
223 std::string firstcounters;
 // "clear": forget all runs that were loaded but not yet started.
 // NOTE(review): loadRun stores objects created with new in mRunsLoaded; clear() does not delete them - looks like a leak, confirm.
224 if (topic.find("clear") != std::string::npos) {
225 mRunsLoaded.clear();
226 LOG(info) << "Loaded runs cleared.";
227 return 0;
228 }
 // "ctpconfig": in this view only logged (see the note on listing line 231 above).
229 if (topic.find("ctpconfig") != std::string::npos) {
230 LOG(info) << "ctpconfig received";
232 return 0;
233 }
 // "soxorbit" must be tested before "sox" below because it contains that keyword.
234 if (topic.find("soxorbit") != std::string::npos) {
235 return 0;
236 }
237 if (topic.find("orbitreset") != std::string::npos) {
238 return 0;
239 }
 // Counts warnings about unknown topics across calls so the log is not flooded.
240 static int nerror = 0;
241 if (topic.find("sox") != std::string::npos) {
242 // get config
243 size_t irun = message.find("run");
244 if (irun == std::string::npos) {
245 LOG(warning) << "run keyword not found in SOX";
246 irun = message.size();
247 }
248 LOG(info) << "SOX received, Run keyword position:" << irun;
 // Text from the "run" keyword onwards is the configuration (cfg is not used further in this view);
 // everything before it is the counter record.
249 std::string cfg = message.substr(irun, message.size() - irun);
250 firstcounters = message.substr(0, irun);
251 } else if (topic.find("eox") != std::string::npos) {
252 LOG(info) << "EOX received";
253 mEOX = 1;
254 } else if (topic.find("cnts") != std::string::npos) {
255 // just continue
256 } else {
257 if (nerror < 10) {
258 LOG(warning) << "Skipping topic:" << topic;
259 nerror++;
260 }
261 return 0;
262 }
263 //
 // Split the counter record: for SOX only the part before "run", otherwise the whole message.
264 std::vector<std::string> tokens;
265 if (firstcounters.size() > 0) {
266 tokens = o2::utils::Str::tokenize(firstcounters, ' ');
267 } else {
268 tokens = o2::utils::Str::tokenize(message, ' ');
269 }
 // One time stamp plus NCOUNTERS values is expected; the NCOUNTERSv2 size is accepted and switches mNew to 0.
270 if (tokens.size() != (CTPRunScalers::NCOUNTERS + 1)) {
271 if (tokens.size() == (CTPRunScalers::NCOUNTERSv2 + 1)) {
272 mNew = 0;
273 LOG(warning) << "v2 scaler size";
274 } else {
275 LOG(warning) << "Scalers size wrong:" << tokens.size() << " expected:" << CTPRunScalers::NCOUNTERS + 1 << " or " << CTPRunScalers::NCOUNTERSv2 + 1;
276 return 1;
277 }
278 }
 // Token 0 is the time stamp; it is truncated to std::time_t for logging and for the scaler records.
279 double timeStamp = std::stold(tokens.at(0));
280 std::time_t tt = timeStamp;
281 LOG(info) << "Processing scalers, all good, time:" << tokens.at(0) << " " << std::asctime(std::localtime(&tt));
 // Tokens 1..N are the counters; the first NRUNS of them are echoed to stdout.
282 for (uint32_t i = 1; i < tokens.size(); i++) {
283 mCounters[i - 1] = std::stoull(tokens.at(i));
284 if (i < (NRUNS + 1)) {
285 std::cout << mCounters[i - 1] << " ";
286 }
287 }
288 std::cout << std::endl;
289 LOG(info) << "Counter size:" << tokens.size();
290 //
 // Compare each of the first NRUNS counters (a run number, 0 when idle) with the run number recorded for that slot.
291 for (uint32_t i = 0; i < NRUNS; i++) {
292 if ((mCounters[i] == 0) && (mActiveRunNumbers[i] == 0)) {
293 // not active
294 } else if ((mCounters[i] != 0) && (mActiveRunNumbers[i] == mCounters[i])) {
295 // active , do scalers
296 LOG(info) << "Run continue:" << mCounters[i];
297 addScalers(i, tt);
298 // LOG(info) << " QC period:" << mActiveRunNumbers[i] << " " << mActiveRuns[i]->qcwpcount << " " << mQCWritePeriod;
 // Write to QCDB only once the per-run counter exceeds mQCWritePeriod, then restart the count.
299 if (mActiveRuns[i]->qcwpcount > mQCWritePeriod) {
300 saveRunScalersToQCDB(mActiveRuns[i]->scalers, tt * 1000, tt * 1000);
301 mActiveRuns[i]->qcwpcount = 0;
302 }
303 mActiveRuns[i]->qcwpcount++;
304 } else if ((mCounters[i] != 0) && (mActiveRunNumbers[i] == 0)) {
305 LOG(info) << "Run started:" << mCounters[i];
 // A run can only start if its configuration was loaded before; it then moves from mRunsLoaded into slot i.
306 auto run = mRunsLoaded.find(mCounters[i]);
307 if (run != mRunsLoaded.end()) {
308 mActiveRunNumbers[i] = mCounters[i];
309 mActiveRuns[i] = run->second;
310 mRunsLoaded.erase(run);
311 setRunConfigBK(mActiveRuns[i]->cfg.getRunNumber(), mActiveRuns[i]->cfg.getConfigString());
312 addScalers(i, tt, 1);
313 saveRunScalersToQCDB(mActiveRuns[i]->scalers, tt * 1000, tt * 1000);
314 } else {
315 LOG(error) << "Trying to start run which is not loaded:" << mCounters[i];
316 }
317 } else if ((mCounters[i] == 0) && (mActiveRunNumbers[i] != 0)) {
 // The slot's counter dropped to 0 while a run was recorded there: the run has ended.
318 if (mEOX != 1) {
319 LOG(error) << "Internal error in processMessage: mEOX != 1 expected 0: mEOX:" << mEOX;
320 }
321 LOG(info) << "Run stopped:" << mActiveRunNumbers[i];
 // Take a last scaler record, then let stopRun store the scalers and free the slot.
322 addScalers(i, tt);
323 mActiveRunNumbers[i] = 0;
324 mEOX = 0;
325 stopRun(i, tt);
326 }
327 }
328 mEOX = 0;
330 return 0;
331}
333{
334 std::cout << "Active runs:";
335 for (auto const& arun : mActiveRunNumbers) {
336 std::cout << arun << " ";
337 }
338 std::cout << " #loaded runs:" << mRunsLoaded.size();
339 for (auto const& lrun : mRunsLoaded) {
340 std::cout << " " << lrun.second->cfg.getRunNumber();
341 }
342 std::cout << std::endl;
343}
345{
347 LOG(fatal) << "NCOUNTERS:" << CTPRunScalers::NCOUNTERS << " different from names vector:" << CTPRunScalers::scalerNames.size();
348 return 1;
349 }
350 // try to open files of no success use default
351 for (uint32_t i = 0; i < CTPRunScalers::scalerNames.size(); i++) {
352 mScalerName2Position[CTPRunScalers::scalerNames[i]] = i;
353 }
354 return 0;
355}
357{
358 int n = 0;
359 for (int i = 0; i < NRUNS; i++) {
360 if (mActiveRuns[i] != nullptr) {
361 n++;
362 }
363 }
364 return n;
365}
367{
368 int NDET = 18;
369 int NINPS = 48;
370 // int NCLKFP = 7;
371 int NLTG_start = NRUNS;
372 int NCLKFP_start = NLTG_start + NDET * 32;
373 int NINPS_start = NCLKFP_start + 7;
374 int NCLS_start = NINPS_start + NINPS;
375 std::cout << "====> CTP counters:" << std::endl;
376 std::cout << "RUNS:" << std::endl;
377 int ipos = 0;
378 for (uint32_t i = 0; i < NRUNS; i++) {
379 std::cout << ipos << ":" << mCounters[i] << " ";
380 ipos++;
381 }
382 std::cout << std::endl;
383 for (int i = 0; i < NDET; i++) {
384 std::cout << "LTG" << i + 1 << std::endl;
385 for (int j = NLTG_start + i * 32; j < NLTG_start + (i + 1) * 32; j++) {
386 std::cout << ipos << ":" << mCounters[j] << " ";
387 ipos++;
388 }
389 std::cout << std::endl;
390 }
391 std::cout << "BC40,BC240,Orbit,pulser, fastlm, busy,spare" << std::endl;
392 for (int i = NCLKFP_start; i < NCLKFP_start + 7; i++) {
393 std::cout << ipos << ":" << mCounters[i] << " ";
394 ipos++;
395 }
396 std::cout << std::endl;
397 std::cout << "INPUTS:" << std::endl;
398 for (int i = NINPS_start; i < NINPS_start + NINPS; i++) {
399 std::cout << ipos << ":" << mCounters[i] << " ";
400 ipos++;
401 }
402 std::cout << std::endl;
403 std::cout << "CLASS M Before" << std::endl;
404 for (int i = NCLS_start; i < NCLS_start + 64; i++) {
405 std::cout << ipos << ":" << mCounters[i] << " ";
406 ipos++;
407 }
408 std::cout << std::endl;
409 std::cout << "CLASS M After" << std::endl;
410 for (int i = NCLS_start + 64; i < NCLS_start + 2 * 64; i++) {
411 std::cout << ipos << ":" << mCounters[i] << " ";
412 ipos++;
413 }
414 std::cout << std::endl;
415 std::cout << "CLASS 0 Before" << std::endl;
416 for (int i = NCLS_start + 2 * 64; i < NCLS_start + 3 * 64; i++) {
417 std::cout << ipos << ":" << mCounters[i] << " ";
418 ipos++;
419 }
420 std::cout << std::endl;
421 std::cout << "CLASS 0 After" << std::endl;
422 for (int i = NCLS_start + 3 * 64; i < NCLS_start + 4 * 64; i++) {
423 std::cout << ipos << ":" << mCounters[i] << " ";
424 ipos++;
425 }
426 std::cout << std::endl;
427 std::cout << "CLASS 1 Before" << std::endl;
428 for (int i = NCLS_start + 4 * 64; i < NCLS_start + 5 * 64; i++) {
429 std::cout << ipos << ":" << mCounters[i] << " ";
430 ipos++;
431 }
432 std::cout << std::endl;
433 std::cout << "CLASS 1 After" << std::endl;
434 for (int i = NCLS_start + 5 * 64; i < NCLS_start + 6 * 64; i++) {
435 std::cout << ipos << ":" << mCounters[i] << " ";
436 ipos++;
437 }
438 std::cout << std::endl;
439 std::cout << " REST:" << std::endl;
440 for (uint32_t i = NCLS_start + 6 * 64; i < mCounters.size(); i++) {
441 if ((ipos % 10) == 0) {
442 std::cout << std::endl;
443 std::cout << ipos << ":";
444 }
445 std::cout << mCounters[i] << " ";
446 }
447}
int16_t time
Definition RawEventData.h:4
int32_t i
uint16_t pos
Definition RawData.h:3
uint32_t j
Definition RawData.h:0
Managing runs for config and scalers.
std::ostringstream debug
std::vector< int > getTriggerClassList() const
uint64_t getTriggerClassMask() const
std::string getClassNameFromHWIndex(int index)
o2::detectors::DetID::mask_t getDetectorMask() const
void printStream(std::ostream &stream) const
int loadConfigurationRun3(const std::string &ctpconfiguartion)
int stopRun(uint32_t irun, long timeStamp)
int setRunConfigBK(uint32_t runNumber, const std::string &cfg)
void printActiveRuns() const
int processMessage(std::string &topic, const std::string &message)
int loadRun(const std::string &cfg)
int addScalers(uint32_t irun, std::time_t time, bool start=0)
static constexpr uint32_t NCOUNTERS
Definition Scalers.h:96
static constexpr uint32_t NCOUNTERSv2
Definition Scalers.h:95
void setDetectorMask(o2::detectors::DetID::mask_t mask)
Definition Scalers.h:114
static std::vector< std::string > scalerNames
Definition Scalers.h:97
void setRunNumber(uint32_t rnumber)
Definition Scalers.h:115
void setClassMask(std::bitset< CTP_NCLASSES > classMask)
Definition Scalers.h:113
static std::string mCCDBHost
Database constants.
int saveRunScalersToQCDB(CTPRunScalers &scalers, long timeStart, long timeStop)
static std::string mQCDBHost
static void setQCDBHost(std::string host)
int saveRunScalersToCCDB(CTPRunScalers &scalers, long timeStart, long timeStop)
int saveRunConfigToCCDB(CTPConfiguration *cfg, long timeStart)
GLdouble n
Definition glcorearb.h:1982
GLsizeiptr size
Definition glcorearb.h:659
GLdouble f
Definition glcorearb.h:310
GLintptr offset
Definition glcorearb.h:660
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLuint start
Definition glcorearb.h:469
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
uint32_t orbit
LHC orbit.
uint16_t bc
bunch crossing ID of interaction
counters64_t overflows
Definition RunManager.h:40
CTPConfiguration cfg
Definition RunManager.h:32
CTPRunScalers scalers
Definition RunManager.h:33
int send2BK(std::unique_ptr< BkpClient > &BKClient, size_t ts, bool start)
uint32_t classIndex
Definition Scalers.h:41
std::vector< CTPScalerRaw > scalers
Definition Scalers.h:70
o2::InteractionRecord intRecord
Definition Scalers.h:68
std::vector< uint32_t > scalersInps
Definition Scalers.h:72
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"