12#ifndef O2_EMCAL_DATAPROCESSORSPEC_H
13#define O2_EMCAL_DATAPROCESSORSPEC_H
20#include <TStopwatch.h>
54using Duration = std::chrono::duration<double, std::ratio<1, 1>>;
62 std::string ccdbpath = ic.options().get<std::string>(
"ccdb-path");
66 mCheckRunStartStop = ic.options().get<
bool>(
"follow-emcal-run");
68 std::vector<DPID> vect;
69 mDPsUpdateInterval = ic.options().get<int64_t>(
"DPs-update-interval");
70 if (mDPsUpdateInterval == 0) {
71 LOG(error) <<
"EMC DPs update interval set to zero seconds --> changed to 60";
72 mDPsUpdateInterval = 60;
74 bool useCCDBtoConfigure = ic.options().get<
bool>(
"use-ccdb-to-configure");
75 if (useCCDBtoConfigure) {
76 LOG(info) <<
"Configuring via CCDB";
77 long ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
78 std::unordered_map<DPID, std::string>* dpid2DataDesc = mgr.getForTimeStamp<std::unordered_map<DPID, std::string>>(
"EMC/Config/DCSDPconfig", ts);
79 for (
auto&
i : *dpid2DataDesc) {
80 vect.push_back(
i.first);
83 LOG(info) <<
"Configuring via hardcoded strings";
85 std::vector<std::string> aliasesTEMP = {
"EMC_PT_[00..83].Temperature",
"EMC_PT_[88..91].Temperature",
"EMC_PT_[96..159].Temperature"};
86 std::vector<std::string> aliasesUINT = {
"EMC_DDL_LIST[0..1]",
"EMC_SRU[00..19]_CFG",
"EMC_SRU[00..19]_FMVER",
87 "EMC_TRU[00..45]_PEAKFINDER",
"EMC_TRU[00..45]_L0ALGSEL",
"EMC_TRU[00..45]_COSMTHRESH",
88 "EMC_TRU[00..45]_GLOBALTHRESH",
"EMC_TRU[00..45]_MASK[0..5]",
89 "EMC_STU_ERROR_COUNT_TRU[0..67]",
"DMC_STU_ERROR_COUNT_TRU[0..55]"};
90 std::vector<std::string> aliasesINT = {
"EMC_STU_FWVERS",
"EMC_STU_GA[0..1]",
"EMC_STU_GB[0..1]",
"EMC_STU_GC[0..1]",
91 "EMC_STU_JA[0..1]",
"EMC_STU_JB[0..1]",
"EMC_STU_JC[0..1]",
"EMC_STU_PATCHSIZE",
"EMC_STU_GETRAW",
92 "EMC_STU_MEDIAN",
"EMC_STU_REGION",
"DMC_STU_FWVERS",
"DMC_STU_PHOS_scale[0..3]",
"DMC_STU_GA[0..1]",
93 "DMC_STU_GB[0..1]",
"DMC_STU_GC[0..1]",
"DMC_STU_JA[0..1]",
"DMC_STU_JB[0..1]",
"DMC_STU_JC[0..1]",
94 "DMC_STU_PATCHSIZE",
"DMC_STU_GETRAW",
"DMC_STU_MEDIAN",
"DMC_STU_REGION",
"EMC_RUNNUMBER"};
100 for (
const auto&
i : expaliasesTEMP) {
103 for (
const auto&
i : expaliasesINT) {
106 for (
const auto&
i : expaliasesUINT) {
111 LOG(info) <<
"Listing Data Points for EMC:";
112 for (
auto&
i : vect) {
116 mProcessor = std::make_unique<o2::emcal::EMCDCSProcessor>();
117 bool useVerboseMode = ic.options().get<
bool>(
"use-verbose-mode");
118 LOG(info) <<
" ************************* Verbose? " << useVerboseMode;
119 if (useVerboseMode) {
120 mProcessor->useVerboseMode();
122 mProcessor->init(vect);
123 mTimer = HighResClock::now();
124 mReportTiming = ic.options().get<
bool>(
"report-timing") || useVerboseMode;
130 static size_t runCount = 0;
131 const size_t logPrescale = 10;
132 if (mCheckRunStartStop) {
133 const auto* grp = mRunChecker.
check();
136 if ((runCount % logPrescale) == 0) {
137 LOGP(info,
"No run with is ongoing or finished");
139 }
else if (mRunChecker.
getRunStatus() == RunStatus::START) {
143 }
else if (mRunChecker.
getRunStatus() == RunStatus::ONGOING) {
144 if ((runCount % logPrescale) == 0) {
145 LOGP(info,
"Run {} is still ongoing", mRunChecker.
getFollowedRun());
147 }
else if (mRunChecker.
getRunStatus() == RunStatus::STOP) {
152 mProcessor->setRunNumberFromGRP(-2);
156 auto tfid = o2::header::get<o2::framework::DataProcessingHeader*>(pc.inputs().get(
"input").header)->creation;
157 auto dps = pc.inputs().get<gsl::span<DPCOM>>(
"input");
158 mProcessor->setTF(tfid);
159 mProcessor->process(dps);
160 auto timeNow = HighResClock::now();
161 Duration elapsedTime = timeNow - mTimer;
162 if (elapsedTime.count() >= mDPsUpdateInterval) {
163 sendELMButput(pc.outputs());
166 if ((mCheckRunStartStop && (mRunChecker.
getRunStatus() == RunStatus::START)) || (!mCheckRunStartStop && mProcessor->isUpdateFEEcfg())) {
167 sendCFGoutput(pc.outputs());
177 sendELMButput(ec.outputs());
182 bool mReportTiming{
false};
183 std::unique_ptr<EMCDCSProcessor> mProcessor;
184 HighResClock::time_point mTimer;
185 int64_t mDPsUpdateInterval;
186 bool mCheckRunStartStop =
false;
194 mProcessor->processElmb();
195 mProcessor->updateElmbCCDBinfo();
197 const auto& payload = mProcessor->getELMBdata();
198 auto& info = mProcessor->getccdbELMBinfo();
200 LOG(info) <<
"Sending object " << info.getPath() <<
"/" << info.getFileName() <<
" of size " <<
image->size()
201 <<
" bytes, valid for " << info.getStartValidityTimestamp() <<
" : " << info.getEndValidityTimestamp();
212 mProcessor->updateFeeCCDBinfo();
214 const auto& payload = mProcessor->getFeeDCSdata();
215 auto& info = mProcessor->getccdbFeeDCSinfo();
217 LOG(info) <<
"Sending object " << info.getPath() <<
"/" << info.getFileName() <<
" of size " <<
image->size()
218 <<
" bytes, valid for " << info.getStartValidityTimestamp() <<
" : " << info.getEndValidityTimestamp();
236 std::vector<OutputSpec> outputs;
244 "emc-dcs-data-processor",
245 Inputs{{
"input",
"DCS",
"EMCDATAPOINTS"}},
247 AlgorithmSpec{adaptFromTask<o2::emcal::EMCALDCSDataProcessor>()},
253 {
"DPs-update-interval",
VariantType::Int64, 600ll, {
"Interval (in s) after which to update the DPs CCDB entry"}}}};
Utils and constants for calibration and related workflows.
Header of the AggregatedRunInfo struct.
Definition of the Names Generator class.
static std::string getCCDBServer()
static BasicCCDBManager & instance()
static std::unique_ptr< std::vector< char > > createObjectImage(const T *obj, CcdbObjectInfo *info=nullptr)
RunStatus getRunStatus() const
const o2::parameters::GRPECSObject * check(long ts=-1)
int getFollowedRun() const
static mask_t getMask(const std::string_view detList)
detector masks from any non-alpha-num delimiter-separated list (empty if NONE is supplied)
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void run(o2::framework::ProcessingContext &pc) final
void init(o2::framework::InitContext &ic) final
information complementary to a CCDB object (path, metadata, startTimeValidity, endTimeValidity etc)
std::vector< std::string > expandAliases(const std::vector< std::string > &patternedAliases)
std::chrono::high_resolution_clock HighResClock
std::chrono::duration< double, std::ratio< 1, 1 > > Duration
Defining PrimaryVertex explicitly as messageable.
std::vector< InputSpec > Inputs
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static constexpr o2::header::DataOrigin gDataOriginCDBWrapper
static constexpr o2::header::DataOrigin gDataOriginCDBPayload
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"