18#include <unordered_map>
23#include <fmt/format.h>
55 CalDetMergerPublisherSpec(uint32_t lanes,
bool skipCCDB,
bool dumpAfterComplete =
false) : mLanesToExpect(lanes), mCalibInfos(lanes), mSkipCCDB(skipCCDB), mPublishAfterComplete(dumpAfterComplete) {}
59 mForceQuit = ic.options().get<
bool>(
"force-quit");
60 mDirectFileDump = ic.options().get<
bool>(
"direct-file-dump");
61 mCheckCalibInfos = ic.options().get<
bool>(
"check-calib-infos");
66 int nSlots = pc.inputs().getNofParts(0);
67 assert(pc.inputs().getNofParts(1) == nSlots);
71 for (
int isl = 0; isl < nSlots; isl++) {
74 const auto pld = pc.inputs().get<gsl::span<char>>(
"clbPayload", isl);
75 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(pc.inputs().get(
"clbInfo", isl));
76 const auto subSpec = dh->subSpecification;
77 const int lane = subSpec >> 4;
78 const int calibType = subSpec & 0xf;
79 mCalibInfos[lane] = calibInfo;
82 TMemFile
f(
"file", (
char*)&pld[0], pld.size(),
"READ");
84 auto calDetMap =
f.Get<CalDetMap>(
"data");
86 if (mMergedCalDetsMap.size() == 0) {
88 for (
auto& [
key, obj] : *calDetMap) {
89 mMergedCalDetsMap[
key] = obj;
92 if (
int(mCalDetMapType) !=
type) {
95 for (
auto& [
key, obj] : *calDetMap) {
96 mMergedCalDetsMap[
key] += obj;
103 if (mMergedCalDets.find(
type) == mMergedCalDets.end()) {
104 mMergedCalDets[
type] = *calDet;
106 mMergedCalDets[
type] += *calDet;
112 LOGP(info,
"getting slot {}, subspec {:#8x}, lane {}, type {} ({}), firstTF {}, cycle {}", isl, subSpec, lane, calibType,
type, calibInfo.tfIDInfo.tfCounter, calibInfo.publishCycle);
116 mReceivedLanes.set(lane);
119 if (mReceivedLanes.count() == mLanesToExpect) {
120 LOGP(info,
"data of all lanes received");
121 if (mPublishAfterComplete) {
122 LOGP(info,
"publishing after all data was received");
123 sendOutput(pc.outputs());
126 mMergedCalDetsMap.clear();
127 for (
auto& [
type,
object] : mMergedCalDets) {
131 mReceivedLanes.reset();
137 LOGP(info,
"endOfStream");
139 if (mReceivedLanes.count() == mLanesToExpect) {
140 sendOutput(ec.outputs());
142 LOGP(info,
"Received lanes {} does not match expected lanes {}, object already sent", mReceivedLanes.count(), mLanesToExpect);
149 using CalDetMap = std::unordered_map<std::string, dataType>;
150 std::bitset<128> mReceivedLanes;
151 std::unordered_map<int, dataType> mMergedCalDets;
152 std::vector<CalibRawPartInfo> mCalibInfos;
153 CalDetMap mMergedCalDetsMap;
155 uint64_t mRunNumber{0};
156 uint32_t mLanesToExpect{0};
157 bool mForceQuit{
false};
158 bool mDirectFileDump{
false};
159 bool mPublishAfterComplete{
false};
160 bool mSkipCCDB{
false};
161 bool mCheckCalibInfos{
false};
166 if (mCheckCalibInfos) {
167 if (std::adjacent_find(mCalibInfos.begin(), mCalibInfos.end(), std::not_equal_to<>()) != mCalibInfos.end()) {
168 LOGP(warning,
"Different calib info found");
173 const auto now = std::chrono::system_clock::now();
174 const long timeStart = mCalibInfos[0].tfIDInfo.creation + mCalibInfos[0].publishCycle;
177 std::map<std::string, std::string> md;
179 if (mMergedCalDetsMap.size() > 0) {
184 w.setStartValidityTimestamp(timeStart);
185 w.setEndValidityTimestamp(timeEnd);
187 md =
w.getMetaData();
191 LOGP(info,
"Sending object {}/{} of size {} bytes, valid for {} : {}",
w.getPath(),
w.getFileName(),
image->size(),
w.getStartValidityTimestamp(),
w.getEndValidityTimestamp());
198 for (
auto& [
type,
object] : mMergedCalDets) {
203 w.setStartValidityTimestamp(timeStart);
204 w.setEndValidityTimestamp(timeEnd);
206 md =
w.getMetaData();
210 LOG(info) <<
"Sending object " <<
w.getPath() <<
"/" <<
w.getFileName() <<
" of size " <<
image->size()
211 <<
" bytes, valid for " <<
w.getStartValidityTimestamp() <<
" : " <<
w.getEndValidityTimestamp();
224 if (mDirectFileDump) {
225 LOGP(info,
"Dumping output to file");
226 std::string fileName =
"merged_CalDet.root";
227 if (mMergedCalDetsMap.size()) {
228 const auto& cdbType =
CDBTypeMap.at(mCalDetMapType);
229 const auto name = cdbType.substr(cdbType.rfind(
"/") + 1);
230 fileName = fmt::format(
"merged_{}_{}_{}.root",
name, mCalibInfos[0].tfIDInfo.tfCounter, mCalibInfos[0].publishCycle);
232 TFile
f(fileName.data(),
"recreate");
233 for (
auto& [
key,
object] : mMergedCalDetsMap) {
236 for (
auto& [
type,
object] : mMergedCalDets) {
245 std::vector<OutputSpec> outputs;
251 std::vector<InputSpec> inputs;
255 const std::string
id =
"calib-tpc-caldet-merger-publisher";
261 AlgorithmSpec{adaptFromTask<CalDetMergerPublisherSpec>(lanes, skipCCDB, dumpAfterComplete)},
263 {
"force-quit", VariantType::Bool,
false, {
"force quit after max-events have been reached"}},
264 {
"direct-file-dump", VariantType::Bool,
false, {
"directly dump calibration to file"}},
265 {
"check-calib-infos", VariantType::Bool,
false, {
"make consistency check of calib infos"}},
Simple interface to the CDB manager.
TPC CalDet merger and CCDB publisher.
std::string getName(const TDataMember *dm, int index, int size)
Utils and constants for calibration and related workflows.
Definition of the Names Generator class.
CalDetMergerPublisherSpec(uint32_t lanes, bool skipCCDB, bool dumpAfterComplete=false)
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void run(o2::framework::ProcessingContext &pc) final
static constexpr std::string_view CCDBRunTag
static std::unique_ptr< std::vector< char > > createObjectImage(const T *obj, CcdbObjectInfo *info=nullptr)
static constexpr long INFINITE_TIMESTAMP
GLuint const GLchar * name
GLint GLint GLsizei GLint GLenum GLenum type
GLubyte GLubyte GLubyte GLubyte w
constexpr o2::header::DataOrigin gDataOriginTPC
Defining PrimaryVertex explicitly as messageable.
std::vector< ConfigParamSpec > Options
uint64_t getRunNumber(o2::framework::ProcessingContext &pc)
Global TPC definitions and constants.
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
o2::framework::DataProcessorSpec getCalDetMergerPublisherSpec(uint32_t lanes, bool skipCCDB, bool dumpAfterComplete=false)
CDBType
Calibration and parameter types for CCDB.
std::string to_string(gsl::span< T, Size > span)
static constexpr o2::header::DataOrigin gDataOriginCDBWrapper
static constexpr o2::header::DataOrigin gDataOriginCDBPayload
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"