27#include <unordered_map>
29#include <TStopwatch.h>
31#if defined(MUON_SUBSYSTEM_MCH)
33#define CCDBOBJ "MCH_DPS"
35#if defined(MUON_SUBSYSTEM_MID)
37#define CCDBOBJ "MID_DPS"
45using DPMAP = std::unordered_map<DPID, std::vector<DPVAL>>;
69 std::map<std::string, std::string> md;
74#if defined(MUON_SUBSYSTEM_MCH)
76std::array<o2::ccdb::CcdbObjectInfo, NOBJECTS> info{createDefaultInfo(
"MCH/Calib/HV"), createDefaultInfo(
"MCH/Calib/LV")};
77std::array<uint64_t, NOBJECTS>
t0 = {0, 0};
78#elif defined(MUON_SUBSYSTEM_MID)
80std::array<o2::ccdb::CcdbObjectInfo, NOBJECTS> info{createDefaultInfo(
"MID/Calib/HV")};
81std::array<uint64_t, NOBJECTS>
t0 = {0};
84std::array<DPMAP, NOBJECTS> dataPoints;
92 uint64_t minTime{std::numeric_limits<uint64_t>::max()};
95 for (
auto did : dpmap) {
96 for (
auto d : did.second) {
108 return std::make_pair(dmin, dmax);
114size_t computeSize(
const DPMAP& dpmap)
116 constexpr int itemSize = 64;
117 constexpr float byte2KB = 1.0 / 1024;
120 for (
auto did : dpmap) {
122 nofItems += did.second.size();
124 return static_cast<size_t>(std::floor(nofItems * itemSize * byte2KB));
130int computeDuration(
const DPMAP& dpmap)
133 return static_cast<int>((
range.second.get_epoch_time() -
range.first.get_epoch_time()) / 1000);
145void sendOutput(
const DPMAP& dpmap,
148 const std::string& reason,
149 uint64_t startOfValidity)
160 md[
"upload-reason"] = reason;
161 md[
"nof-datapoints"] = fmt::format(
"{}", dpmap.size());
162 size_t nofValues = 0;
163 for (
auto did : dpmap) {
164 nofValues += did.second.size();
166 md[
"nof-datapoint-values"] = fmt::format(
"{}", nofValues);
169 md[
"datapoint-value-first-time"] =
range.first.get_timestamp()->c_str();
170 md[
"datapoint-value-last-time"] =
range.second.get_timestamp()->c_str();
174 LOG(info) <<
"Sending object " << info.
getPath() <<
"/"
178 <<
" | reason: " << reason;
190 LOG(
debug) <<
"This is the end. Must write what we have left ?\n";
191 for (
auto i = 0;
i < NOBJECTS;
i++) {
195 sendOutput(dataPoints[
i], eosc.
outputs(), info[
i],
"end of stream",
t0[
i]);
210std::tuple<bool, std::string> needOutput(
const DPMAP& dpmap,
int maxSize,
int maxDuration, uint64_t currentDuration)
215 return {
false, reason};
218 bool bigEnough{
false};
219 bool longEnough{
false};
222 if (maxSize && (computeSize(dpmap) > maxSize)) {
224 reason +=
"[big enough]";
228 if (currentDuration > maxDuration) {
230 reason += fmt::format(
"[long enough ({} s)]", currentDuration);
234 return {complete && (bigEnough || longEnough), reason};
255 std::array<std::vector<std::string>, NOBJECTS> aliases,
256 std::array<int, NOBJECTS> maxSize,
257 std::array<int, NOBJECTS> maxDuration,
261 auto creationTime = o2::header::get<o2::framework::DataProcessingHeader*>(pc.
inputs().
get(
"input").header)->creation;
262 for (
auto i = 0;
i < NOBJECTS;
i++) {
264 t0[
i] = creationTime;
267 auto dps = pc.
inputs().
get<gsl::span<o2::dcs::DataPointCompositeObject>>(
"input");
268 for (
auto dp : dps) {
269 for (
auto i = 0;
i < NOBJECTS;
i++) {
270 if (std::find(aliases[
i].
begin(), aliases[
i].
end(), dp.id.get_alias()) != aliases[
i].end()) {
271 auto&
v = dataPoints[
i][dp.id];
272 bool shouldAdd{
true};
273 if (
v.size() > 0 &&
v.back() == dp.data) {
277 v.emplace_back(dp.data);
283 for (
auto i = 0;
i < NOBJECTS;
i++) {
284 auto duration = (creationTime -
t0[
i]) / 1000;
285 auto [shouldOutput, reason] = needOutput(dataPoints[
i], maxSize[
i], maxDuration[
i], duration);
287 sendOutput(dataPoints[
i], pc.
outputs(), info[
i], reason,
t0[
i]);
289 dataPoints[
i].clear();
311#if defined(MUON_SUBSYSTEM_MCH)
312 std::array<std::vector<std::string>, NOBJECTS>
aliases = {
318 std::array<int, 2> maxSize{
322 std::array<int, 2> maxDuration{
325#elif defined(MUON_SUBSYSTEM_MID)
326 std::array<std::vector<std::string>, NOBJECTS>
aliases = {
329 std::array<int, NOBJECTS> maxSize{ic.
options().
get<
int>(
"hv-max-size")};
330 std::array<int, NOBJECTS> maxDuration{ic.
options().
get<
int>(
"hv-max-duration")};
332 bool reportTiming = ic.
options().
get<
bool>(
"report-timing");
333 for (
auto i = 0;
i < NOBJECTS;
i++) {
334 dataPoints[
i].clear();
338 processDataPoints(pc, aliases, maxSize, maxDuration, reportTiming);
351 const char* what,
const char* unit)
353 std::string uname(
name);
356 std::string description = fmt::format(R
"(max {} calibration object {} (in {}).
357When that {} is reached the object is shipped. Use 0 to disable this check.)",
358 uname, what, unit, what);
360 return {fmt::format(
"{}-max-{}",
name, what),
397#if defined(MUON_SUBSYSTEM_MCH)
399 "input",
"DCS",
"MCHDATAPOINTS"
402#elif defined(MUON_SUBSYSTEM_MID)
404 "input",
"DCS",
"MIDDATAPOINTS"
410 dcsProcessor.algorithm = algo;
411 dcsProcessor.options = {
413#if defined(MUON_SUBSYSTEM_MCH)
414 whenToSendOption(
"lv", 128,
"size",
"KB"),
415 whenToSendOption(
"lv", 8 * 3600,
"duration",
"seconds"),
417 whenToSendOption(
"hv", 128,
"size",
"KB"),
418 whenToSendOption(
"hv", 8 * 3600,
"duration",
"seconds")};
420 return {dcsProcessor};
Utils and constants for calibration and related workflows.
static std::string generateFileName(const std::string &inp)
static std::unique_ptr< std::vector< char > > createObjectImage(const T *obj, CcdbObjectInfo *info=nullptr)
long getEndValidityTimestamp() const
void setStartValidityTimestamp(long start)
void setFileName(const std::string &nm)
void setPath(const std::string &path)
const std::string & getPath() const
static constexpr long DAY
void setEndValidityTimestamp(long end)
const std::map< std::string, std::string > & getMetaData() const
void setObjectType(const std::string &tp)
void setMetaData(const std::map< std::string, std::string > &md)
long getStartValidityTimestamp() const
const std::string & getFileName() const
static constexpr long INFINITE_TIMESTAMP
T get(const char *key) const
DataAllocator & outputs()
ServiceRegistryRef services()
ConfigParamRegistry const & options()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
std::unordered_map< DPID, std::vector< DPVAL > > DPMAP
std::pair< DPVAL, DPVAL > computeTimeRange(const std::vector< DPVAL > &dps)
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
GLuint const GLchar * name
GLsizei const GLfloat * value
GLsizei const GLchar *const * path
GLuint GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat t0
void to_upper_case(std::string &str) noexcept
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::vector< std::string > aliases(std::vector< MeasurementType > types={ MeasurementType::HV_V, MeasurementType::HV_I, MeasurementType::LV_V_FEE_ANALOG, MeasurementType::LV_V_FEE_DIGITAL, MeasurementType::LV_V_SOLAR})
std::vector< std::string > aliases(std::vector< MeasurementType > types={ MeasurementType::HV_V, MeasurementType::HV_I})
const char * subsysname()
Enum< T >::Iterator begin(Enum< T >)
o2::framework::WorkflowSpec WorkflowSpec
static constexpr o2::header::DataOrigin gDataOriginCDBWrapper
static constexpr o2::header::DataOrigin gDataOriginCDBPayload
uint64_t get_epoch_time() const noexcept
std::function< void(ProcessingContext &)> ProcessCallback
static std::string getClassName(const T &obj)
get the class name of the object
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"