Project
Loading...
Searching...
No Matches
CalDetMergerPublisherSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
17#include <bitset>
18#include <unordered_map>
19#include <vector>
20#include <string>
21#include <algorithm>
22
23#include <fmt/format.h>
24
25#include "TMemFile.h"
26#include "TFile.h"
27
28#include "Framework/Task.h"
30#include "Framework/Logger.h"
33
34#include "Headers/DataHeader.h"
36#include "CCDB/CcdbApi.h"
37#include "CCDB/CcdbObjectInfo.h"
39#include "TPCBase/CalDet.h"
44
45using namespace o2::framework;
46using namespace o2::tpc;
49
51{
53
54 public:
55 CalDetMergerPublisherSpec(uint32_t lanes, bool skipCCDB, bool dumpAfterComplete = false) : mLanesToExpect(lanes), mCalibInfos(lanes), mSkipCCDB(skipCCDB), mPublishAfterComplete(dumpAfterComplete) {}
56
58 {
59 mForceQuit = ic.options().get<bool>("force-quit");
60 mDirectFileDump = ic.options().get<bool>("direct-file-dump");
61 mCheckCalibInfos = ic.options().get<bool>("check-calib-infos");
62 }
63
65 {
66 int nSlots = pc.inputs().getNofParts(0);
67 assert(pc.inputs().getNofParts(1) == nSlots);
68
69 mRunNumber = processing_helpers::getRunNumber(pc);
70
71 for (int isl = 0; isl < nSlots; isl++) {
72 const auto calibInfo = pc.inputs().get<CalibRawPartInfo>("clbInfo", isl);
73 const auto type = calibInfo.calibType;
74 const auto pld = pc.inputs().get<gsl::span<char>>("clbPayload", isl); // this is actually an image of TMemFile
75 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(pc.inputs().get("clbInfo", isl));
76 const auto subSpec = dh->subSpecification;
77 const int lane = subSpec >> 4;
78 const int calibType = subSpec & 0xf;
79 mCalibInfos[lane] = calibInfo;
80
81 // const auto& path = wrp->getPath();
82 TMemFile f("file", (char*)&pld[0], pld.size(), "READ");
83 if (!f.IsZombie()) {
84 auto calDetMap = f.Get<CalDetMap>("data");
85 if (calDetMap) {
86 if (mMergedCalDetsMap.size() == 0) {
87 mCalDetMapType = CDBType(type);
88 for (auto& [key, obj] : *calDetMap) {
89 mMergedCalDetsMap[key] = obj;
90 }
91 } else {
92 if (int(mCalDetMapType) != type) {
93 LOGP(fatal, "received CalDetMap of different type for merging, previous: {}, present{}", CDBTypeMap.at(mCalDetMapType), CDBTypeMap.at(CDBType(type)));
94 }
95 for (auto& [key, obj] : *calDetMap) {
96 mMergedCalDetsMap[key] += obj;
97 }
98 }
99 }
100
101 auto calDet = f.Get<o2::tpc::CalDet<float>>("data");
102 if (calDet) {
103 if (mMergedCalDets.find(type) == mMergedCalDets.end()) {
104 mMergedCalDets[type] = *calDet;
105 } else {
106 mMergedCalDets[type] += *calDet;
107 }
108 }
109 }
110 f.Close();
111
112 LOGP(info, "getting slot {}, subspec {:#8x}, lane {}, type {} ({}), firstTF {}, cycle {}", isl, subSpec, lane, calibType, type, calibInfo.tfIDInfo.tfCounter, calibInfo.publishCycle);
113 // if (mReceivedLanes.test(lane)) {
114 // LOGP(warning, "lane {} received multiple times", lane);
115 // }
116 mReceivedLanes.set(lane);
117 }
118
119 if (mReceivedLanes.count() == mLanesToExpect) {
120 LOGP(info, "data of all lanes received");
121 if (mPublishAfterComplete) {
122 LOGP(info, "publishing after all data was received");
123 sendOutput(pc.outputs());
124
125 // reset calibration objects
126 mMergedCalDetsMap.clear();
127 for (auto& [type, object] : mMergedCalDets) {
128 object = 0;
129 }
130 }
131 mReceivedLanes.reset();
132 }
133 }
134
136 {
137 LOGP(info, "endOfStream");
138
139 if (mReceivedLanes.count() == mLanesToExpect) {
140 sendOutput(ec.outputs());
141 } else {
142 LOGP(info, "Received lanes {} does not match expected lanes {}, object already sent", mReceivedLanes.count(), mLanesToExpect);
143 }
144 ec.services().get<ControlService>().readyToQuit(QuitRequest::Me);
145 }
146
147 private:
// Type aliases and state of the merger/publisher task.
148 using dataType = o2::tpc::CalDet<float>; ///< type of a single calibration object
149 using CalDetMap = std::unordered_map<std::string, dataType>; ///< calibration objects keyed by a string
150 std::bitset<128> mReceivedLanes; ///< one bit per lane, set in run() for every processed part; limits the lane number to < 128
151 std::unordered_map<int, dataType> mMergedCalDets; ///< accumulated single objects, keyed by the calibType of the received info
152 std::vector<CalibRawPartInfo> mCalibInfos; ///< most recent calib info per lane, sized to the number of lanes in the constructor
153 CalDetMap mMergedCalDetsMap; ///< accumulated content of the received CalDetMap payloads
154 CDBType mCalDetMapType; ///< type of mMergedCalDetsMap; assigned in run() when the first map is received, not initialized before
155 uint64_t mRunNumber{0}; ///< run number as returned by processing_helpers::getRunNumber
156 uint32_t mLanesToExpect{0}; ///< number of distinct lanes that must report before the data counts as complete
157 bool mForceQuit{false}; ///< value of the "force-quit" option; not read anywhere in the visible code
158 bool mDirectFileDump{false}; ///< value of the "direct-file-dump" option; enables dumpCalibData()
159 bool mPublishAfterComplete{false}; ///< send the output from run() once all lanes were received
160 bool mSkipCCDB{false}; ///< constructor argument skipCCDB; not read anywhere in the visible code
161 bool mCheckCalibInfos{false}; ///< value of the "check-calib-infos" option; enables the consistency check in sendOutput()
162
163 //____________________________________________________________________________
/// Send the merged calibration objects as payload/wrapper pairs and trigger the optional file dump.
/// \param output allocator used to snapshot the payload image and its wrapper object
///
/// NOTE(review): `timeEnd`, `w`, `subSpec` and the `image` used in the loop are used below
/// but their declarations are not part of the visible lines; they seem to have been dropped
/// from this listing.
/// NOTE(review): mSkipCCDB is not consulted in the lines visible here, while the spec only
/// declares the outputs when skipCCDB is false. Confirm this is intended.
164 void sendOutput(DataAllocator& output)
165 {
// optional consistency check: warn if any two neighbouring entries of mCalibInfos differ
166 if (mCheckCalibInfos) {
167 if (std::adjacent_find(mCalibInfos.begin(), mCalibInfos.end(), std::not_equal_to<>()) != mCalibInfos.end()) {
168 LOGP(warning, "Different calib info found");
169 }
170 }
171
172 // perhaps should be changed to time of the run
// `now` is not used in the visible lines
173 const auto now = std::chrono::system_clock::now();
// start of validity: creation value of the first lane's info plus its publish cycle
174 const long timeStart = mCalibInfos[0].tfIDInfo.creation + mCalibInfos[0].publishCycle;
176
// scratch copy of the wrapper meta data, reused for every object sent
177 std::map<std::string, std::string> md;
178
// merged map: sent as one object under the path registered for mCalDetMapType
179 if (mMergedCalDetsMap.size() > 0) {
181 auto image = o2::ccdb::CcdbApi::createObjectImage(&mMergedCalDetsMap, &w);
182
183 w.setPath(CDBTypeMap.at(mCalDetMapType));
184 w.setStartValidityTimestamp(timeStart);
185 w.setEndValidityTimestamp(timeEnd);
186
// add the run number to the meta data already present in the wrapper
187 md = w.getMetaData();
188 md[o2::base::NameConf::CCDBRunTag.data()] = std::to_string(mRunNumber);
189 w.setMetaData(md);
190
191 LOGP(info, "Sending object {}/{} of size {} bytes, valid for {} : {}", w.getPath(), w.getFileName(), image->size(), w.getStartValidityTimestamp(), w.getEndValidityTimestamp());
192
// payload image and wrapper are sent as a pair with the same sub-specification
194 output.snapshot(Output{clbUtils::gDataOriginCDBPayload, "TPC_CALIB", subSpec}, *image.get());
195 output.snapshot(Output{clbUtils::gDataOriginCDBWrapper, "TPC_CALIB", subSpec}, w);
196 }
197
// single merged objects: one payload/wrapper pair per calibration type
198 for (auto& [type, object] : mMergedCalDets) {
201
202 w.setPath(CDBTypeMap.at(CDBType(type)));
203 w.setStartValidityTimestamp(timeStart);
204 w.setEndValidityTimestamp(timeEnd);
205
206 md = w.getMetaData();
207 md[o2::base::NameConf::CCDBRunTag.data()] = std::to_string(mRunNumber);
208 w.setMetaData(md);
209
210 LOG(info) << "Sending object " << w.getPath() << "/" << w.getFileName() << " of size " << image->size()
211 << " bytes, valid for " << w.getStartValidityTimestamp() << " : " << w.getEndValidityTimestamp();
212
214 output.snapshot(Output{clbUtils::gDataOriginCDBPayload, "TPC_CALIB", subSpec}, *image.get());
215 output.snapshot(Output{clbUtils::gDataOriginCDBWrapper, "TPC_CALIB", subSpec}, w);
216 }
217
// writes the same objects to a local file if direct file dumping is enabled
218 dumpCalibData();
219 }
220
221 //____________________________________________________________________________
222 void dumpCalibData()
223 {
224 if (mDirectFileDump) {
225 LOGP(info, "Dumping output to file");
226 std::string fileName = "merged_CalDet.root";
227 if (mMergedCalDetsMap.size()) {
228 const auto& cdbType = CDBTypeMap.at(mCalDetMapType);
229 const auto name = cdbType.substr(cdbType.rfind("/") + 1);
230 fileName = fmt::format("merged_{}_{}_{}.root", name, mCalibInfos[0].tfIDInfo.tfCounter, mCalibInfos[0].publishCycle);
231 }
232 TFile f(fileName.data(), "recreate");
233 for (auto& [key, object] : mMergedCalDetsMap) {
234 f.WriteObject(&object, object.getName().data());
235 }
236 for (auto& [type, object] : mMergedCalDets) {
237 f.WriteObject(&object, object.getName().data());
238 }
239 }
240 }
241};
242
243o2::framework::DataProcessorSpec o2::tpc::getCalDetMergerPublisherSpec(uint32_t lanes, bool skipCCDB, bool dumpAfterComplete)
244{
245 std::vector<OutputSpec> outputs;
246 if (!skipCCDB) {
247 outputs.emplace_back(ConcreteDataTypeMatcher{clbUtils::gDataOriginCDBPayload, "TPC_CALIB"}, Lifetime::Sporadic);
248 outputs.emplace_back(ConcreteDataTypeMatcher{clbUtils::gDataOriginCDBWrapper, "TPC_CALIB"}, Lifetime::Sporadic);
249 }
250
251 std::vector<InputSpec> inputs;
252 inputs.emplace_back("clbPayload", ConcreteDataTypeMatcher{gDataOriginTPC, "CLBPART"}, Lifetime::Sporadic);
253 inputs.emplace_back("clbInfo", ConcreteDataTypeMatcher{gDataOriginTPC, "CLBPARTINFO"}, Lifetime::Sporadic);
254
255 const std::string id = "calib-tpc-caldet-merger-publisher";
256
257 return DataProcessorSpec{
258 id.data(),
259 inputs,
260 outputs,
261 AlgorithmSpec{adaptFromTask<CalDetMergerPublisherSpec>(lanes, skipCCDB, dumpAfterComplete)},
262 Options{
263 {"force-quit", VariantType::Bool, false, {"force quit after max-events have been reached"}},
264 {"direct-file-dump", VariantType::Bool, false, {"directly dump calibration to file"}},
265 {"check-calib-infos", VariantType::Bool, false, {"make consistency check of calib infos"}},
266 } // end Options
267 }; // end DataProcessorSpec
268}
Simple interface to the CDB manager.
TPC CalDet merger and CCDB publisher.
std::string getName(const TDataMember *dm, int index, int size)
Utils and constants for calibration and related workflows.
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
Definition of the Names Generator class.
StringRef key
CalDetMergerPublisherSpec(uint32_t lanes, bool skipCCDB, bool dumpAfterComplete=false)
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void run(o2::framework::ProcessingContext &pc) final
static constexpr std::string_view CCDBRunTag
Definition NameConf.h:69
static std::unique_ptr< std::vector< char > > createObjectImage(const T *obj, CcdbObjectInfo *info=nullptr)
Definition CcdbApi.h:103
static constexpr long INFINITE_TIMESTAMP
GLeglImageOES image
Definition glcorearb.h:4021
GLuint const GLchar * name
Definition glcorearb.h:781
GLdouble f
Definition glcorearb.h:310
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLboolean * data
Definition glcorearb.h:298
GLubyte GLubyte GLubyte GLubyte w
Definition glcorearb.h:852
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
uint64_t now() noexcept
Definition Clock.h:69
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
uint64_t getRunNumber(o2::framework::ProcessingContext &pc)
Global TPC definitions and constants.
Definition SimTraits.h:167
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
Definition CDBTypes.h:94
o2::framework::DataProcessorSpec getCalDetMergerPublisherSpec(uint32_t lanes, bool skipCCDB, bool dumpAfterComplete=false)
CDBType
Calibration and parameter types for CCDB.
Definition CDBTypes.h:26
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static constexpr o2::header::DataOrigin gDataOriginCDBWrapper
Definition Utils.h:44
static constexpr o2::header::DataOrigin gDataOriginCDBPayload
Definition Utils.h:43
uint32_t SubSpecificationType
Definition DataHeader.h:620
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"