12#ifndef O2_TOF_DATAPROCESSOR_H
13#define O2_TOF_DATAPROCESSOR_H
20#include <TStopwatch.h>
51using Duration = std::chrono::duration<double, std::ratio<1, 1>>;
// Body of the task initialisation. The enclosing signature and several closing
// braces are not part of this excerpt; `ic` is the context whose options() are read.
// It builds the list of data points (`vect`), creates the processor, and caches the
// run options in the m* members.
59 std::vector<DPID> vect;
// Interval between DPs updates; run() compares it against an elapsed time measured
// in seconds (Duration uses std::ratio<1, 1>).
60 mDPsUpdateInterval = ic.options().get<int64_t>(
"DPs-update-interval");
// A zero interval is reported as an error and replaced by 60.
// NOTE(review): only the exact value 0 is corrected here; negative values pass through.
61 if (mDPsUpdateInterval == 0) {
62 LOG(error) <<
"TOF DPs update interval set to zero seconds --> changed to 60";
63 mDPsUpdateInterval = 60;
// Choose where the list of data points comes from: a CCDB object or hard-coded aliases.
65 bool useCCDBtoConfigure = ic.options().get<
bool>(
"use-ccdb-to-configure");
66 if (useCCDBtoConfigure) {
67 LOG(info) <<
"Configuring via CCDB";
68 std::string ccdbpath = ic.options().get<std::string>(
"ccdb-path");
// Query timestamp: current system-clock time in milliseconds since the epoch.
71 long ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
// `mgr` is defined on a line that is not visible in this excerpt.
// NOTE(review): the returned pointer is dereferenced in the loop below with no null
// check visible here — confirm getForTimeStamp cannot return nullptr for this path.
72 std::unordered_map<DPID, std::string>* dpid2DataDesc = mgr.getForTimeStamp<std::unordered_map<DPID, std::string>>(
"TOF/Config/DCSDPconfig", ts);
// Only the map keys (the DPIDs) are kept; the mapped strings are not used here.
73 for (
auto&
i : *dpid2DataDesc) {
74 vect.push_back(
i.first);
// Hard-coded configuration; presumably the else-branch of the CCDB test above
// (the `else` line itself is not visible in this excerpt).
77 LOG(info) <<
"Configuring via hardcoded strings";
// Alias patterns with a [first..last] index range.
78 std::vector<std::string> aliases = {
"tof_hv_vp_[00..89]",
"tof_hv_vn_[00..89]",
"tof_hv_ip_[00..89]",
"tof_hv_in_[00..89]"};
79 std::vector<std::string> aliasesInt = {
"TOF_FEACSTATUS_[00..71]"};
// `expaliases` / `expaliasesInt` are defined on lines not shown here (presumably the
// expanded forms of the two pattern lists above — TODO confirm); the bodies of the
// two loops are likewise not visible.
82 for (
const auto&
i : expaliases) {
85 for (
const auto&
i : expaliasesInt) {
// Iterate over the final list of data points (loop body not visible in this excerpt).
90 LOG(info) <<
"Listing Data Points for TOF:";
91 for (
auto&
i : vect) {
// Create the processor and forward the two verbosity options to it.
95 mProcessor = std::make_unique<o2::tof::TOFDCSProcessor>();
96 mVerboseModeDPs = ic.options().get<
bool>(
"use-verbose-mode-DP");
97 mVerboseModeHVLV = ic.options().get<
bool>(
"use-verbose-mode-HVLV");
98 LOG(info) <<
" ************************* Verbose DP? " << mVerboseModeDPs;
99 LOG(info) <<
" ************************* Verbose HV/LV? " << mVerboseModeHVLV;
100 if (mVerboseModeDPs) {
101 mProcessor->useVerboseModeDP();
103 if (mVerboseModeHVLV) {
104 mProcessor->useVerboseModeHVLV();
// Hand the list of data points to the processor.
106 mProcessor->init(vect);
// Reference time from which run() measures the elapsed update interval.
107 mTimer = HighResClock::now();
// Timing reports are enabled explicitly, or implicitly by either verbose mode.
108 mReportTiming = ic.options().get<
bool>(
"report-timing") || mVerboseModeDPs || mVerboseModeHVLV;
109 mStoreWhenAllDPs = ic.options().get<
bool>(
"store-when-all-DPs-filled");
// Body of the per-message processing step. The enclosing signature, the definition
// of `dataTime` and several conditions/closing braces are not part of this excerpt;
// `pc` is the context providing inputs() and outputs().
115 auto timeNow = HighResClock::now();
// An all-ones `dataTime` is replaced by the current time in milliseconds
// (presumably a "no timestamp available" sentinel — TODO confirm against the line
// that defines dataTime).
118 if (dataTime == 0xffffffffffffffff) {
119 dataTime = std::chrono::duration_cast<std::chrono::milliseconds>(timeNow.time_since_epoch()).count();
// The three groups below set the start of validity for DPs, LV and HV to dataTime.
// The conditions guarding each group are on lines not visible here; only the
// verbose logging guards are shown.
122 if (mVerboseModeDPs) {
123 LOG(info) <<
"startValidity for DPs changed to = " << dataTime;
125 mProcessor->setStartValidityDPs(dataTime);
128 if (mVerboseModeHVLV) {
129 LOG(info) <<
"startValidity for LV changed to = " << dataTime;
131 mProcessor->setStartValidityLV(dataTime);
134 if (mVerboseModeHVLV) {
135 LOG(info) <<
"startValidity for HV changed to = " << dataTime;
137 mProcessor->setStartValidityHV(dataTime);
// Fetch the incoming data points bound to the "input" label and process them.
139 auto dps = pc.inputs().get<gsl::span<DPCOM>>(
"input");
141 mProcessor->process(dps);
// Elapsed time since mTimer, expressed in seconds (Duration uses std::ratio<1, 1>).
142 Duration elapsedTime = timeNow - mTimer;
// Once the configured interval has elapsed, decide whether the DPs object is sent:
// always, unless mStoreWhenAllDPs is set, in which case only when all DPs are filled.
143 if (elapsedTime.count() >= mDPsUpdateInterval) {
144 bool sendToCCDB =
true;
145 if (mStoreWhenAllDPs) {
146 sendToCCDB = mProcessor->areAllDPsFilled();
// Presumably executed only when sendToCCDB is true; the guarding line and any timer
// reset are not visible in this excerpt.
149 sendDPsoutput(pc.outputs());
// Debug trace for the case where nothing is sent (presumably the alternative branch).
152 LOG(
debug) <<
"Not sending yet: mStoreWhenAllDPs = " << mStoreWhenAllDPs <<
", mProcessor->areAllDPsFilled() = " << mProcessor->areAllDPsFilled() <<
", sentToCCDB = " << sendToCCDB;
// LV/HV output is requested here; it only sends objects flagged as updated
// (see the isLVUpdated()/isHVUpdated() checks in its body).
155 sendLVandHVoutput(pc.outputs());
// Body of the end-of-stream handler (signature and closing braces are not part of this
// excerpt; `ec` is the context providing outputs()).
// The current content is sent even when not every DP has been filled; that case is
// only reported at debug level.
164 if (!mProcessor->areAllDPsFilled()) {
165 LOG(
debug) <<
"Not all DPs are filled, sending to CCDB what we have anyway";
167 sendDPsoutput(ec.outputs());
168 sendLVandHVoutput(ec.outputs());
// Data members of the task (the access specifier line is not part of this excerpt).
// Set in init() from the "report-timing" option or either verbose mode.
172 bool mReportTiming =
false;
// Processor that receives the data points; created in init().
173 std::unique_ptr<TOFDCSProcessor> mProcessor;
// Reference time used in run() to measure the elapsed update interval.
174 HighResClock::time_point mTimer;
// Update interval for the DPs object, read from "DPs-update-interval".
// NOTE(review): no in-class initializer here, unlike the bool members; it is assigned in init().
175 int64_t mDPsUpdateInterval;
// When true, the DPs object is sent from run() only once all DPs are filled.
176 bool mStoreWhenAllDPs =
false;
// Verbosity switches, read from "use-verbose-mode-DP" and "use-verbose-mode-HVLV".
177 bool mVerboseModeDPs =
false;
178 bool mVerboseModeHVLV =
false;
// Body fragment that publishes the DPs object — presumably sendDPsoutput(); the
// signature, the creation of `image` and the actual output calls are on lines not
// visible in this excerpt.
// Let the processor refresh the DPs object before it is read back.
184 mProcessor->updateDPsCCDB();
185 const auto& payload = mProcessor->getTOFDPsInfo();
186 auto& info = mProcessor->getccdbDPsInfo();
// `image` is defined on a line not shown here (presumably the serialized form of
// `payload` — TODO confirm).
188 LOG(info) <<
"Sending object " << info.getPath() <<
"/" << info.getFileName() <<
" of size " <<
image->size()
189 <<
" bytes, valid for " << info.getStartValidityTimestamp() <<
" : " << info.getEndValidityTimestamp();
// After sending, clear the accumulated DPs information and reset its start of validity.
192 mProcessor->clearDPsinfo();
193 mProcessor->resetStartValidityDPs();
// Body fragment that publishes the LV and HV status objects — presumably
// sendLVandHVoutput(); the signature, the creation of `image`, the output calls and
// the closing braces are on lines not visible in this excerpt.
// LV status: handled only when the processor reports it as updated.
201 if (mProcessor->isLVUpdated()) {
202 const auto& payload = mProcessor->getLVStatus();
203 auto& info = mProcessor->getccdbLVInfo();
// `image` is defined on a line not shown here (presumably the serialized `payload`).
205 LOG(info) <<
"Sending object " << info.getPath() <<
"/" << info.getFileName() <<
" of size " <<
image->size()
206 <<
" bytes, valid for " << info.getStartValidityTimestamp() <<
" : " << info.getEndValidityTimestamp();
210 mProcessor->resetStartValidityLV();
// HV status: same sequence as for LV, using the HV accessors.
212 if (mProcessor->isHVUpdated()) {
213 const auto& payload = mProcessor->getHVStatus();
214 auto& info = mProcessor->getccdbHVInfo();
216 LOG(info) <<
"Sending object " << info.getPath() <<
"/" << info.getFileName() <<
" of size " <<
image->size()
217 <<
" bytes, valid for " << info.getStartValidityTimestamp() <<
" : " << info.getEndValidityTimestamp();
220 mProcessor->resetStartValidityHV();
// Fragment of the function that builds the processor specification (presumably
// getTOFDCSDataProcessorSpec — its signature, the filling of `outputs`, the return
// statement and some option entries are on lines not visible in this excerpt).
235 std::vector<OutputSpec> outputs;
// Name of the processor.
246 "tof-dcs-data-processor",
// Single input, bound to the label "input" that run() reads.
247 Inputs{{
"input",
"DCS",
"TOFDATAPOINTS"}},
// The algorithm is the TOFDCSDataProcessor task defined in this file.
249 AlgorithmSpec{adaptFromTask<o2::tof::TOFDCSDataProcessor>()},
// Option entries visible here: name, type, default value, help text.
253 {
"use-verbose-mode-HVLV",
VariantType::Bool,
false, {
"Use verbose mode for HV and LV"}},
// Default interval is 600; init() replaces a configured value of 0 by 60.
255 {
"DPs-update-interval",
VariantType::Int64, 600ll, {
"Interval (in s) after which to update the DPs CCDB entry"}},
256 {
"store-when-all-DPs-filled",
VariantType::Bool,
false, {
"Store CCDB entry only when all DPs have been filled (--> never re-use an old value)"}}}};
Utils and constants for calibration and related workflows.
static BasicCCDBManager & instance()
static std::unique_ptr< std::vector< char > > createObjectImage(const T *obj, CcdbObjectInfo *info=nullptr)
static constexpr long INFINITE_TIMESTAMP
void init(o2::framework::InitContext &ic) final
void run(o2::framework::ProcessingContext &pc) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
information complementary to a CCDB object (path, metadata, startTimeValidity, endTimeValidity etc)
std::vector< std::string > expandAliases(const std::vector< std::string > &patternedAliases)
Defining PrimaryVertex explicitly as messageable.
std::vector< InputSpec > Inputs
std::chrono::high_resolution_clock HighResClock
std::chrono::duration< double, std::ratio< 1, 1 > > Duration
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static constexpr o2::header::DataOrigin gDataOriginCDBWrapper
static constexpr o2::header::DataOrigin gDataOriginCDBPayload
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"