Project
Loading...
Searching...
No Matches
DCSDataReplaySpec.cxx
Go to the documentation of this file.
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
11
19#include "Framework/Logger.h"
20#include "Framework/Task.h"
21#include "TTree.h"
22#include <cmath>
23#include <variant>
24#include <string>
25#include <algorithm>
26#include <vector>
27
28using namespace o2::framework;
29
30namespace
31{
32
33class DCSDataReplayer : public o2::framework::Task
34{
35 public:
36 DCSDataReplayer(std::vector<o2::dcs::test::HintType> hints, o2::header::DataDescription description);
37
38 void init(o2::framework::InitContext& ic) final;
39
41
42 private:
43 std::string mInputFileName{};
44 double mTime{};
45 double mValue{};
46 char mAlias[50];
47 uint64_t mMaxTF;
48 uint64_t mTFs = 0;
49 int deltaTimeSendData = -1;
50 uint32_t startTime = -1;
51 uint32_t endTime = 0;
52 std::vector<std::vector<int>> dataIndicesPerTF;
53 TTree mInputData;
54 std::vector<o2::dcs::test::HintType> mDataPointHints;
55 o2::header::DataDescription mDataDescription;
56};
57
58DCSDataReplayer::DCSDataReplayer(std::vector<o2::dcs::test::HintType> hints,
59 o2::header::DataDescription description) : mDataPointHints(hints),
60 mDataDescription(description) {}
61
62void DCSDataReplayer::init(o2::framework::InitContext& ic)
63{
64 mMaxTF = ic.options().get<int64_t>("max-timeframes");
65 mInputFileName = ic.options().get<std::string>("input-file");
66 deltaTimeSendData = ic.options().get<int>("delta-time-send-data");
67 mInputData.ReadFile(mInputFileName.data(), "time/D:alias/C:value/D", ';');
68 mInputData.SetBranchAddress("time", &mTime);
69 mInputData.SetBranchAddress("value", &mValue);
70 mInputData.SetBranchAddress("alias", mAlias);
71}
72
73void DCSDataReplayer::run(o2::framework::ProcessingContext& pc)
74{
75 auto input = pc.inputs().begin();
76 uint64_t tfid = o2::header::get<o2::framework::DataProcessingHeader*>((*input).header)->startTime;
77 if (tfid >= mMaxTF) {
78 LOG(info) << "Data generator reached TF " << tfid << ", stopping";
79 pc.services().get<o2::framework::ControlService>().endOfStream();
81 return;
82 }
83
84 std::vector<o2::dcs::DataPointCompositeObject> dpcoms;
85
86 for (Long64_t iEntry = 0; iEntry < mInputData.GetEntries(); ++iEntry) {
87 int entryTree = iEntry;
88
89 // load only releavant entries if requested
90 if (deltaTimeSendData > 0 && tfid > 2) {
91
92 if (tfid - 1 >= dataIndicesPerTF.size()) {
93 LOGP(warning, "TF ID {} is larger than the number of TFs in dataIndicesPerTF: {}", tfid, dataIndicesPerTF.size());
94 break;
95 }
96
97 if (iEntry >= dataIndicesPerTF[tfid - 1].size()) {
98 break;
99 } else {
100 entryTree = dataIndicesPerTF[tfid - 1][iEntry];
101 }
102 }
103
104 mInputData.GetEntry(entryTree);
105 const auto ultime = uint64_t(std::round(mTime * 1000));
106 const auto seconds = uint32_t(ultime / 1000);
107 const auto msec = uint16_t(ultime % 1000);
108 if (deltaTimeSendData > 0) {
109 // send data in packages
110 if (tfid == 0) {
111 startTime = std::min(startTime, seconds);
112 endTime = std::max(endTime, seconds);
113 if (iEntry == mInputData.GetEntries() - 1) {
114 const int totalTFs = (endTime - startTime) / deltaTimeSendData + 1;
115 dataIndicesPerTF.resize(totalTFs);
116 LOGP(info, "Sending data from {} to {} with {} TFs", startTime, endTime, totalTFs);
117 }
118 } else {
119 if (tfid == 1) {
120 const int index = (seconds - startTime) / deltaTimeSendData;
121 dataIndicesPerTF[index].emplace_back(iEntry);
122 }
123 const uint64_t startTimeTF = startTime + (tfid - 1) * deltaTimeSendData;
124 const uint64_t endTimeTF = startTimeTF + deltaTimeSendData;
125 if (seconds >= startTimeTF && seconds < endTimeTF) {
126 dpcoms.emplace_back(o2::dcs::createDataPointCompositeObject(mAlias, float(mValue), seconds, msec));
127 // check if all data has been processed
128 if (seconds == endTime) {
129 mMaxTF = tfid;
130 }
131 }
132 }
133 } else {
134 dpcoms.emplace_back(o2::dcs::createDataPointCompositeObject(mAlias, float(mValue), seconds, msec));
135 }
136 }
137 // auto dpcoms = generate(mDataPointHints, fraction, tfid);
138
139 LOG(info)
140 << "***************** TF " << tfid << " has generated " << dpcoms.size() << " DPs";
141 pc.outputs().snapshot(Output{"DCS", mDataDescription, 0}, dpcoms);
142 mTFs++;
143}
144} // namespace
145
146namespace o2::dcs::test
147{
148o2::framework::DataProcessorSpec getDCSDataReplaySpec(std::vector<o2::dcs::test::HintType> hints,
149 const char* detName)
150{
151 std::string desc{detName};
152 desc += "DATAPOINTS";
153
155
156 dd.runtimeInit(desc.c_str(), desc.size());
157
158 return DataProcessorSpec{
159 "dcs-random-data-generator",
160 Inputs{},
161 Outputs{{{"outputDCS"}, "DCS", dd}},
162 AlgorithmSpec{adaptFromTask<DCSDataReplayer>(hints, dd)},
163 Options{
164 {"max-timeframes", VariantType::Int64, 99999999999ll, {"max TimeFrames to generate"}},
165 {"delta-fraction", VariantType::Float, 0.05f, {"fraction of data points to put in the delta"}},
166 {"delta-time-send-data", VariantType::Int, -1, {"if larger than zero the data will be send in time intervals of this size"}},
167 {"input-file", VariantType::String, "", {"Input file with data to play back"}}}};
168}
169} // namespace o2::dcs::test
void snapshot(const Output &spec, T const &object)
ConfigParamRegistry const & options()
Definition InitContext.h:33
const_iterator begin() const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
virtual void init(InitContext &context)
Definition Task.h:37
virtual void run(ProcessingContext &context)=0
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
o2::framework::DataProcessorSpec getDCSDataReplaySpec(std::vector< HintType > hints={}, const char *detName="TPC")
o2::dcs::DataPointCompositeObject createDataPointCompositeObject(const std::string &alias, T val, uint32_t seconds, uint16_t msec, uint16_t flags=0)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
@ Me
Only quit this data processor.
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
void runtimeInit(const char *string, short length=-1)
Definition DataHeader.h:261
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"