Project
Loading...
Searching...
No Matches
CalDetMergerPublisherSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
17#include <bitset>
18#include <unordered_map>
19#include <vector>
20#include <string>
21#include <algorithm>
22#include <sstream>
23
24#include <fmt/format.h>
25
26#include "TMemFile.h"
27#include "TFile.h"
28
29#include "Framework/Task.h"
31#include "Framework/Logger.h"
34
35#include "Headers/DataHeader.h"
37#include "CCDB/CcdbApi.h"
38#include "CCDB/CcdbObjectInfo.h"
40#include "TPCBase/CalDet.h"
46
47using namespace o2::framework;
48using namespace o2::tpc;
51
53{
55
56 public:
57 CalDetMergerPublisherSpec(uint32_t lanes, bool skipCCDB, bool sendToDCS, bool dumpAfterComplete = false) : mLanesToExpect(lanes), mCalibInfos(lanes), mSkipCCDB(skipCCDB), mSendToDCS(sendToDCS), mPublishAfterComplete(dumpAfterComplete) {}
58
60 {
61 mForceQuit = ic.options().get<bool>("force-quit");
62 mDirectFileDump = ic.options().get<bool>("direct-file-dump");
63 mCheckCalibInfos = ic.options().get<bool>("check-calib-infos");
64 }
65
67 {
68 int nSlots = pc.inputs().getNofParts(0);
69 assert(pc.inputs().getNofParts(1) == nSlots);
70
71 mRunNumber = processing_helpers::getRunNumber(pc);
72
73 for (int isl = 0; isl < nSlots; isl++) {
74 const auto calibInfo = pc.inputs().get<CalibRawPartInfo>("clbInfo", isl);
75 const auto type = calibInfo.calibType;
76 const auto pld = pc.inputs().get<gsl::span<char>>("clbPayload", isl); // this is actually an image of TMemFile
77 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(pc.inputs().get("clbInfo", isl));
78 const auto subSpec = dh->subSpecification;
79 const int lane = subSpec >> 4;
80 const int calibType = subSpec & 0xf;
81 mCalibInfos[lane] = calibInfo;
82
83 // const auto& path = wrp->getPath();
84 TMemFile f("file", (char*)&pld[0], pld.size(), "READ");
85 if (!f.IsZombie()) {
86 auto calDetMap = f.Get<CalDetMap>("data");
87 if (calDetMap) {
88 if (mMergedCalDetsMap.size() == 0) {
89 mCalDetMapType = CDBType(type);
90 for (auto& [key, obj] : *calDetMap) {
91 mMergedCalDetsMap[key] = obj;
92 }
93 } else {
94 if (int(mCalDetMapType) != type) {
95 LOGP(fatal, "received CalDetMap of different type for merging, previous: {}, present{}", CDBTypeMap.at(mCalDetMapType), CDBTypeMap.at(CDBType(type)));
96 }
97 for (auto& [key, obj] : *calDetMap) {
98 mMergedCalDetsMap[key] += obj;
99 }
100 }
101 }
102
103 auto calDet = f.Get<o2::tpc::CalDet<float>>("data");
104 if (calDet) {
105 if (mMergedCalDets.find(type) == mMergedCalDets.end()) {
106 mMergedCalDets[type] = *calDet;
107 } else {
108 mMergedCalDets[type] += *calDet;
109 }
110 }
111 }
112 f.Close();
113
114 LOGP(info, "getting slot {}, subspec {:#8x}, lane {}, type {} ({}), firstTF {}, cycle {}", isl, subSpec, lane, calibType, type, calibInfo.tfIDInfo.tfCounter, calibInfo.publishCycle);
115 // if (mReceivedLanes.test(lane)) {
116 // LOGP(warning, "lane {} received multiple times", lane);
117 // }
118 mReceivedLanes.set(lane);
119 }
120
121 if (mReceivedLanes.count() == mLanesToExpect) {
122 LOGP(info, "data of all lanes received");
123 if (mPublishAfterComplete) {
124 LOGP(info, "publishing after all data was received");
125 sendOutput(pc.outputs());
126
127 // reset calibration objects
128 mMergedCalDetsMap.clear();
129 for (auto& [type, object] : mMergedCalDets) {
130 object = 0;
131 }
132 }
133 mReceivedLanes.reset();
134 }
135 }
136
138 {
139 LOGP(info, "endOfStream");
140
141 if (mReceivedLanes.count() == mLanesToExpect) {
142 sendOutput(ec.outputs());
143 } else {
144 LOGP(info, "Received lanes {} does not match expected lanes {}, object already sent", mReceivedLanes.count(), mLanesToExpect);
145 }
146 ec.services().get<ControlService>().readyToQuit(QuitRequest::Me);
147 }
148
149 private:
150 using dataType = o2::tpc::CalDet<float>;
151 using CalDetMap = std::unordered_map<std::string, dataType>;
152 std::bitset<128> mReceivedLanes;
153 std::unordered_map<int, dataType> mMergedCalDets;
154 std::vector<CalibRawPartInfo> mCalibInfos;
155 CalDetMap mMergedCalDetsMap;
156 CDBType mCalDetMapType;
157 uint64_t mRunNumber{0};
158 uint32_t mLanesToExpect{0};
159 uint32_t mDCSSpecOffset{32768};
160 bool mForceQuit{false};
161 bool mDirectFileDump{false};
162 bool mPublishAfterComplete{false};
163 bool mSkipCCDB{false};
164 bool mSendToDCS{false};
165 bool mCheckCalibInfos{false};
166
167 //____________________________________________________________________________
168 void sendOutput(DataAllocator& output)
169 {
170 if (mCheckCalibInfos) {
171 if (std::adjacent_find(mCalibInfos.begin(), mCalibInfos.end(), std::not_equal_to<>()) != mCalibInfos.end()) {
172 LOGP(warning, "Different calib info found");
173 }
174 }
175
176 // perhaps should be changed to time of the run
177 const long timeStart = mCalibInfos[0].tfIDInfo.creation + mCalibInfos[0].publishCycle;
179
180 std::map<std::string, std::string> md;
181
182 if (mMergedCalDetsMap.size() > 0) {
184 auto image = o2::ccdb::CcdbApi::createObjectImage(&mMergedCalDetsMap, &w);
185
186 w.setPath(CDBTypeMap.at(mCalDetMapType));
187 w.setStartValidityTimestamp(timeStart);
188 w.setEndValidityTimestamp(timeEnd);
189
190 md = w.getMetaData();
191 md[o2::base::NameConf::CCDBRunTag.data()] = std::to_string(mRunNumber);
192 w.setMetaData(md);
193
194 LOGP(info, "Sending object {}/{} of size {} bytes, valid for {} : {}", w.getPath(), w.getFileName(), image->size(), w.getStartValidityTimestamp(), w.getEndValidityTimestamp());
195
197 output.snapshot(Output{clbUtils::gDataOriginCDBPayload, "TPC_CALIB", subSpec}, *image.get());
198 output.snapshot(Output{clbUtils::gDataOriginCDBWrapper, "TPC_CALIB", subSpec}, w);
199
200 // for pedestal calibration send to DCS if requested
201 if (mSendToDCS && (mCalDetMapType == CDBType::CalPedestalNoise)) {
202 sendPedestalNoiseToDCS(output);
203 }
204 }
205
206 for (auto& [type, object] : mMergedCalDets) {
209
210 w.setPath(CDBTypeMap.at(CDBType(type)));
211 w.setStartValidityTimestamp(timeStart);
212 w.setEndValidityTimestamp(timeEnd);
213
214 md = w.getMetaData();
215 md[o2::base::NameConf::CCDBRunTag.data()] = std::to_string(mRunNumber);
216 w.setMetaData(md);
217
218 LOG(info) << "Sending object " << w.getPath() << "/" << w.getFileName() << " of size " << image->size()
219 << " bytes, valid for " << w.getStartValidityTimestamp() << " : " << w.getEndValidityTimestamp();
220
222 output.snapshot(Output{clbUtils::gDataOriginCDBPayload, "TPC_CALIB", subSpec}, *image.get());
223 output.snapshot(Output{clbUtils::gDataOriginCDBWrapper, "TPC_CALIB", subSpec}, w);
224 }
225
226 dumpCalibData();
227 }
228
229 //____________________________________________________________________________
230 void dumpCalibData()
231 {
232 if (mDirectFileDump) {
233 LOGP(info, "Dumping output to file");
234 std::string fileName = "merged_CalDet.root";
235 if (mMergedCalDetsMap.size()) {
236 const auto& cdbType = CDBTypeMap.at(mCalDetMapType);
237 const auto name = cdbType.substr(cdbType.rfind("/") + 1);
238 fileName = fmt::format("merged_{}_{}_{}.root", name, mCalibInfos[0].tfIDInfo.tfCounter, mCalibInfos[0].publishCycle);
239 }
240 TFile f(fileName.data(), "recreate");
241 for (auto& [key, object] : mMergedCalDetsMap) {
242 f.WriteObject(&object, object.getName().data());
243 }
244 for (auto& [type, object] : mMergedCalDets) {
245 f.WriteObject(&object, object.getName().data());
246 }
247 }
248 }
249
250 void sendPedestalNoiseToDCS(DataAllocator& output)
251 {
252 auto sendObject = [this, &output](const CalPad& data, const std::string& path, const std::string& fileNameBase = "") {
253 const long timeStart = mCalibInfos[0].tfIDInfo.creation + mCalibInfos[0].publishCycle;
255
256 const auto dataMap = cru_calib_helpers::getDataMap(data);
257 std::ostringstream dataStr;
258 cru_calib_helpers::writeValues(dataStr, dataMap);
259
260 std::vector<char> dataVec;
261 const auto& str = dataStr.str();
262 std::copy(str.begin(), str.end(), std::back_inserter(dataVec));
263
265
266 w.setPath(path);
267 w.setFileName(fmt::format("{}_{}_{}.txt", fileNameBase, mRunNumber, timeStart));
268 w.setStartValidityTimestamp(timeStart);
269 w.setEndValidityTimestamp(timeEnd);
270
271 auto md = w.getMetaData();
272 md[o2::base::NameConf::CCDBRunTag.data()] = std::to_string(mRunNumber);
273 w.setMetaData(md);
274
275 LOGP(info, "Sending object to DCS DB {}/{} of size {} ({}) bytes, valid for {} : {}", w.getPath(), w.getFileName(), dataVec.size(), dataStr.str().size(), w.getStartValidityTimestamp(), w.getEndValidityTimestamp());
276
278 output.snapshot(Output{clbUtils::gDataOriginCDBPayload, "TPC_CALIB_DCS", subSpec}, dataVec);
279 output.snapshot(Output{clbUtils::gDataOriginCDBWrapper, "TPC_CALIB_DCS", subSpec}, w);
280 };
281
282 const auto& pedestals = mMergedCalDetsMap.at("Pedestals");
283 const auto& noise = mMergedCalDetsMap.at("Noise");
284
285 bool first = true;
286 for (auto threshold : {2.5f, 3.f, 3.5f}) {
287 auto pedestalsThreshold = cru_calib_helpers::preparePedestalFiles(pedestals, noise, {threshold});
288
289 // pedestals don't depend on threshold, publish on first iteration only
290 if (first) {
291 const auto& pedestalsPhys = pedestalsThreshold["PedestalsPhys"];
292 sendObject(pedestalsPhys, "TPC/Calib/PedestalsPhys", "Pedestals");
293 }
294
295 const auto& thresholdsPhys = pedestalsThreshold["ThresholdMapPhys"];
296 const auto fileNameBase = fmt::format("ThresholdsPhys-{:.0f}", threshold * 10);
297 sendObject(thresholdsPhys, "TPC/Calib/" + fileNameBase, fileNameBase);
298
299 first = false;
300 }
301 }
302};
303
304o2::framework::DataProcessorSpec o2::tpc::getCalDetMergerPublisherSpec(uint32_t lanes, bool skipCCDB, bool sendToDCS, bool dumpAfterComplete)
305{
306 std::vector<OutputSpec> outputs;
307 if (!skipCCDB) {
308 outputs.emplace_back(ConcreteDataTypeMatcher{clbUtils::gDataOriginCDBPayload, "TPC_CALIB"}, Lifetime::Sporadic);
309 outputs.emplace_back(ConcreteDataTypeMatcher{clbUtils::gDataOriginCDBWrapper, "TPC_CALIB"}, Lifetime::Sporadic);
310 }
311
312 if (sendToDCS) {
313 outputs.emplace_back(ConcreteDataTypeMatcher{clbUtils::gDataOriginCDBPayload, "TPC_CALIB_DCS"}, Lifetime::Sporadic);
314 outputs.emplace_back(ConcreteDataTypeMatcher{clbUtils::gDataOriginCDBWrapper, "TPC_CALIB_DCS"}, Lifetime::Sporadic);
315 }
316
317 std::vector<InputSpec> inputs;
318 inputs.emplace_back("clbPayload", ConcreteDataTypeMatcher{gDataOriginTPC, "CLBPART"}, Lifetime::Sporadic);
319 inputs.emplace_back("clbInfo", ConcreteDataTypeMatcher{gDataOriginTPC, "CLBPARTINFO"}, Lifetime::Sporadic);
320
321 const std::string id = "calib-tpc-caldet-merger-publisher";
322
323 return DataProcessorSpec{
324 id.data(),
325 inputs,
326 outputs,
327 AlgorithmSpec{adaptFromTask<CalDetMergerPublisherSpec>(lanes, skipCCDB, sendToDCS, dumpAfterComplete)},
328 Options{
329 {"force-quit", VariantType::Bool, false, {"force quit after max-events have been reached"}},
330 {"direct-file-dump", VariantType::Bool, false, {"directly dump calibration to file"}},
331 {"check-calib-infos", VariantType::Bool, false, {"make consistency check of calib infos"}},
332 } // end Options
333 }; // end DataProcessorSpec
334}
Simple interface to the CDB manager.
TPC CalDet merger and CCDB publisher.
std::string getName(const TDataMember *dm, int index, int size)
Utils and constants for calibration and related workflows.
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
Definition of the Names Generator class.
StringRef key
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void run(o2::framework::ProcessingContext &pc) final
CalDetMergerPublisherSpec(uint32_t lanes, bool skipCCDB, bool sendToDCS, bool dumpAfterComplete=false)
static constexpr std::string_view CCDBRunTag
Definition NameConf.h:69
static std::unique_ptr< std::vector< char > > createObjectImage(const T *obj, CcdbObjectInfo *info=nullptr)
Definition CcdbApi.h:103
void setPath(const std::string &path)
static constexpr long INFINITE_TIMESTAMP
GLeglImageOES image
Definition glcorearb.h:4021
GLuint const GLchar * name
Definition glcorearb.h:781
GLdouble f
Definition glcorearb.h:310
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLboolean * data
Definition glcorearb.h:298
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLubyte GLubyte GLubyte GLubyte w
Definition glcorearb.h:852
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
std::unordered_map< std::string, CalPad > preparePedestalFiles(const CalPad &pedestals, const CalPad &noise, std::vector< float > sigmaNoiseROCType={3, 3, 3, 3}, std::vector< float > minADCROCType={2, 2, 2, 2}, float pedestalOffset=0, bool onlyFilled=false, bool maskBad=true, float noisyChannelThreshold=1.5, float sigmaNoiseNoisyChannels=4, float badChannelThreshold=6, bool fixedSize=false)
DataMapU32 getDataMap(const CalPad &calPad)
void writeValues(std::ostream &str, const DataMap &map, bool onlyFilled=false)
uint64_t getRunNumber(o2::framework::ProcessingContext &pc)
Global TPC definitions and constants.
Definition SimTraits.h:167
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
Definition CDBTypes.h:95
o2::framework::DataProcessorSpec getCalDetMergerPublisherSpec(uint32_t lanes, bool skipCCDB, bool sendToDCS, bool dumpAfterComplete=false)
CDBType
Calibration and parameter types for CCDB.
Definition CDBTypes.h:26
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static constexpr o2::header::DataOrigin gDataOriginCDBWrapper
Definition Utils.h:44
static constexpr o2::header::DataOrigin gDataOriginCDBPayload
Definition Utils.h:43
uint32_t SubSpecificationType
Definition DataHeader.h:620
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
const std::string str