12#ifndef O2_TPC_RESIDUALAGGREGATORSPEC_H
13#define O2_TPC_RESIDUALAGGREGATORSPEC_H
30#include <fairmq/Device.h>
/// Stores the CCDB request, the data request and the configuration flags in the corresponding members.
/// The initializers are written in member declaration order (mCCDBRequest, mDataRequest, then the bool flags):
/// C++ initializes members in declaration order regardless of how the list is written, so keeping both in sync
/// avoids -Wreorder warnings and a misleading reading order. mAggregator is left empty here.
44 ResidualAggregatorDevice(std::shared_ptr<o2::base::GRPGeomRequest> req,
bool trackInput,
bool ctpInput,
bool writeUnbinnedResiduals,
bool writeBinnedResiduals,
bool writeTrackData, std::shared_ptr<o2::globaltracking::DataRequest> dataRequest) : mCCDBRequest(req), mDataRequest(dataRequest), mTrackInput(trackInput), mCTPInput(ctpInput), mWriteBinnedResiduals(writeBinnedResiduals), mWriteUnbinnedResiduals(writeUnbinnedResiduals), mWriteTrackData(writeTrackData) {}
49 int minEnt = ic.options().get<
int>(
"min-entries");
50 auto slotLength = ic.options().get<uint32_t>(
"sec-per-slot");
51 bool useInfiniteSlotLength =
false;
52 if (slotLength == 0) {
53 useInfiniteSlotLength =
true;
55 auto updateInterval = ic.options().get<uint32_t>(
"updateInterval");
56 auto delay = ic.options().get<uint32_t>(
"max-delay");
59 bool storeMetaFile =
false;
60 std::string metaFileDir = ic.options().get<std::string>(
"meta-output-dir");
61 if (metaFileDir !=
"/dev/null") {
68 bool writeOutput =
true;
69 std::string outputDir = ic.options().get<std::string>(
"output-dir");
70 if (outputDir !=
"/dev/null") {
74 storeMetaFile =
false;
77 LOGP(info,
"Creating aggregator with {} entries per voxel minimum. Output file writing enabled: {}, meta file writing enabled: {}",
78 minEnt, writeOutput, storeMetaFile);
79 mAggregator = std::make_unique<o2::tpc::ResidualAggregator>(minEnt);
81 mAggregator->setOutputDir(outputDir);
84 mAggregator->setMetaFileOutputDir(metaFileDir);
87 int autosave = ic.options().get<
int>(
"autosave-interval");
88 mAggregator->setAutosaveInterval(autosave);
90 mAggregator->setMaxSlotsDelay(delay);
91 if (useInfiniteSlotLength) {
93 mAggregator->setCheckIntervalInfiniteSlot(updateInterval);
95 mAggregator->setSlotLengthInSeconds(slotLength);
97 mAggregator->setWriteBinnedResiduals(mWriteBinnedResiduals);
98 mAggregator->setWriteUnbinnedResiduals(mWriteUnbinnedResiduals);
99 mAggregator->setWriteTrackData(mWriteTrackData);
100 mAggregator->setCompression(ic.options().get<
int>(
"compression"));
116 if (tinfo.globalRunNumberChanged) {
118 LOG(info) <<
"New run start detected";
119 mRunStopRequested =
false;
122 if (mRunStopRequested) {
125 auto runStartTime = std::chrono::high_resolution_clock::now();
128 updateTimeDependentParams(pc);
129 std::chrono::duration<double, std::milli> ccdbUpdateTime = std::chrono::high_resolution_clock::now() - runStartTime;
132 auto residualsData = pc.inputs().get<gsl::span<o2::tpc::UnbinnedResid>>(
"unbinnedRes");
133 auto trackRefs = pc.inputs().get<gsl::span<o2::tpc::TrackDataCompact>>(
"trackRefs");
136 const gsl::span<const o2::tpc::TrackData>* trkDataPtr =
nullptr;
137 using trkDataType = std::decay_t<
decltype(pc.inputs().get<gsl::span<o2::tpc::TrackData>>(
""))>;
138 std::optional<trkDataType> trkData;
140 trkData.emplace(pc.inputs().get<gsl::span<o2::tpc::TrackData>>(
"trkData"));
141 trkDataPtr = &trkData.value();
145 using lumiDataType = std::decay_t<
decltype(pc.inputs().get<
o2::ctp::LumiInfo>(
""))>;
146 std::optional<lumiDataType> lumiInput;
149 lumi = &lumiInput.value();
// Log the size of the current TF's inputs, then hand them to the aggregator.
// NOTE(review): trkData is a std::optional that is only filled by the emplace() a few lines above, while
// trkDataPtr starts out as nullptr. The statement guarding that emplace is not visible in this excerpt; if it is
// conditional, trkData->size() below dereferences an empty optional (undefined behaviour) whenever the track
// data input is not read. Logging via trkDataPtr (e.g. trkDataPtr ? trkDataPtr->size() : 0) would be safe -- confirm.
153 LOG(detail) <<
"Processing TF " << mAggregator->getCurrentTFInfo().tfCounter <<
" with " << trkData->size() <<
" tracks and " << residualsData.size() <<
" unbinned residuals associated to them";
// trkDataPtr and lumi are passed as pointers; trkDataPtr is initialized to nullptr above.
154 mAggregator->process(residualsData, trackRefs, trkDataPtr,
lumi);
155 std::chrono::duration<double, std::milli> runDuration = std::chrono::high_resolution_clock::now() - runStartTime;
156 LOGP(
debug,
"Duration for run method: {} ms. From this taken for time dependent param update: {} ms",
157 std::chrono::duration_cast<std::chrono::milliseconds>(runDuration).
count(),
158 std::chrono::duration_cast<std::chrono::milliseconds>(ccdbUpdateTime).
count());
159 if (pc.transitionState() == TransitionHandlingState::Requested) {
160 LOG(info) <<
"Run stop requested, finalizing";
161 mRunStopRequested =
true;
162 mAggregator->checkSlotsToFinalize();
164 LOG(info) <<
"Finalizing done";
170 if (mRunStopRequested) {
173 LOG(info) <<
"Finalizing calibration for end of stream";
174 mAggregator->checkSlotsToFinalize();
176 LOG(info) <<
"Finalizing done for end of stream";
185 LOGP(info,
"Updating TPC fast transform map with new VDrift factor of {} wrt reference {} and DriftTimeOffset correction {} wrt {} from source {}",
// Aggregator instance; created in the init code via std::make_unique with the "min-entries" option value.
199 std::unique_ptr<o2::tpc::ResidualAggregator> mAggregator;
// Request object received as the constructor argument `req`.
200 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
// Data request received as the constructor argument `dataRequest`.
201 std::shared_ptr<o2::globaltracking::DataRequest> mDataRequest;
// Constructor flag `trackInput`; its use is not visible in this excerpt (presumably gates the "trkData" input -- confirm).
202 bool mTrackInput{
false};
// Constructor flag `ctpInput`; its use is not visible in this excerpt (presumably gates the CTP lumi input -- confirm).
203 bool mCTPInput{
false};
// Forwarded to mAggregator->setWriteBinnedResiduals() during init.
204 bool mWriteBinnedResiduals{
false};
// Forwarded to mAggregator->setWriteUnbinnedResiduals() during init.
205 bool mWriteUnbinnedResiduals{
false};
// Forwarded to mAggregator->setWriteTrackData() during init.
206 bool mWriteTrackData{
false};
// Set to true once pc.transitionState() reports TransitionHandlingState::Requested;
// reset to false when a changed global run number is detected.
207 bool mRunStopRequested{
false};
// Defaults to false; where it is set or read is not visible in this excerpt.
208 bool mInitDone{
false};
216DataProcessorSpec getTPCResidualAggregatorSpec(
bool trackInput,
bool ctpInput,
bool writeUnbinnedResiduals,
bool writeBinnedResiduals,
bool writeTrackData)
218 std::shared_ptr<o2::globaltracking::DataRequest> dataRequest = std::make_shared<o2::globaltracking::DataRequest>();
222 auto& inputs = dataRequest->inputs;
224 inputs.emplace_back(
"unbinnedRes",
"GLO",
"UNBINNEDRES");
225 inputs.emplace_back(
"trackRefs",
"GLO",
"TRKREFS");
227 inputs.emplace_back(
"trkData",
"GLO",
"TRKDATA");
229 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(
true,
238 "residual-aggregator",
241 AlgorithmSpec{adaptFromTask<o2::calibration::ResidualAggregatorDevice>(ccdbRequest, trackInput, ctpInput, writeUnbinnedResiduals, writeBinnedResiduals, writeTrackData, dataRequest)},
243 {
"sec-per-slot",
VariantType::UInt32, 600u, {
"number of seconds per calibration time slot (put 0 for infinite slot length)"}},
244 {
"updateInterval",
VariantType::UInt32, 6'000u, {
"update interval in number of TFs (only used in case slot length is infinite)"}},
246 {
"min-entries",
VariantType::Int, 0, {
"minimum number of entries on average per voxel"}},
247 {
"compression",
VariantType::Int, 505, {
"ROOT compression setting for output file (see TFile documentation for meaning of this number)"}},
248 {
"output-dir",
VariantType::String,
"none", {
"Output directory for residuals. Defaults to current working directory. Output is disabled in case set to /dev/null"}},
249 {
"meta-output-dir",
VariantType::String,
"/dev/null", {
"Residuals metadata output directory, must exist (if not /dev/null)"}},
250 {
"autosave-interval",
VariantType::Int, 0, {
"Write output to file for every n-th TF. 0 means this feature is OFF"}}}};
Utils and constants for calibration and related workflows.
Helper for geometry and GRP related CCDB requests.
Collects local TPC cluster residuals from EPNs.
Definition of the TrackResiduals class.
Helper class to extract VDrift from different sources.
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
void run(o2::framework::ProcessingContext &pc) final
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
ResidualAggregatorDevice(std::shared_ptr< o2::base::GRPGeomRequest > req, bool trackInput, bool ctpInput, bool writeUnbinnedResiduals, bool writeBinnedResiduals, bool writeTrackData, std::shared_ptr< o2::globaltracking::DataRequest > dataRequest)
ServiceRegistryRef services()
The services registry associated with this processing context.
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
void extractCCDBInputs(o2::framework::ProcessingContext &pc, bool laser=true, bool itstpcTgl=true)
const VDriftCorrFact & getVDriftObject() const
bool accountCCDBInputs(const o2::framework::ConcreteDataMatcher &matcher, void *obj)
static std::string_view getSourceName(Source s)
constexpr TFType INFINITE_TF
Defining PrimaryVertex explicitly as messageable.
std::vector< OutputSpec > Outputs
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static void fillTFIDInfo(o2::framework::ProcessingContext &pc, o2::dataformats::TFIDInfo &ti)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
const o2::ctp::LumiInfo & getCTPLumi() const
float refTimeOffset
additive time offset reference (μs)
float refVDrift
reference vdrift for which factor was extracted
float timeOffsetCorr
additive time offset correction (μs)
float corrFact
drift velocity correction factor (multiplicative)
static std::string rectifyDirectory(const std::string_view p)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"