Project
Loading...
Searching...
No Matches
TOFDCSDataProcessorSpec.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef O2_TOF_DATAPROCESSOR_H
13#define O2_TOF_DATAPROCESSOR_H
14
17
18#include <unistd.h>
19#include <TRandom.h>
20#include <TStopwatch.h>
28#include "CCDB/CcdbApi.h"
34#include "Framework/Task.h"
35#include "Framework/Logger.h"
36
37using namespace o2::framework;
38
39namespace o2
40{
41namespace tof
42{
43
47using namespace o2::ccdb;
50using HighResClock = std::chrono::high_resolution_clock; // clock type behind the mTimer time points
51using Duration = std::chrono::duration<double, std::ratio<1, 1>>; // double-valued duration with a period of one second
52
54{
55 public:
57 {
// Body of the initialisation step: reads the workflow options, builds the list
// of DCS data points (DPs) to follow and configures the TOFDCSProcessor.
// (The signature line of this method is not part of this listing.)
58
59 std::vector<DPID> vect; // DP identifiers, handed to mProcessor->init() at the end
60 mDPsUpdateInterval = ic.options().get<int64_t>("DPs-update-interval");
// Only an interval of exactly 0 is replaced by the fallback of 60; a negative
// value is not caught by this check.
61 if (mDPsUpdateInterval == 0) {
62 LOG(error) << "TOF DPs update interval set to zero seconds --> changed to 60";
63 mDPsUpdateInterval = 60;
64 }
65 bool useCCDBtoConfigure = ic.options().get<bool>("use-ccdb-to-configure");
66 if (useCCDBtoConfigure) {
// CCDB-driven configuration: the DP list is the key set of the map stored
// under "TOF/Config/DCSDPconfig", queried at the current wall-clock time.
67 LOG(info) << "Configuring via CCDB";
68 std::string ccdbpath = ic.options().get<std::string>("ccdb-path");
69 auto& mgr = CcdbManager::instance();
70 mgr.setURL(ccdbpath);
71 long ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count(); // now, in ms since epoch
72 std::unordered_map<DPID, std::string>* dpid2DataDesc = mgr.getForTimeStamp<std::unordered_map<DPID, std::string>>("TOF/Config/DCSDPconfig", ts);
// NOTE(review): dpid2DataDesc is dereferenced without a null check -- confirm
// that getForTimeStamp cannot return nullptr here (e.g. missing object or
// unreachable server).
73 for (auto& i : *dpid2DataDesc) {
74 vect.push_back(i.first);
75 }
76 } else {
// Hard-coded configuration: the alias patterns are expanded with
// o2::dcs::expandAliases; the tof_hv_* aliases are registered as
// DPVAL_DOUBLE and the TOF_FEACSTATUS_* aliases as DPVAL_INT.
77 LOG(info) << "Configuring via hardcoded strings";
78 std::vector<std::string> aliases = {"tof_hv_vp_[00..89]", "tof_hv_vn_[00..89]", "tof_hv_ip_[00..89]", "tof_hv_in_[00..89]"};
79 std::vector<std::string> aliasesInt = {"TOF_FEACSTATUS_[00..71]"};
80 std::vector<std::string> expaliases = o2::dcs::expandAliases(aliases);
81 std::vector<std::string> expaliasesInt = o2::dcs::expandAliases(aliasesInt);
82 for (const auto& i : expaliases) {
83 vect.emplace_back(i, o2::dcs::DPVAL_DOUBLE);
84 }
85 for (const auto& i : expaliasesInt) {
86 vect.emplace_back(i, o2::dcs::DPVAL_INT);
87 }
88 }
89
// Log every DP that will be followed.
90 LOG(info) << "Listing Data Points for TOF:";
91 for (auto& i : vect) {
92 LOG(info) << i;
93 }
94
// Create the processor; the verbose switches are applied before
// mProcessor->init(vect) is called.
95 mProcessor = std::make_unique<o2::tof::TOFDCSProcessor>();
96 mVerboseModeDPs = ic.options().get<bool>("use-verbose-mode-DP");
97 mVerboseModeHVLV = ic.options().get<bool>("use-verbose-mode-HVLV");
98 LOG(info) << " ************************* Verbose DP? " << mVerboseModeDPs;
99 LOG(info) << " ************************* Verbose HV/LV? " << mVerboseModeHVLV;
100 if (mVerboseModeDPs) {
101 mProcessor->useVerboseModeDP();
102 }
103 if (mVerboseModeHVLV) {
104 mProcessor->useVerboseModeHVLV();
105 }
106 mProcessor->init(vect);
107 mTimer = HighResClock::now(); // reference point for the DPs update interval
// The timing report is enabled by its own option and also whenever one of the
// verbose modes is on.
108 mReportTiming = ic.options().get<bool>("report-timing") || mVerboseModeDPs || mVerboseModeHVLV;
109 mStoreWhenAllDPs = ic.options().get<bool>("store-when-all-DPs-filled");
110 }
111
113 {
// Body of the per-message processing step: feeds the incoming DCS data points
// to the processor, publishes the DPs object once the update interval has
// elapsed, and publishes the LV/HV objects whenever the processor reports them
// as updated. (The signature line of this method is not part of this listing.)
114 TStopwatch sw; // used for the optional timing report at the end
115 auto timeNow = HighResClock::now();
116
// Timestamp used as start of validity: the creation time from TimingInfo,
// or the current clock reading (in ms) when the creation time is not set.
// NOTE(review): dataTime is a signed long compared with an unsigned all-ones
// literal; the match relies on the implicit conversion to unsigned -- confirm
// this is intended.
117 long dataTime = (long)(pc.services().get<o2::framework::TimingInfo>().creation);
118 if (dataTime == 0xffffffffffffffff) { // it means it is not set
119 dataTime = std::chrono::duration_cast<std::chrono::milliseconds>(timeNow.time_since_epoch()).count(); // in ms
120 }
// For each of the three objects (DPs, LV, HV): a start of validity equal to
// INFINITE_TIMESTAMP is replaced by dataTime.
121 if (mProcessor->getStartValidityDPs() == o2::ccdb::CcdbObjectInfo::INFINITE_TIMESTAMP) {
122 if (mVerboseModeDPs) {
123 LOG(info) << "startValidity for DPs changed to = " << dataTime;
124 }
125 mProcessor->setStartValidityDPs(dataTime);
126 }
127 if (mProcessor->getStartValidityLV() == o2::ccdb::CcdbObjectInfo::INFINITE_TIMESTAMP) {
128 if (mVerboseModeHVLV) {
129 LOG(info) << "startValidity for LV changed to = " << dataTime;
130 }
131 mProcessor->setStartValidityLV(dataTime);
132 }
133 if (mProcessor->getStartValidityHV() == o2::ccdb::CcdbObjectInfo::INFINITE_TIMESTAMP) {
134 if (mVerboseModeHVLV) {
135 LOG(info) << "startValidity for HV changed to = " << dataTime;
136 }
137 mProcessor->setStartValidityHV(dataTime);
138 }
139 auto dps = pc.inputs().get<gsl::span<DPCOM>>("input");
140
141 mProcessor->process(dps);
// DPs object: published only once mDPsUpdateInterval seconds have passed since
// mTimer. With mStoreWhenAllDPs set, publication additionally waits until the
// processor reports all DPs as filled; mTimer is reset only when something
// was actually sent.
142 Duration elapsedTime = timeNow - mTimer; // in seconds
143 if (elapsedTime.count() >= mDPsUpdateInterval) {
144 bool sendToCCDB = true;
145 if (mStoreWhenAllDPs) {
146 sendToCCDB = mProcessor->areAllDPsFilled();
147 }
148 if (sendToCCDB) {
149 sendDPsoutput(pc.outputs());
150 mTimer = timeNow;
151 } else {
152 LOG(debug) << "Not sending yet: mStoreWhenAllDPs = " << mStoreWhenAllDPs << ", mProcessor->areAllDPsFilled() = " << mProcessor->areAllDPsFilled() << ", sentToCCDB = " << sendToCCDB;
153 }
154 }
// LV/HV objects: checked on every call; sendLVandHVoutput only publishes what
// isLVUpdated()/isHVUpdated() report as changed.
155 sendLVandHVoutput(pc.outputs());
156 sw.Stop();
157 if (mReportTiming) {
158 LOGP(info, "Timing CPU:{:.3e} Real:{:.3e} at slice {}", sw.CpuTime(), sw.RealTime(), pc.services().get<o2::framework::TimingInfo>().timeslice);
159 }
160 }
161
163 {
164 if (!mProcessor->areAllDPsFilled()) {
165 LOG(debug) << "Not all DPs are filled, sending to CCDB what we have anyway";
166 }
167 sendDPsoutput(ec.outputs());
168 sendLVandHVoutput(ec.outputs());
169 }
170
171 private:
172 bool mReportTiming = false; // when true, CPU/real time is logged at the end of each processing call
173 std::unique_ptr<TOFDCSProcessor> mProcessor; // DCS processor, created during initialisation
174 HighResClock::time_point mTimer; // set at initialisation and each time the DPs object is sent
175 int64_t mDPsUpdateInterval; // seconds between DPs updates; no in-class initializer, set from the "DPs-update-interval" option
176 bool mStoreWhenAllDPs = false; // from "store-when-all-DPs-filled": send the DPs object only when all DPs are filled
177 bool mVerboseModeDPs = false; // from "use-verbose-mode-DP"
178 bool mVerboseModeHVLV = false; // from "use-verbose-mode-HVLV"
179
180 //________________________________________________________________
181 void sendDPsoutput(DataAllocator& output)
182 {
183 // extract CCDB infos and calibration object for DPs
184 mProcessor->updateDPsCCDB();
185 const auto& payload = mProcessor->getTOFDPsInfo();
186 auto& info = mProcessor->getccdbDPsInfo();
187 auto image = o2::ccdb::CcdbApi::createObjectImage(&payload, &info);
188 LOG(info) << "Sending object " << info.getPath() << "/" << info.getFileName() << " of size " << image->size()
189 << " bytes, valid for " << info.getStartValidityTimestamp() << " : " << info.getEndValidityTimestamp();
190 output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBPayload, "TOF_DCSDPs", 0}, *image.get());
191 output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBWrapper, "TOF_DCSDPs", 0}, info);
192 mProcessor->clearDPsinfo();
193 mProcessor->resetStartValidityDPs();
194 }
195
196 //________________________________________________________________
197 void sendLVandHVoutput(DataAllocator& output)
198 {
199 // extract CCDB infos and calibration objects for LV and HV, convert it to TMemFile and send them to the output
200
201 if (mProcessor->isLVUpdated()) {
202 const auto& payload = mProcessor->getLVStatus();
203 auto& info = mProcessor->getccdbLVInfo();
204 auto image = o2::ccdb::CcdbApi::createObjectImage(&payload, &info);
205 LOG(info) << "Sending object " << info.getPath() << "/" << info.getFileName() << " of size " << image->size()
206 << " bytes, valid for " << info.getStartValidityTimestamp() << " : " << info.getEndValidityTimestamp();
207
208 output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBPayload, "TOF_LVStatus", 0}, *image.get());
209 output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBWrapper, "TOF_LVStatus", 0}, info);
210 mProcessor->resetStartValidityLV();
211 }
212 if (mProcessor->isHVUpdated()) {
213 const auto& payload = mProcessor->getHVStatus();
214 auto& info = mProcessor->getccdbHVInfo();
215 auto image = o2::ccdb::CcdbApi::createObjectImage(&payload, &info);
216 LOG(info) << "Sending object " << info.getPath() << "/" << info.getFileName() << " of size " << image->size()
217 << " bytes, valid for " << info.getStartValidityTimestamp() << " : " << info.getEndValidityTimestamp();
218 output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBPayload, "TOF_HVStatus", 0}, *image.get());
219 output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBWrapper, "TOF_HVStatus", 0}, info);
220 mProcessor->resetStartValidityHV();
221 }
222 }
223
224}; // end class
225} // namespace tof
226
227namespace framework
228{
229
230DataProcessorSpec getTOFDCSDataProcessorSpec()
231{
232
234
235 std::vector<OutputSpec> outputs;
236 outputs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBPayload, "TOF_LVStatus"}, Lifetime::Sporadic);
237 outputs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBWrapper, "TOF_LVStatus"}, Lifetime::Sporadic);
238
239 outputs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBPayload, "TOF_HVStatus"}, Lifetime::Sporadic);
240 outputs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBWrapper, "TOF_HVStatus"}, Lifetime::Sporadic);
241
242 outputs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBPayload, "TOF_DCSDPs"}, Lifetime::Sporadic);
243 outputs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBWrapper, "TOF_DCSDPs"}, Lifetime::Sporadic);
244
245 return DataProcessorSpec{
246 "tof-dcs-data-processor",
247 Inputs{{"input", "DCS", "TOFDATAPOINTS"}},
248 outputs,
249 AlgorithmSpec{adaptFromTask<o2::tof::TOFDCSDataProcessor>()},
250 Options{{"ccdb-path", VariantType::String, "http://localhost:8080", {"Path to CCDB"}},
251 {"use-ccdb-to-configure", VariantType::Bool, false, {"Use CCDB to configure"}},
252 {"use-verbose-mode-DP", VariantType::Bool, false, {"Use verbose mode for DPs"}},
253 {"use-verbose-mode-HVLV", VariantType::Bool, false, {"Use verbose mode for HV and LV"}},
254 {"report-timing", VariantType::Bool, false, {"Report timing for every slice"}},
255 {"DPs-update-interval", VariantType::Int64, 600ll, {"Interval (in s) after which to update the DPs CCDB entry"}},
256 {"store-when-all-DPs-filled", VariantType::Bool, false, {"Store CCDB entry only when all DPs have been filled (--> never re-use an old value)"}}}};
257}
258
259} // namespace framework
260} // namespace o2
261
262#endif
Utils and constants for calibration and related workflows.
int32_t i
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
std::ostringstream debug
static BasicCCDBManager & instance()
static std::unique_ptr< std::vector< char > > createObjectImage(const T *obj, CcdbObjectInfo *info=nullptr)
Definition CcdbApi.h:103
static constexpr long INFINITE_TIMESTAMP
void init(o2::framework::InitContext &ic) final
void run(o2::framework::ProcessingContext &pc) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
GLeglImageOES image
Definition glcorearb.h:4021
information complementary to a CCDB object (path, metadata, startTimeValidity, endTimeValidity etc)
std::vector< std::string > expandAliases(const std::vector< std::string > &patternedAliases)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< InputSpec > Inputs
std::chrono::high_resolution_clock HighResClock
std::chrono::duration< double, std::ratio< 1, 1 > > Duration
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static constexpr o2::header::DataOrigin gDataOriginCDBWrapper
Definition Utils.h:44
static constexpr o2::header::DataOrigin gDataOriginCDBPayload
Definition Utils.h:43
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
TStopwatch sw