43 std::string mInputFileName{};
49 int deltaTimeSendData = -1;
50 uint32_t startTime = -1;
52 std::vector<std::vector<int>> dataIndicesPerTF;
54 std::vector<o2::dcs::test::HintType> mDataPointHints;
58DCSDataReplayer::DCSDataReplayer(std::vector<o2::dcs::test::HintType> hints,
60 mDataDescription(description) {}
64 mMaxTF = ic.
options().
get<int64_t>(
"max-timeframes");
65 mInputFileName = ic.
options().
get<std::string>(
"input-file");
66 deltaTimeSendData = ic.
options().
get<
int>(
"delta-time-send-data");
67 mInputData.ReadFile(mInputFileName.data(),
"time/D:alias/C:value/D",
';');
68 mInputData.SetBranchAddress(
"time", &mTime);
69 mInputData.SetBranchAddress(
"value", &mValue);
70 mInputData.SetBranchAddress(
"alias", mAlias);
76 uint64_t tfid = o2::header::get<o2::framework::DataProcessingHeader*>((*input).header)->startTime;
78 LOG(info) <<
"Data generator reached TF " << tfid <<
", stopping";
84 std::vector<o2::dcs::DataPointCompositeObject> dpcoms;
86 for (Long64_t iEntry = 0; iEntry < mInputData.GetEntries(); ++iEntry) {
87 int entryTree = iEntry;
90 if (deltaTimeSendData > 0 && tfid > 2) {
92 if (tfid - 1 >= dataIndicesPerTF.size()) {
93 LOGP(warning,
"TF ID {} is larger than the number of TFs in dataIndicesPerTF: {}", tfid, dataIndicesPerTF.size());
97 if (iEntry >= dataIndicesPerTF[tfid - 1].
size()) {
100 entryTree = dataIndicesPerTF[tfid - 1][iEntry];
104 mInputData.GetEntry(entryTree);
105 const auto ultime = uint64_t(std::round(mTime * 1000));
106 const auto seconds = uint32_t(ultime / 1000);
107 const auto msec = uint16_t(ultime % 1000);
108 if (deltaTimeSendData > 0) {
111 startTime = std::min(startTime, seconds);
112 endTime = std::max(endTime, seconds);
113 if (iEntry == mInputData.GetEntries() - 1) {
114 const int totalTFs = (endTime - startTime) / deltaTimeSendData + 1;
115 dataIndicesPerTF.resize(totalTFs);
116 LOGP(info,
"Sending data from {} to {} with {} TFs", startTime, endTime, totalTFs);
120 const int index = (seconds - startTime) / deltaTimeSendData;
121 dataIndicesPerTF[
index].emplace_back(iEntry);
123 const uint64_t startTimeTF = startTime + (tfid - 1) * deltaTimeSendData;
124 const uint64_t endTimeTF = startTimeTF + deltaTimeSendData;
125 if (seconds >= startTimeTF && seconds < endTimeTF) {
128 if (seconds == endTime) {
140 <<
"***************** TF " << tfid <<
" has generated " << dpcoms.size() <<
" DPs";
151 std::string desc{detName};
152 desc +=
"DATAPOINTS";
159 "dcs-random-data-generator",
161 Outputs{{{
"outputDCS"},
"DCS", dd}},
164 {
"max-timeframes", VariantType::Int64, 99999999999ll, {
"max TimeFrames to generate"}},
165 {
"delta-fraction", VariantType::Float, 0.05f, {
"fraction of data points to put in the delta"}},
166 {
"delta-time-send-data", VariantType::Int, -1, {
"if larger than zero the data will be send in time intervals of this size"}},
167 {
"input-file", VariantType::String,
"", {
"Input file with data to play back"}}}};
T get(const char *key) const
void snapshot(const Output &spec, T const &object)
ConfigParamRegistry const & options()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
virtual void init(InitContext &context)
virtual void run(ProcessingContext &context)=0
o2::framework::DataProcessorSpec getDCSDataReplaySpec(std::vector< HintType > hints={}, const char *detName="TPC")
o2::dcs::DataPointCompositeObject createDataPointCompositeObject(const std::string &alias, T val, uint32_t seconds, uint16_t msec, uint16_t flags=0)
Defining PrimaryVertex explicitly as messageable.
@ Me
Only quit this data processor.
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"