Project
Loading...
Searching...
No Matches
MonitorWorkflowSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <vector>
13#include <string>
14#include <thread>
15#include <chrono>
16#include "fmt/format.h"
17
18#include "Framework/Task.h"
21#include "Framework/Logger.h"
25
27#include "Headers/DataHeader.h"
28#include "CCDB/CcdbApi.h"
30
31#include "TPCQC/Clusters.h"
32#include "TPCBase/Mapper.h"
37using namespace o2::framework;
39
40namespace o2::tpc
41{
42
44{
45 public:
46 TPCMonitorDevice(bool useDigits) : mUseDigits(useDigits) {}
47
49 {
50 mBlocking = ic.options().get<bool>("blocking");
51 const int maxTimeBins = ic.options().get<int>("max-time-bins");
52
53 // set up ADC value filling
54 mRawReader.createReader("");
55 mDigitDump.init();
56 mDigitDump.setInMemoryOnly();
57 mDigitDump.setTimeBinRange(0, maxTimeBins);
58 const auto pedestalFile = ic.options().get<std::string>("pedestal-file");
59 if (pedestalFile.length()) {
60 LOGP(info, "Setting pedestal file: {}", pedestalFile);
61 mDigitDump.setPedestalAndNoiseFile(pedestalFile);
62 }
63
64 // TODO: Get rid of digit dump and use mDigits?
65 mRawReader.setADCDataCallback([this](const PadROCPos& padROCPos, const CRU& cru, const gsl::span<const uint32_t> data) -> int {
66 const int timeBins = mDigitDump.update(padROCPos, cru, data);
67 mDigitDump.setNumberOfProcessedTimeBins(std::max(mDigitDump.getNumberOfProcessedTimeBins(), size_t(timeBins)));
68 return timeBins;
69 });
70
71 mRawReader.setLinkZSCallback([this](int cru, int rowInSector, int padInRow, int timeBin, float adcValue) -> bool {
72 CRU cruID(cru);
73 const PadRegionInfo& regionInfo = Mapper::instance().getPadRegionInfo(cruID.region());
74 mDigitDump.updateCRU(cruID, rowInSector - regionInfo.getGlobalRowOffset(), padInRow, timeBin, adcValue);
75 return true;
76 });
77
78 if (mUseDigits) {
79 mEventDisplayGUI.getEventDisplay().setDigits(&mDigits);
80 } else {
81 mEventDisplayGUI.getEventDisplay().setDigits(&mDigitDump.getDigits());
82 }
83
84 mGUIThread = std::make_unique<std::thread>(&SimpleEventDisplayGUI::startGUI, &mEventDisplayGUI, maxTimeBins);
85
86 auto finishFunction = [this]() {
87 if (mGUIThread) {
88 mGUIThread->join();
89 }
90 };
91 ic.services().get<CallbackService>().set<CallbackService::Id::Stop>(finishFunction);
92 }
93
95 {
96 const auto validInputs = pc.inputs().countValidInputs();
97 mEventDisplayGUI.setDataAvailable(validInputs);
98
99 while (mBlocking && validInputs && !mEventDisplayGUI.isNextEventRequested() && !mEventDisplayGUI.isStopRequested()) {
100 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
101 LOGP(info, "wait for next event stop {}", mEventDisplayGUI.isStopRequested());
102 }
103
104 if (mEventDisplayGUI.isStopRequested()) {
105 LOGP(info, "call end processing");
106 pc.services().get<ControlService>().readyToQuit(QuitRequest::All);
107 return;
108 }
109
110 LOGP(info, "next event requested next {}, processing {}, updating {}", mEventDisplayGUI.isNextEventRequested(), mEventDisplayGUI.isProcessingEvent(), mEventDisplayGUI.isWaitingForDigitUpdate());
111 if (!mEventDisplayGUI.isNextEventRequested() && !mEventDisplayGUI.isProcessingEvent()) {
112 return;
113 }
114 mEventDisplayGUI.resetNextEventReqested();
115
116 const auto tf = pc.services().get<o2::framework::TimingInfo>().tfCounter;
117 mEventDisplayGUI.getEventDisplay().setPresentEventNumber(size_t(tf));
118 LOGP(info, "processing tF {}", tf);
119
120 if (!mUseDigits) {
121 mDigitDump.clearDigits();
122 auto& reader = mRawReader.getReaders()[0];
123 calib_processing_helper::processRawData(pc.inputs(), reader);
124
125 mDigitDump.incrementNEvents();
126 } else {
127 // clear digits
128 std::for_each(mDigits.begin(), mDigits.end(), [](auto& vec) { vec.clear(); });
129
130 copyDigits(pc.inputs());
131 }
132
133 mEventDisplayGUI.resetUpdatingDigits();
134 mEventDisplayGUI.setDataAvailable(false);
135 }
136
137 private:
138 SimpleEventDisplayGUI mEventDisplayGUI; ///< event display GUI; startGUI() is executed in mGUIThread
139 DigitDump mDigitDump; ///< filled by the raw reader callbacks; its digits are displayed when mUseDigits is false
140 std::array<std::vector<Digit>, Sector::MAXSECTOR> mDigits; ///< per-sector digits copied from the inputs when mUseDigits is true
141 std::unique_ptr<std::thread> mGUIThread; ///< thread running the GUI; joined in the Stop callback
142
144 bool mUseDigits{false}; ///< display digits taken from the inputs (true) or digits filled from decoded raw data (false)
145 bool mBlocking{false}; ///< value of the "blocking" option: hold processing until the GUI requests the next event or a stop
146
147 void copyDigits(InputRecord& inputs)
148 {
149 std::vector<InputSpec> filter = {
150 {"check", ConcreteDataTypeMatcher{"TPC", "DIGITS"}, Lifetime::Timeframe},
151 };
152 for (auto const& inputRef : InputRecordWalker(inputs)) {
153 auto const* sectorHeader = DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(inputRef);
154 if (sectorHeader == nullptr) {
155 LOG(error) << "sector header missing on header stack for input on " << inputRef.spec->binding;
156 continue;
157 }
158 const int sector = sectorHeader->sector();
159 mDigits[sector] = inputs.get<std::vector<o2::tpc::Digit>>(inputRef);
160 }
161 }
162};
163
165{
166 const bool useDigitsAsInput = inputSpec.find("DIGITS") != std::string::npos;
167 std::vector<OutputSpec> outputs;
168
169 return DataProcessorSpec{
170 "tpc-monitor-workflow",
171 select(inputSpec.data()),
172 outputs,
173 AlgorithmSpec{adaptFromTask<TPCMonitorDevice>(useDigitsAsInput)},
174 Options{
175 {"pedestal-file", VariantType::String, "", {"file with pedestals and noise for zero suppression"}},
176 {"max-time-bins", VariantType::Int, 114048, {"maximum number of time bins to show"}},
177 {"blocking", VariantType::Bool, false, {"block processing until next event is received"}},
178 } // end Options
179 }; // end DataProcessorSpec
180}
181} // namespace o2::tpc
Utils and constants for calibration and related workflows.
o2::framework::DataAllocator::SubSpecificationType SubSpecificationType
A helper class to iterate over all parts of all input routes.
GUI for raw data event display.
o2::header::DataHeader::SubSpecificationType SubSpecificationType
A helper class to iterate over all parts of all input routes.
The input API of the Data Processing Layer. This class holds the inputs which are valid for processing...
decltype(auto) get(R binding, int part=0) const
unsigned char region() const
Definition CRU.h:64
void setNumberOfProcessedTimeBins(size_t timeBins)
number of processed time bins in last event
Int_t update(const PadROCPos &padROCPos, const CRU &cru, const gsl::span< const uint32_t > data)
void incrementNEvents()
increment number of events
void setDigits(std::array< std::vector< Digit >, Sector::MAXSECTOR > *digits)
set external digits
void setPresentEventNumber(size_t eventNr)
set present event number
size_t getNumberOfProcessedTimeBins() const
number of processed time bins in last event
Pedestal calibration class.
Definition DigitDump.h:49
void clearDigits()
clear the digits
Definition DigitDump.h:120
void setTimeBinRange(int first, int last)
set the timeBin range
Definition DigitDump.h:110
void init()
initialize DigitDump from DigitDumpParam
Definition DigitDump.cxx:37
std::vector< Digit > & getDigits(int sector)
return digits for specific sector
Definition DigitDump.h:136
void setPedestalAndNoiseFile(std::string_view fileName)
pedestal file name
Definition DigitDump.h:58
void setInMemoryOnly(bool mode=true)
set in memory only mode
Definition DigitDump.h:130
Int_t updateCRU(const CRU &cru, const Int_t row, const Int_t pad, const Int_t timeBin, const Float_t signal) final
not used
Definition DigitDump.cxx:56
static Mapper & instance(const std::string mappingDir="")
Definition Mapper.h:44
const PadRegionInfo & getPadRegionInfo(const unsigned char region) const
Definition Mapper.h:385
Pad and row inside a ROC.
Definition PadROCPos.h:37
unsigned char getGlobalRowOffset() const
static constexpr int MAXSECTOR
Definition Sector.h:44
void startGUI(int maxTimeBins=114048)
SimpleEventDisplay & getEventDisplay()
void init(o2::framework::InitContext &ic) final
void run(o2::framework::ProcessingContext &pc) final
RawReaderCRU & createReader(const std::string_view inputFileName, uint32_t numTimeBins=0, uint32_t link=0, uint32_t stream=0, uint32_t debugLevel=0, uint32_t verbosity=0, const std::string_view outputFilePrefix="")
create a new raw reader
auto & getReaders()
return vector of readers
void setLinkZSCallback(LinkZSCallback function)
set a callback function for decoded LinkZS data
void setADCDataCallback(ADCDataCallback function)
set a callback function
GLboolean * data
Definition glcorearb.h:298
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > select(char const *matcher="")
uint64_t processRawData(o2::framework::InputRecord &inputs, std::unique_ptr< o2::tpc::rawreader::RawReaderCRU > &reader, bool useOldSubspec=false, const std::vector< int > &sectors={}, size_t *nerrors=nullptr, uint32_t syncOffsetReference=144, uint32_t decoderType=1, bool useTrigger=true, bool returnOnNoTrigger=false)
Global TPC definitions and constants.
Definition SimTraits.h:167
o2::framework::DataProcessorSpec getMonitorWorkflowSpec(std::string inputSpec="tpcraw:TPC/RAWDATA")
std::unique_ptr< GPUReconstructionTimeframe > tf
std::vector< o2::ctf::BufferType > vec
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"