Project
Loading...
Searching...
No Matches
DCSDataReplaySpec.cxx
Go to the documentation of this file.
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
11
19#include "Framework/Logger.h"
20#include "Framework/Task.h"
21#include "TTree.h"
22#include <cmath>
23#include <variant>
24#include <string>
25#include <algorithm>
26#include <vector>
27
28using namespace o2::framework;
29
30namespace
31{
32
33class DCSDataReplayer : public o2::framework::Task
34{
35 public:
36 DCSDataReplayer(std::vector<o2::dcs::test::HintType> hints, o2::header::DataDescription description);
37
38 void init(o2::framework::InitContext& ic) final;
39
41
42 private:
43 std::string mInputFileName{};
44 double mTime{};
45 double mValue{};
46 char mAlias[50];
47 uint64_t mMaxTF;
48 uint64_t mTFs = 0;
49 TTree mInputData;
50 std::vector<o2::dcs::test::HintType> mDataPointHints;
51 o2::header::DataDescription mDataDescription;
52};
53
54DCSDataReplayer::DCSDataReplayer(std::vector<o2::dcs::test::HintType> hints,
55 o2::header::DataDescription description) : mDataPointHints(hints),
56 mDataDescription(description) {}
57
58void DCSDataReplayer::init(o2::framework::InitContext& ic)
59{
60 mMaxTF = ic.options().get<int64_t>("max-timeframes");
61 mInputFileName = ic.options().get<std::string>("input-file");
62 mInputData.ReadFile(mInputFileName.data(), "time/D:alias/C:value/D", ';');
63 mInputData.SetBranchAddress("time", &mTime);
64 mInputData.SetBranchAddress("value", &mValue);
65 mInputData.SetBranchAddress("alias", mAlias);
66}
67
68void DCSDataReplayer::run(o2::framework::ProcessingContext& pc)
69{
70 auto input = pc.inputs().begin();
71 uint64_t tfid = o2::header::get<o2::framework::DataProcessingHeader*>((*input).header)->startTime;
72 if (tfid >= mMaxTF) {
73 LOG(info) << "Data generator reached TF " << tfid << ", stopping";
74 pc.services().get<o2::framework::ControlService>().endOfStream();
76 }
77
78 std::vector<o2::dcs::DataPointCompositeObject> dpcoms;
79 for (Long64_t iEntry = 0; iEntry < mInputData.GetEntries(); ++iEntry) {
80 mInputData.GetEntry(iEntry);
81 const auto ultime = uint64_t(std::round(mTime * 1000));
82 const auto seconds = uint32_t(ultime / 1000);
83 const auto msec = uint16_t(ultime % 1000);
84
85 dpcoms.emplace_back(o2::dcs::createDataPointCompositeObject(mAlias, float(mValue), seconds, msec));
86 }
87 // auto dpcoms = generate(mDataPointHints, fraction, tfid);
88
89 LOG(info)
90 << "***************** TF " << tfid << " has generated " << dpcoms.size() << " DPs";
91 pc.outputs().snapshot(Output{"DCS", mDataDescription, 0}, dpcoms);
92 mTFs++;
93}
94} // namespace
95
96namespace o2::dcs::test
97{
98o2::framework::DataProcessorSpec getDCSDataReplaySpec(std::vector<o2::dcs::test::HintType> hints,
99 const char* detName)
100{
101 std::string desc{detName};
102 desc += "DATAPOINTS";
103
105
106 dd.runtimeInit(desc.c_str(), desc.size());
107
108 return DataProcessorSpec{
109 "dcs-random-data-generator",
110 Inputs{},
111 Outputs{{{"outputDCS"}, "DCS", dd}},
112 AlgorithmSpec{adaptFromTask<DCSDataReplayer>(hints, dd)},
113 Options{
114 {"max-timeframes", VariantType::Int64, 99999999999ll, {"max TimeFrames to generate"}},
115 {"delta-fraction", VariantType::Float, 0.05f, {"fraction of data points to put in the delta"}},
116 {"input-file", VariantType::String, "", {"Input file with data to play back"}}}};
117}
118} // namespace o2::dcs::test
void snapshot(const Output &spec, T const &object)
ConfigParamRegistry const & options()
Definition InitContext.h:33
const_iterator begin() const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
virtual void init(InitContext &context)
Definition Task.h:37
virtual void run(ProcessingContext &context)=0
o2::framework::DataProcessorSpec getDCSDataReplaySpec(std::vector< HintType > hints={}, const char *detName="TPC")
o2::dcs::DataPointCompositeObject createDataPointCompositeObject(const std::string &alias, T val, uint32_t seconds, uint16_t msec, uint16_t flags=0)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
@ Me
Only quit this data processor.
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
void runtimeInit(const char *string, short length=-1)
Definition DataHeader.h:261
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"