57 CalDetMergerPublisherSpec(uint32_t lanes,
bool skipCCDB,
bool sendToDCS,
bool dumpAfterComplete =
false) : mLanesToExpect(lanes), mCalibInfos(lanes), mSkipCCDB(skipCCDB), mSendToDCS(sendToDCS), mPublishAfterComplete(dumpAfterComplete) {}
61 mForceQuit = ic.options().get<
bool>(
"force-quit");
62 mDirectFileDump = ic.options().get<
bool>(
"direct-file-dump");
63 mCheckCalibInfos = ic.options().get<
bool>(
"check-calib-infos");
68 int nSlots = pc.inputs().getNofParts(0);
69 assert(pc.inputs().getNofParts(1) == nSlots);
73 for (
int isl = 0; isl < nSlots; isl++) {
76 const auto pld = pc.inputs().get<gsl::span<char>>(
"clbPayload", isl);
77 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(pc.inputs().get(
"clbInfo", isl));
78 const auto subSpec = dh->subSpecification;
79 const int lane = subSpec >> 4;
80 const int calibType = subSpec & 0xf;
81 mCalibInfos[lane] = calibInfo;
84 TMemFile
f(
"file", (
char*)&pld[0], pld.size(),
"READ");
86 auto calDetMap =
f.Get<CalDetMap>(
"data");
88 if (mMergedCalDetsMap.size() == 0) {
90 for (
auto& [
key, obj] : *calDetMap) {
91 mMergedCalDetsMap[
key] = obj;
94 if (
int(mCalDetMapType) !=
type) {
97 for (
auto& [
key, obj] : *calDetMap) {
98 mMergedCalDetsMap[
key] += obj;
105 if (mMergedCalDets.find(
type) == mMergedCalDets.end()) {
106 mMergedCalDets[
type] = *calDet;
108 mMergedCalDets[
type] += *calDet;
114 LOGP(info,
"getting slot {}, subspec {:#8x}, lane {}, type {} ({}), firstTF {}, cycle {}", isl, subSpec, lane, calibType,
type, calibInfo.tfIDInfo.tfCounter, calibInfo.publishCycle);
118 mReceivedLanes.set(lane);
121 if (mReceivedLanes.count() == mLanesToExpect) {
122 LOGP(info,
"data of all lanes received");
123 if (mPublishAfterComplete) {
124 LOGP(info,
"publishing after all data was received");
125 sendOutput(pc.outputs());
128 mMergedCalDetsMap.clear();
129 for (
auto& [
type,
object] : mMergedCalDets) {
133 mReceivedLanes.reset();
139 LOGP(info,
"endOfStream");
141 if (mReceivedLanes.count() == mLanesToExpect) {
142 sendOutput(ec.outputs());
144 LOGP(info,
"Received lanes {} does not match expected lanes {}, object already sent", mReceivedLanes.count(), mLanesToExpect);
151 using CalDetMap = std::unordered_map<std::string, dataType>;
152 std::bitset<128> mReceivedLanes;
153 std::unordered_map<int, dataType> mMergedCalDets;
154 std::vector<CalibRawPartInfo> mCalibInfos;
155 CalDetMap mMergedCalDetsMap;
157 uint64_t mRunNumber{0};
158 uint32_t mLanesToExpect{0};
159 uint32_t mDCSSpecOffset{32768};
160 bool mForceQuit{
false};
161 bool mDirectFileDump{
false};
162 bool mPublishAfterComplete{
false};
163 bool mSkipCCDB{
false};
164 bool mSendToDCS{
false};
165 bool mCheckCalibInfos{
false};
170 if (mCheckCalibInfos) {
171 if (std::adjacent_find(mCalibInfos.begin(), mCalibInfos.end(), std::not_equal_to<>()) != mCalibInfos.end()) {
172 LOGP(warning,
"Different calib info found");
177 const long timeStart = mCalibInfos[0].tfIDInfo.creation + mCalibInfos[0].publishCycle;
180 std::map<std::string, std::string> md;
182 if (mMergedCalDetsMap.size() > 0) {
187 w.setStartValidityTimestamp(timeStart);
188 w.setEndValidityTimestamp(timeEnd);
190 md =
w.getMetaData();
194 LOGP(info,
"Sending object {}/{} of size {} bytes, valid for {} : {}",
w.getPath(),
w.getFileName(),
image->size(),
w.getStartValidityTimestamp(),
w.getEndValidityTimestamp());
201 if (mSendToDCS && (mCalDetMapType == CDBType::CalPedestalNoise)) {
202 sendPedestalNoiseToDCS(
output);
206 for (
auto& [
type,
object] : mMergedCalDets) {
211 w.setStartValidityTimestamp(timeStart);
212 w.setEndValidityTimestamp(timeEnd);
214 md =
w.getMetaData();
218 LOG(info) <<
"Sending object " <<
w.getPath() <<
"/" <<
w.getFileName() <<
" of size " <<
image->size()
219 <<
" bytes, valid for " <<
w.getStartValidityTimestamp() <<
" : " <<
w.getEndValidityTimestamp();
232 if (mDirectFileDump) {
233 LOGP(info,
"Dumping output to file");
234 std::string fileName =
"merged_CalDet.root";
235 if (mMergedCalDetsMap.size()) {
236 const auto& cdbType =
CDBTypeMap.at(mCalDetMapType);
237 const auto name = cdbType.substr(cdbType.rfind(
"/") + 1);
238 fileName = fmt::format(
"merged_{}_{}_{}.root",
name, mCalibInfos[0].tfIDInfo.tfCounter, mCalibInfos[0].publishCycle);
240 TFile
f(fileName.data(),
"recreate");
241 for (
auto& [
key,
object] : mMergedCalDetsMap) {
244 for (
auto& [
type,
object] : mMergedCalDets) {
252 auto sendObject = [
this, &
output](
const CalPad&
data,
const std::string&
path,
const std::string& fileNameBase =
"") {
253 const long timeStart = mCalibInfos[0].tfIDInfo.creation + mCalibInfos[0].publishCycle;
257 std::ostringstream dataStr;
260 std::vector<char> dataVec;
261 const auto&
str = dataStr.str();
262 std::copy(
str.begin(),
str.end(), std::back_inserter(dataVec));
267 w.setFileName(fmt::format(
"{}_{}_{}.txt", fileNameBase, mRunNumber, timeStart));
268 w.setStartValidityTimestamp(timeStart);
269 w.setEndValidityTimestamp(timeEnd);
271 auto md =
w.getMetaData();
275 LOGP(info,
"Sending object to DCS DB {}/{} of size {} ({}) bytes, valid for {} : {}",
w.getPath(),
w.getFileName(), dataVec.size(), dataStr.str().size(),
w.getStartValidityTimestamp(),
w.getEndValidityTimestamp());
282 const auto& pedestals = mMergedCalDetsMap.at(
"Pedestals");
283 const auto& noise = mMergedCalDetsMap.at(
"Noise");
286 for (
auto threshold : {2.5f, 3.f, 3.5f}) {
291 const auto& pedestalsPhys = pedestalsThreshold[
"PedestalsPhys"];
292 sendObject(pedestalsPhys,
"TPC/Calib/PedestalsPhys",
"Pedestals");
295 const auto& thresholdsPhys = pedestalsThreshold[
"ThresholdMapPhys"];
296 const auto fileNameBase = fmt::format(
"ThresholdsPhys-{:.0f}", threshold * 10);
297 sendObject(thresholdsPhys,
"TPC/Calib/" + fileNameBase, fileNameBase);
std::unordered_map< std::string, CalPad > preparePedestalFiles(const CalPad &pedestals, const CalPad &noise, std::vector< float > sigmaNoiseROCType={3, 3, 3, 3}, std::vector< float > minADCROCType={2, 2, 2, 2}, float pedestalOffset=0, bool onlyFilled=false, bool maskBad=true, float noisyChannelThreshold=1.5, float sigmaNoiseNoisyChannels=4, float badChannelThreshold=6, bool fixedSize=false)