Project
Loading...
Searching...
No Matches
dcs-processor-workflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "CCDB/CcdbApi.h"
13#include "CCDB/CcdbObjectInfo.h"
22#include "subsysname.h"
23#include <array>
24#include <chrono>
25#include <gsl/span>
26#include <iostream>
27#include <unordered_map>
28#include <vector>
29#include <TStopwatch.h>
31#if defined(MUON_SUBSYSTEM_MCH)
33#define CCDBOBJ "MCH_DPS"
34#endif
35#if defined(MUON_SUBSYSTEM_MID)
37#define CCDBOBJ "MID_DPS"
38#endif
39
40namespace
41{
42
43using DPID = o2::dcs::DataPointIdentifier; // aka alias name
45using DPMAP = std::unordered_map<DPID, std::vector<DPVAL>>;
46
47using namespace o2::calibration;
48
49/*
50 * Create a default CCDB Object Info that will be used as a template.
51 *
52 * @param path describes the CCDB data path used (e.g. MCH/LV or MID/HV)
53 *
54 * The start and end validity times are supposed to be updated from this template,
55 * as well as the metadata (if needed). The rest of the information should
56 * be taken as is.
57 */
58o2::ccdb::CcdbObjectInfo createDefaultInfo(const char* path)
59{
60 DPMAP obj;
62 auto flName = o2::ccdb::CcdbApi::generateFileName(clName);
64 info.setPath(path);
65 info.setObjectType(clName);
66 info.setFileName(flName);
69 std::map<std::string, std::string> md;
70 info.setMetaData(md);
71 return info;
72}
73
74#if defined(MUON_SUBSYSTEM_MCH)
75#define NOBJECTS 2
76std::array<o2::ccdb::CcdbObjectInfo, NOBJECTS> info{createDefaultInfo("MCH/Calib/HV"), createDefaultInfo("MCH/Calib/LV")};
77std::array<uint64_t, NOBJECTS> t0 = {0, 0};
78#elif defined(MUON_SUBSYSTEM_MID)
79#define NOBJECTS 1
80std::array<o2::ccdb::CcdbObjectInfo, NOBJECTS> info{createDefaultInfo("MID/Calib/HV")};
81std::array<uint64_t, NOBJECTS> t0 = {0};
82#endif
83
84std::array<DPMAP, NOBJECTS> dataPoints;
85
86/*
87 * Return the data point values with min and max timestamps
88 */
89std::pair<DPVAL, DPVAL> computeTimeRange(const DPMAP& dpmap)
90{
91 DPVAL dmin, dmax;
92 uint64_t minTime{std::numeric_limits<uint64_t>::max()};
93 uint64_t maxTime{0};
94
95 for (auto did : dpmap) {
96 for (auto d : did.second) {
97 const auto ts = d.get_epoch_time();
98 if (ts < minTime) {
99 dmin = d;
100 minTime = ts;
101 }
102 if (ts > maxTime) {
103 dmax = d;
104 maxTime = ts;
105 }
106 }
107 }
108 return std::make_pair(dmin, dmax);
109}
110
111/*
112 * Compute the (approximate) size (in KB) of a dpmap.
113 */
114size_t computeSize(const DPMAP& dpmap)
115{
116 constexpr int itemSize = 64; // DataPointIdentifier or DataPointValue have the same size = 64 bytes
117 constexpr float byte2KB = 1.0 / 1024;
118
119 size_t nofItems = 0;
120 for (auto did : dpmap) {
121 nofItems++; // +1 for the DataPointIdentifier itself
122 nofItems += did.second.size(); // +number of DataPointValues
123 }
124 return static_cast<size_t>(std::floor(nofItems * itemSize * byte2KB));
125}
126
127/*
128 * Compute the duration (in seconds) span by the datapoints in the dpmap.
129 */
130int computeDuration(const DPMAP& dpmap)
131{
132 auto range = computeTimeRange(dpmap);
133 return static_cast<int>((range.second.get_epoch_time() - range.first.get_epoch_time()) / 1000);
134}
135
136/*
137 * Send a DPMAP to the output.
138 *
139 * @param dpmap a map of string to vector of DataPointValue
140 * @param output a DPL data allocator
141 * @param info a CCDB object info describing the dpmap
142 * @param reason (optional, can be empty) a string description why the dpmap
143 * was ready to be shipped (e.g. big enough, long enough, end of process, etc...)
144 */
145void sendOutput(const DPMAP& dpmap,
148 const std::string& reason,
149 uint64_t startOfValidity)
150{
151 if (dpmap.empty()) {
152 // we do _not_ write empty objects
153 return;
154 }
155
156 info.setStartValidityTimestamp(startOfValidity);
157 info.setEndValidityTimestamp(startOfValidity + 5 * o2::ccdb::CcdbObjectInfo::DAY);
158
159 auto md = info.getMetaData();
160 md["upload-reason"] = reason;
161 md["nof-datapoints"] = fmt::format("{}", dpmap.size());
162 size_t nofValues = 0;
163 for (auto did : dpmap) {
164 nofValues += did.second.size();
165 }
166 md["nof-datapoint-values"] = fmt::format("{}", nofValues);
167
168 auto range = computeTimeRange(dpmap);
169 md["datapoint-value-first-time"] = range.first.get_timestamp()->c_str();
170 md["datapoint-value-last-time"] = range.second.get_timestamp()->c_str();
171 info.setMetaData(md);
172
173 auto image = o2::ccdb::CcdbApi::createObjectImage(&dpmap, &info);
174 LOG(info) << "Sending object " << info.getPath() << "/"
175 << info.getFileName() << " of size " << image->size()
176 << " bytes, valid for " << info.getStartValidityTimestamp()
177 << " : " << info.getEndValidityTimestamp()
178 << " | reason: " << reason;
179 output.snapshot(o2::framework::Output{Utils::gDataOriginCDBPayload, CCDBOBJ, 0}, *image.get());
180 output.snapshot(o2::framework::Output{Utils::gDataOriginCDBWrapper, CCDBOBJ, 0}, info);
181}
182
183/*
184 * Implementation of DPL end of stream callback.
185 *
186 * We send the remaining datapoints at the end of the processing.
187 */
188void endOfStream(o2::framework::EndOfStreamContext& eosc)
189{
190 LOG(debug) << "This is the end. Must write what we have left ?\n";
191 for (auto i = 0; i < NOBJECTS; i++) {
192 if (t0[i] == 0) {
193 continue;
194 }
195 sendOutput(dataPoints[i], eosc.outputs(), info[i], "end of stream", t0[i]);
196 }
197}
198
199/*
200 * Decides whether or not the dpmap should be sent to the output.
201 *
202 * @param maxSize if the dpmap size is above this size,
203 * then it should go to output
204 * @param maxDuration if the dpmap spans more than this duration,
205 * then it should go to output
206 *
207 * @returns a boolean stating if the dpmap should be output and a string
208 * describing why it should be output.
209 */
210std::tuple<bool, std::string> needOutput(const DPMAP& dpmap, int maxSize, int maxDuration, uint64_t currentDuration)
211{
212 std::string reason;
213
214 if (dpmap.empty()) {
215 return {false, reason};
216 }
217
218 bool bigEnough{false};
219 bool longEnough{false};
220 bool complete{true}; // FIXME: should check here that we indeed have all our dataPoints
221
222 if (maxSize && (computeSize(dpmap) > maxSize)) {
223 bigEnough = true;
224 reason += "[big enough]";
225 }
226
227 if (maxDuration) {
228 if (currentDuration > maxDuration) {
229 longEnough = true;
230 reason += fmt::format("[long enough ({} s)]", currentDuration);
231 }
232 }
233
234 return {complete && (bigEnough || longEnough), reason};
235}
236
237/*
238 * Process the datapoints received.
239 *
240 * The datapoints are accumulated into one (MID) or two (MCH) DPMAPs (map from
241 * alias names to vector of DataPointValue) : one for HV values (MID and MCH)
242 * and one for LV values (MCH only).
243 *
244 * If the DPMAPs satisfy certain conditions (@see needOutput) they are sent to
245 * the output.
246 *
247 * @param aliases an array of one or two vectors of aliases (one for HV values,
248 * one for LV values)
249 * @param maxSize an array of one or two values for the
250 * maxsizes of the HV and LV values respectively
251 * @param maxDuration an array of
252 * one or two values for the max durations of the HV and LV values respectively
253 */
254void processDataPoints(o2::framework::ProcessingContext& pc,
255 std::array<std::vector<std::string>, NOBJECTS> aliases,
256 std::array<int, NOBJECTS> maxSize,
257 std::array<int, NOBJECTS> maxDuration,
258 bool reportTiming)
259{
260 TStopwatch sw;
261 auto creationTime = o2::header::get<o2::framework::DataProcessingHeader*>(pc.inputs().get("input").header)->creation;
262 for (auto i = 0; i < NOBJECTS; i++) {
263 if (t0[i] == 0) {
264 t0[i] = creationTime;
265 }
266 }
267 auto dps = pc.inputs().get<gsl::span<o2::dcs::DataPointCompositeObject>>("input");
268 for (auto dp : dps) {
269 for (auto i = 0; i < NOBJECTS; i++) {
270 if (std::find(aliases[i].begin(), aliases[i].end(), dp.id.get_alias()) != aliases[i].end()) {
271 auto& v = dataPoints[i][dp.id];
272 bool shouldAdd{true};
273 if (v.size() > 0 && v.back() == dp.data) {
274 shouldAdd = false;
275 }
276 if (shouldAdd) {
277 v.emplace_back(dp.data);
278 }
279 }
280 }
281 }
282
283 for (auto i = 0; i < NOBJECTS; i++) {
284 auto duration = (creationTime - t0[i]) / 1000; // ms -> s
285 auto [shouldOutput, reason] = needOutput(dataPoints[i], maxSize[i], maxDuration[i], duration);
286 if (shouldOutput) {
287 sendOutput(dataPoints[i], pc.outputs(), info[i], reason, t0[i]);
288 t0[i] = 0;
289 dataPoints[i].clear();
290 }
291 }
292 sw.Stop();
293 if (reportTiming) {
294 LOGP(info, "Timing CPU:{:.3e} Real:{:.3e} at slice {}", sw.CpuTime(), sw.RealTime(), pc.services().get<o2::framework::TimingInfo>().timeslice);
295 }
296}
297
298/*
299 * Creates the main processing function.
300 *
301 * @param ic InitContext which is used to get the options and set the end of
302 * stream callback
303 */
305{
306 auto& callbacks = ic.services().get<o2::framework::CallbackService>();
308
309 // the aliases arrays contain all the names of the MCH or MID data points
310 // we are interested to transit to the CCDB
311#if defined(MUON_SUBSYSTEM_MCH)
312 std::array<std::vector<std::string>, NOBJECTS> aliases = {
318 std::array<int, 2> maxSize{
319 ic.options().get<int>("hv-max-size"),
320 ic.options().get<int>("lv-max-size")};
321
322 std::array<int, 2> maxDuration{
323 ic.options().get<int>("hv-max-duration"),
324 ic.options().get<int>("lv-max-duration")};
325#elif defined(MUON_SUBSYSTEM_MID)
326 std::array<std::vector<std::string>, NOBJECTS> aliases = {
329 std::array<int, NOBJECTS> maxSize{ic.options().get<int>("hv-max-size")};
330 std::array<int, NOBJECTS> maxDuration{ic.options().get<int>("hv-max-duration")};
331#endif
332 bool reportTiming = ic.options().get<bool>("report-timing");
333 for (auto i = 0; i < NOBJECTS; i++) {
334 dataPoints[i].clear();
335 }
336
337 return [aliases, maxSize, maxDuration, reportTiming](o2::framework::ProcessingContext& pc) {
338 processDataPoints(pc, aliases, maxSize, maxDuration, reportTiming);
339 };
340}
341
342/* Helper function to create a ConfigParamSpec option object.
343 *
344 * @param name is either 'size' or 'duration'
345 * @param value is the default value to be used (i.e. when the option is not
346 * specified on the command line)
347 * @param what is either 'hv' or 'lv'
348 * @param unit is the unit in which the values are given
349 */
350o2::framework::ConfigParamSpec whenToSendOption(const char* name, int value,
351 const char* what, const char* unit)
352{
353 std::string uname(name);
355
356 std::string description = fmt::format(R"(max {} calibration object {} (in {}).
357When that {} is reached the object is shipped. Use 0 to disable this check.)",
358 uname, what, unit, what);
359
360 return {fmt::format("{}-max-{}", name, what),
362 value,
363 {description}};
364}
365
366} // namespace
367
372
389{
390 DataProcessorSpec dcsProcessor;
391
392 AlgorithmSpec algo(createProcessFunction);
393
394 dcsProcessor.name = fmt::format("{}-dcs-processor", o2::muon::subsysname());
395 dcsProcessor.inputs = o2::framework::Inputs
396 {
397#if defined(MUON_SUBSYSTEM_MCH)
398 {
399 "input", "DCS", "MCHDATAPOINTS"
400 }
401 };
402#elif defined(MUON_SUBSYSTEM_MID)
403 {
404 "input", "DCS", "MIDDATAPOINTS"
405 }
406 };
407#endif
408 dcsProcessor.outputs.emplace_back(o2::framework::ConcreteDataTypeMatcher{Utils::gDataOriginCDBPayload, CCDBOBJ}, o2::framework::Lifetime::Sporadic);
409 dcsProcessor.outputs.emplace_back(o2::framework::ConcreteDataTypeMatcher{Utils::gDataOriginCDBWrapper, CCDBOBJ}, o2::framework::Lifetime::Sporadic);
410 dcsProcessor.algorithm = algo;
411 dcsProcessor.options = {
412 {"report-timing", o2::framework::VariantType::Bool, false, {"Report timing for every slice"}},
413#if defined(MUON_SUBSYSTEM_MCH)
414 whenToSendOption("lv", 128, "size", "KB"),
415 whenToSendOption("lv", 8 * 3600, "duration", "seconds"),
416#endif
417 whenToSendOption("hv", 128, "size", "KB"),
418 whenToSendOption("hv", 8 * 3600, "duration", "seconds")};
419
420 return {dcsProcessor};
421}
Utils and constants for calibration and related workflows.
int32_t i
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
std::ostringstream debug
static std::string generateFileName(const std::string &inp)
Definition CcdbApi.cxx:798
static std::unique_ptr< std::vector< char > > createObjectImage(const T *obj, CcdbObjectInfo *info=nullptr)
Definition CcdbApi.h:103
long getEndValidityTimestamp() const
void setStartValidityTimestamp(long start)
void setFileName(const std::string &nm)
void setPath(const std::string &path)
const std::string & getPath() const
static constexpr long DAY
void setEndValidityTimestamp(long end)
const std::map< std::string, std::string > & getMetaData() const
void setObjectType(const std::string &tp)
void setMetaData(const std::map< std::string, std::string > &md)
long getStartValidityTimestamp() const
const std::string & getFileName() const
static constexpr long INFINITE_TIMESTAMP
ServiceRegistryRef services()
Definition InitContext.h:34
ConfigParamRegistry const & options()
Definition InitContext.h:33
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
std::unordered_map< DPID, std::vector< DPVAL > > DPMAP
Definition dcs-ccdb.cxx:37
std::pair< DPVAL, DPVAL > computeTimeRange(const std::vector< DPVAL > &dps)
Definition dcs-ccdb.cxx:59
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
GLeglImageOES image
Definition glcorearb.h:4021
GLuint GLuint end
Definition glcorearb.h:469
const GLdouble * v
Definition glcorearb.h:832
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLint * range
Definition glcorearb.h:1899
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLuint GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat t0
Definition glcorearb.h:5034
void to_upper_case(std::string &str) noexcept
Definition StringUtils.h:80
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::vector< std::string > aliases(std::vector< MeasurementType > types={ MeasurementType::HV_V, MeasurementType::HV_I, MeasurementType::LV_V_FEE_ANALOG, MeasurementType::LV_V_FEE_DIGITAL, MeasurementType::LV_V_SOLAR})
std::vector< std::string > aliases(std::vector< MeasurementType > types={ MeasurementType::HV_V, MeasurementType::HV_I})
Definition DCSNamer.cxx:127
const char * subsysname()
Definition subsysname.h:17
Enum< T >::Iterator begin(Enum< T >)
Definition Defs.h:173
o2::framework::WorkflowSpec WorkflowSpec
static constexpr o2::header::DataOrigin gDataOriginCDBWrapper
Definition Utils.h:44
static constexpr o2::header::DataOrigin gDataOriginCDBPayload
Definition Utils.h:43
uint64_t get_epoch_time() const noexcept
std::function< void(ProcessingContext &)> ProcessCallback
static std::string getClassName(const T &obj)
get the class name of the object
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
TStopwatch sw