Project
Loading...
Searching...
No Matches
RunManager.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14
16#include <iostream>
17#include <sstream>
18#include <regex>
20#include <fairlogger/Logger.h>
21using namespace o2::ctp;
27{
28 std::vector<int> clslist = cfg.getTriggerClassList();
29 for (auto const& cls : clslist) {
30 cntslast[cls] = {0, 0, 0, 0, 0, 0};
31 cntslast0[cls] = {0, 0, 0, 0, 0, 0};
32 overflows[cls] = {0, 0, 0, 0, 0, 0};
33 }
34}
35int CTPActiveRun::send2BK(std::unique_ptr<BkpClient>& BKClient, size_t ts, bool start)
36{
37 int runNumber = cfg.getRunNumber();
38 // LOG(info) << "BK Filling run:" << runNumber;
39 // int runOri = runNumber;
40 // runNumber = 123;
41 if (start) {
42 for (auto const& cls : cntslast) {
43 for (int i = 0; i < 6; i++) {
44 cnts0[cls.first][i] = cls.second[i];
45 }
46 }
47 }
48 std::array<uint64_t, 6> cntsbk{0};
49 for (auto const& cls : cntslast) {
50 for (int i = 0; i < 6; i++) {
51 if (cls.second[i] < cntslast0[cls.first][i]) {
52 overflows[cls.first][i]++;
53 }
54 cntslast0[cls.first][i] = cls.second[i];
55 cntsbk[i] = (uint64_t)cls.second[i] + 0xffffffffull * overflows[cls.first][i] - (uint64_t)cnts0[cls.first][i];
56 }
57 std::string clsname = cfg.getClassNameFromHWIndex(cls.first);
58 // clsname = std::to_string(runOri) + "_" + clsname;
59 try {
60 BKClient->triggerCounters()->createOrUpdateForRun(runNumber, clsname, ts, cntsbk[0], cntsbk[1], cntsbk[2], cntsbk[3], cntsbk[4], cntsbk[5]);
61 } catch (std::runtime_error& error) {
62 std::cerr << "An error occurred: " << error.what() << std::endl;
63 return 1;
64 }
65 LOG(debug) << "Run BK:" << runNumber << " class:" << clsname << " cls.first" << cls.first << " ts:" << ts << " cnts:" << cntsbk[0] << " " << cntsbk[1] << " " << cntsbk[2] << " " << cntsbk[3] << " " << cntsbk[4] << " " << cntsbk[5];
66 }
67 return 0;
68}
73{
74 for (uint32_t i = 0; i < NRUNS; i++) {
75 mActiveRuns[i] = nullptr;
76 }
78 if (mBKHost != "none") {
79 mBKClient = BkpClientFactory::create(mBKHost);
80 LOG(info) << "BK Client created with:" << mBKHost;
81 } else {
82 LOG(info) << "BK not sent";
83 }
85 LOG(info) << "QC host:" << mQCDBHost;
86 LOG(info) << "QCDB writing every:" << mQCWritePeriod << " 10 secs";
87 LOG(info) << "CCDB host:" << mCCDBHost;
88 LOG(info) << "CTP vNew cfg:" << mNew;
89 LOG(info) << "CTPRunManager initialised.";
90}
91int CTPRunManager::loadRun(const std::string& cfg)
92{
93 LOG(info) << "Loading run: " << cfg;
94 auto now = std::chrono::system_clock::now();
95 long timeStamp = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
96 LOG(info) << "Timestamp real time:" << timeStamp;
97 size_t pos = cfg.find(" ");
98 std::string cfgmod = cfg;
99 if (pos == std::string::npos) {
100 LOG(warning) << "Space not found in CTP config";
101 } else {
102 std::string f = cfg.substr(0, pos);
103 if (f.find("run") == std::string::npos) {
104 double_t tt = std::stold(f);
105 timeStamp = (tt * 1000.);
106 LOG(info) << "Timestamp file:" << timeStamp;
107 cfgmod = cfg.substr(pos, cfg.size());
108 LOG(info) << "ctpcfg: using ctp time";
109 }
110 }
111 CTPActiveRun* activerun = new CTPActiveRun;
112 activerun->timeStart = timeStamp;
113 activerun->cfg.loadConfigurationRun3(cfgmod);
114 activerun->cfg.printStream(std::cout);
115 //
116 uint32_t runnumber = activerun->cfg.getRunNumber();
117 activerun->scalers.setRunNumber(runnumber);
118 activerun->scalers.setClassMask(activerun->cfg.getTriggerClassMask());
119 o2::detectors::DetID::mask_t detmask = activerun->cfg.getDetectorMask();
120 activerun->scalers.setDetectorMask(detmask);
121 //
122 mRunsLoaded[runnumber] = activerun;
123 saveRunConfigToCCDB(&activerun->cfg, timeStamp);
124
125 return 0;
126}
/// Start-of-run hook; currently does nothing with the configuration text.
/// \param cfg text handed over by processMessage() for a "sox" topic (the message from the "run" keyword on); unused
/// \return always 0
127int CTPRunManager::startRun(const std::string& cfg)
128{
129 return 0;
130}
131int CTPRunManager::stopRun(uint32_t irun, long timeStamp)
132{
133 LOG(info) << "Stopping run index: " << irun;
134 if (mActiveRuns[irun] == nullptr) {
135 LOG(error) << "No config for run index:" << irun;
136 return 1;
137 }
138 // const auto now = std::chrono::system_clock::now();
139 // const long timeStamp = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
140 mActiveRuns[irun]->timeStop = timeStamp * 1000.;
141 saveRunScalersToCCDB(mActiveRuns[irun]->scalers, mActiveRuns[irun]->timeStart, mActiveRuns[irun]->timeStop);
142 saveRunScalersToQCDB(mActiveRuns[irun]->scalers, mActiveRuns[irun]->timeStart, mActiveRuns[irun]->timeStop);
143 delete mActiveRuns[irun];
144 mActiveRuns[irun] = nullptr;
145 return 0;
146}
147int CTPRunManager::addScalers(uint32_t irun, std::time_t time, bool start)
148{
149 if (mActiveRuns[irun] == nullptr) {
150 LOG(error) << "No config for run index:" << irun;
151 return 1;
152 }
153 std::string orb = "extorb";
154 std::string orbitid = "orbitid";
155 CTPScalerRecordRaw scalrec;
156 scalrec.epochTime = time;
157 std::vector<int> clslist = mActiveRuns[irun]->cfg.getTriggerClassList();
158 for (auto const& cls : clslist) {
159 std::string cmb = "clamb" + std::to_string(cls + 1);
160 std::string cma = "clama" + std::to_string(cls + 1);
161 std::string c0b = "cla0b" + std::to_string(cls + 1);
162 std::string c0a = "cla0a" + std::to_string(cls + 1);
163 std::string c1b = "cla1b" + std::to_string(cls + 1);
164 std::string c1a = "cla1a" + std::to_string(cls + 1);
165 CTPScalerRaw scalraw;
166 scalraw.classIndex = (uint32_t)cls;
167 // std::cout << "cls:" << cls << " " << scalraw.classIndex << std::endl;
168 scalraw.lmBefore = mCounters[mScalerName2Position[cmb]];
169 scalraw.lmAfter = mCounters[mScalerName2Position[cma]];
170 scalraw.l0Before = mCounters[mScalerName2Position[c0b]];
171 scalraw.l0After = mCounters[mScalerName2Position[c0a]];
172 scalraw.l1Before = mCounters[mScalerName2Position[c1b]];
173 scalraw.l1After = mCounters[mScalerName2Position[c1a]];
174 // std::cout << "positions:" << cmb << " " << mScalerName2Position[cmb] << std::endl;
175 // std::cout << "positions:" << cma << " " << mScalerName2Position[cma] << std::endl;
176 scalrec.scalers.push_back(scalraw);
177 // BK scalers to be corrected for overflow
178 if (mBKClient) {
179 CTPActiveRun* ar = mActiveRuns[irun];
180 ar->cntslast[cls][0] = scalraw.lmBefore;
181 ar->cntslast[cls][1] = scalraw.lmAfter;
182 ar->cntslast[cls][2] = scalraw.l0Before;
183 ar->cntslast[cls][3] = scalraw.l0After;
184 ar->cntslast[cls][4] = scalraw.l1Before;
185 ar->cntslast[cls][5] = scalraw.l1After;
186 }
187 }
188 mActiveRuns[irun]->send2BK(mBKClient, time, start);
189 //
190 uint32_t NINPS = 48;
191 int offset = 599;
192 for (uint32_t i = 0; i < NINPS; i++) {
193 uint32_t inpcount = mCounters[offset + i];
194 scalrec.scalersInps.push_back(inpcount);
195 // LOG(info) << "Scaler for input:" << CTPRunScalers::scalerNames[offset+i] << ":" << inpcount;
196 }
197 //
198 if (mNew == 0) {
199 scalrec.intRecord.orbit = mCounters[mScalerName2Position[orb]];
200 } else {
201 scalrec.intRecord.orbit = mCounters[mScalerName2Position[orbitid]];
202 }
203 scalrec.intRecord.bc = 0;
204 mActiveRuns[irun]->scalers.addScalerRacordRaw(scalrec);
205 LOG(info) << "Adding scalers for orbit:" << scalrec.intRecord.orbit;
206 // scalrec.printStream(std::cout);
207 // printCounters();
208 return 0;
209}
/// Handles one incoming message: dispatches on the topic, parses the space separated counters
/// into mCounters and advances the state of every run slot (start / continue / stop).
///
/// \param topic   message topic; "clear", "ctpconfig", "sox" and "eox" are matched as substrings, "rocnts" exactly
/// \param message payload; for scalers the first token is the time stamp and the rest are the counters
/// \return 0 when the message was consumed, 1 when the number of tokens matches neither counter layout
// NOTE(review): std::stold / std::stoull below throw on non-numeric tokens and nothing here catches that.
210int CTPRunManager::processMessage(std::string& topic, const std::string& message)
211{
212 LOG(info) << "Processing message with topic:" << topic;
213 std::string firstcounters;
 // "clear": forget all loaded (not yet started) runs.
 // NOTE(review): mRunsLoaded holds raw pointers created with new in loadRun(); clear() does not delete them.
214 if (topic.find("clear") != std::string::npos) {
215 mRunsLoaded.clear();
216 LOG(info) << "Loaded runs cleared.";
217 return 0;
218 }
 // "ctpconfig": no counters are parsed from this message, return after handling it.
219 if (topic.find("ctpconfig") != std::string::npos) {
220 LOG(info) << "ctpconfig received";
222 return 0;
223 }
 // "sox": the message is split at the "run" keyword; the text before it is kept as the
 // counters to parse below, the text from "run" on is passed to startRun().
224 if (topic.find("sox") != std::string::npos) {
225 // get config
226 size_t irun = message.find("run");
227 if (irun == std::string::npos) {
228 LOG(warning) << "run keyword not found in SOX";
 // Without the keyword the whole message is treated as counters and cfg becomes empty.
229 irun = message.size();
230 }
231 LOG(info) << "SOX received, Run keyword position:" << irun;
232 std::string cfg = message.substr(irun, message.size() - irun);
233 startRun(cfg);
234 firstcounters = message.substr(0, irun);
235 }
 // "eox": only remember that an end-of-run was announced; it is checked in the "run stopped" branch below.
236 if (topic.find("eox") != std::string::npos) {
237 LOG(info) << "EOX received";
238 mEOX = 1;
239 }
 // "rocnts" messages are ignored; the warning is printed only once per process (static counter).
240 static int nerror = 0;
241 if (topic == "rocnts") {
242 if (nerror < 1) {
243 LOG(warning) << "Skipping topic rocnts";
244 nerror++;
245 }
246 return 0;
247 }
248 //
 // Tokenize either the counters part of a SOX message or the whole message.
249 std::vector<std::string> tokens;
250 if (firstcounters.size() > 0) {
251 tokens = o2::utils::Str::tokenize(firstcounters, ' ');
252 } else {
253 tokens = o2::utils::Str::tokenize(message, ' ');
254 }
 // Expected size is the time stamp plus NCOUNTERS counters; the NCOUNTERSv2 layout is also
 // accepted and switches mNew to 0. Any other size is rejected.
255 if (tokens.size() != (CTPRunScalers::NCOUNTERS + 1)) {
256 if (tokens.size() == (CTPRunScalers::NCOUNTERSv2 + 1)) {
257 mNew = 0;
258 LOG(warning) << "v2 scaler size";
259 } else {
260 LOG(warning) << "Scalers size wrong:" << tokens.size() << " expected:" << CTPRunScalers::NCOUNTERS + 1 << " or " << CTPRunScalers::NCOUNTERSv2 + 1;
261 return 1;
262 }
263 }
 // Token 0 is the time stamp; it is truncated to std::time_t for printing and for the calls below.
264 double timeStamp = std::stold(tokens.at(0));
265 std::time_t tt = timeStamp;
266 LOG(info) << "Processing scalers, all good, time:" << tokens.at(0) << " " << std::asctime(std::localtime(&tt));
 // Tokens 1..N are stored as counters 0..N-1; the first NRUNS of them are echoed to stdout.
267 for (uint32_t i = 1; i < tokens.size(); i++) {
268 mCounters[i - 1] = std::stoull(tokens.at(i));
269 if (i < (NRUNS + 1)) {
270 std::cout << mCounters[i - 1] << " ";
271 }
272 }
273 std::cout << std::endl;
274 LOG(info) << "Counter size:" << tokens.size();
275 //
 // The first NRUNS counters are compared slot by slot with mActiveRunNumbers:
 //   counter == 0, active == 0        -> slot idle
 //   counter != 0, active == counter  -> run continues
 //   counter != 0, active == 0        -> run starts
 //   counter == 0, active != 0        -> run stopped
 // A non-zero counter that differs from a non-zero active run number matches no branch.
276 for (uint32_t i = 0; i < NRUNS; i++) {
277 if ((mCounters[i] == 0) && (mActiveRunNumbers[i] == 0)) {
278 // not active
279 } else if ((mCounters[i] != 0) && (mActiveRunNumbers[i] == mCounters[i])) {
280 // active , do scalers
281 LOG(info) << "Run continue:" << mCounters[i];
282 addScalers(i, tt);
283 // LOG(info) << " QC period:" << mActiveRunNumbers[i] << " " << mActiveRuns[i]->qcwpcount << " " << mQCWritePeriod;
 // Write to QCDB only once qcwpcount has exceeded mQCWritePeriod, then restart the count.
284 if (mActiveRuns[i]->qcwpcount > mQCWritePeriod) {
285 saveRunScalersToQCDB(mActiveRuns[i]->scalers, tt * 1000, tt * 1000);
286 mActiveRuns[i]->qcwpcount = 0;
287 }
288 mActiveRuns[i]->qcwpcount++;
289 } else if ((mCounters[i] != 0) && (mActiveRunNumbers[i] == 0)) {
290 LOG(info) << "Run started:" << mCounters[i];
 // A run can only start if its configuration was loaded before; the run object then moves
 // from mRunsLoaded into the active slot.
291 auto run = mRunsLoaded.find(mCounters[i]);
292 if (run != mRunsLoaded.end()) {
293 mActiveRunNumbers[i] = mCounters[i];
294 mActiveRuns[i] = run->second;
295 mRunsLoaded.erase(run);
296 addScalers(i, tt, 1);
297 saveRunScalersToQCDB(mActiveRuns[i]->scalers, tt * 1000, tt * 1000);
298 } else {
299 LOG(error) << "Trying to start run which is not loaded:" << mCounters[i];
300 }
301 } else if ((mCounters[i] == 0) && (mActiveRunNumbers[i] != 0)) {
 // A stop is expected to arrive together with an "eox" topic (mEOX == 1); otherwise only an error is logged.
302 if (mEOX != 1) {
303 LOG(error) << "Internal error in processMessage: mEOX != 1 expected 0: mEOX:" << mEOX;
304 }
305 LOG(info) << "Run stopped:" << mActiveRunNumbers[i];
 // Record the final scalers before stopRun() deletes the run object.
306 addScalers(i, tt);
307 mActiveRunNumbers[i] = 0;
308 mEOX = 0;
309 stopRun(i, tt);
310 }
311 }
312 mEOX = 0;
314 return 0;
315}
317{
318 std::cout << "Active runs:";
319 for (auto const& arun : mActiveRunNumbers) {
320 std::cout << arun << " ";
321 }
322 std::cout << " #loaded runs:" << mRunsLoaded.size();
323 for (auto const& lrun : mRunsLoaded) {
324 std::cout << " " << lrun.second->cfg.getRunNumber();
325 }
326 std::cout << std::endl;
327}
329{
331 LOG(fatal) << "NCOUNTERS:" << CTPRunScalers::NCOUNTERS << " different from names vector:" << CTPRunScalers::scalerNames.size();
332 return 1;
333 }
334 // try to open files of no success use default
335 for (uint32_t i = 0; i < CTPRunScalers::scalerNames.size(); i++) {
336 mScalerName2Position[CTPRunScalers::scalerNames[i]] = i;
337 }
338 return 0;
339}
341{
342 int n = 0;
343 for (int i = 0; i < NRUNS; i++) {
344 if (mActiveRuns[i] != nullptr) {
345 n++;
346 }
347 }
348 return n;
349}
351{
352 int NDET = 18;
353 int NINPS = 48;
354 // int NCLKFP = 7;
355 int NLTG_start = NRUNS;
356 int NCLKFP_start = NLTG_start + NDET * 32;
357 int NINPS_start = NCLKFP_start + 7;
358 int NCLS_start = NINPS_start + NINPS;
359 std::cout << "====> CTP counters:" << std::endl;
360 std::cout << "RUNS:" << std::endl;
361 int ipos = 0;
362 for (uint32_t i = 0; i < NRUNS; i++) {
363 std::cout << ipos << ":" << mCounters[i] << " ";
364 ipos++;
365 }
366 std::cout << std::endl;
367 for (int i = 0; i < NDET; i++) {
368 std::cout << "LTG" << i + 1 << std::endl;
369 for (int j = NLTG_start + i * 32; j < NLTG_start + (i + 1) * 32; j++) {
370 std::cout << ipos << ":" << mCounters[j] << " ";
371 ipos++;
372 }
373 std::cout << std::endl;
374 }
375 std::cout << "BC40,BC240,Orbit,pulser, fastlm, busy,spare" << std::endl;
376 for (int i = NCLKFP_start; i < NCLKFP_start + 7; i++) {
377 std::cout << ipos << ":" << mCounters[i] << " ";
378 ipos++;
379 }
380 std::cout << std::endl;
381 std::cout << "INPUTS:" << std::endl;
382 for (int i = NINPS_start; i < NINPS_start + NINPS; i++) {
383 std::cout << ipos << ":" << mCounters[i] << " ";
384 ipos++;
385 }
386 std::cout << std::endl;
387 std::cout << "CLASS M Before" << std::endl;
388 for (int i = NCLS_start; i < NCLS_start + 64; i++) {
389 std::cout << ipos << ":" << mCounters[i] << " ";
390 ipos++;
391 }
392 std::cout << std::endl;
393 std::cout << "CLASS M After" << std::endl;
394 for (int i = NCLS_start + 64; i < NCLS_start + 2 * 64; i++) {
395 std::cout << ipos << ":" << mCounters[i] << " ";
396 ipos++;
397 }
398 std::cout << std::endl;
399 std::cout << "CLASS 0 Before" << std::endl;
400 for (int i = NCLS_start + 2 * 64; i < NCLS_start + 3 * 64; i++) {
401 std::cout << ipos << ":" << mCounters[i] << " ";
402 ipos++;
403 }
404 std::cout << std::endl;
405 std::cout << "CLASS 0 After" << std::endl;
406 for (int i = NCLS_start + 3 * 64; i < NCLS_start + 4 * 64; i++) {
407 std::cout << ipos << ":" << mCounters[i] << " ";
408 ipos++;
409 }
410 std::cout << std::endl;
411 std::cout << "CLASS 1 Before" << std::endl;
412 for (int i = NCLS_start + 4 * 64; i < NCLS_start + 5 * 64; i++) {
413 std::cout << ipos << ":" << mCounters[i] << " ";
414 ipos++;
415 }
416 std::cout << std::endl;
417 std::cout << "CLASS 1 After" << std::endl;
418 for (int i = NCLS_start + 5 * 64; i < NCLS_start + 6 * 64; i++) {
419 std::cout << ipos << ":" << mCounters[i] << " ";
420 ipos++;
421 }
422 std::cout << std::endl;
423 std::cout << " REST:" << std::endl;
424 for (uint32_t i = NCLS_start + 6 * 64; i < mCounters.size(); i++) {
425 if ((ipos % 10) == 0) {
426 std::cout << std::endl;
427 std::cout << ipos << ":";
428 }
429 std::cout << mCounters[i] << " ";
430 }
431}
int16_t time
Definition RawEventData.h:4
int32_t i
uint16_t pos
Definition RawData.h:3
uint32_t j
Definition RawData.h:0
Managing runs for config and scalers.
std::ostringstream debug
std::vector< int > getTriggerClassList() const
uint64_t getTriggerClassMask() const
std::string getClassNameFromHWIndex(int index)
o2::detectors::DetID::mask_t getDetectorMask() const
void printStream(std::ostream &stream) const
int loadConfigurationRun3(const std::string &ctpconfiguartion)
int stopRun(uint32_t irun, long timeStamp)
int startRun(const std::string &cfg)
void printActiveRuns() const
int processMessage(std::string &topic, const std::string &message)
int loadRun(const std::string &cfg)
int addScalers(uint32_t irun, std::time_t time, bool start=0)
static constexpr uint32_t NCOUNTERS
Definition Scalers.h:96
static constexpr uint32_t NCOUNTERSv2
Definition Scalers.h:95
void setDetectorMask(o2::detectors::DetID::mask_t mask)
Definition Scalers.h:114
static std::vector< std::string > scalerNames
Definition Scalers.h:97
void setRunNumber(uint32_t rnumber)
Definition Scalers.h:115
void setClassMask(std::bitset< CTP_NCLASSES > classMask)
Definition Scalers.h:113
static std::string mCCDBHost
Database constants.
int saveRunScalersToQCDB(CTPRunScalers &scalers, long timeStart, long timeStop)
static std::string mQCDBHost
static void setQCDBHost(std::string host)
int saveRunScalersToCCDB(CTPRunScalers &scalers, long timeStart, long timeStop)
int saveRunConfigToCCDB(CTPConfiguration *cfg, long timeStart)
GLdouble n
Definition glcorearb.h:1982
GLsizeiptr size
Definition glcorearb.h:659
GLdouble f
Definition glcorearb.h:310
GLintptr offset
Definition glcorearb.h:660
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLuint start
Definition glcorearb.h:469
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
uint32_t orbit
LHC orbit.
uint16_t bc
bunch crossing ID of interaction
counters64_t overflows
Definition RunManager.h:40
CTPConfiguration cfg
Definition RunManager.h:32
CTPRunScalers scalers
Definition RunManager.h:33
int send2BK(std::unique_ptr< BkpClient > &BKClient, size_t ts, bool start)
uint32_t classIndex
Definition Scalers.h:41
std::vector< CTPScalerRaw > scalers
Definition Scalers.h:70
o2::InteractionRecord intRecord
Definition Scalers.h:68
std::vector< uint32_t > scalersInps
Definition Scalers.h:72
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"