12#ifndef O2_TPC_RESIDUALAGGREGATORSPEC_H
13#define O2_TPC_RESIDUALAGGREGATORSPEC_H
30#include <fairmq/Device.h>
44 ResidualAggregatorDevice(std::shared_ptr<o2::base::GRPGeomRequest> req,
bool trackInput,
bool ctpInput,
bool writeUnbinnedResiduals,
bool writeBinnedResiduals,
bool writeTrackData, std::shared_ptr<o2::globaltracking::DataRequest> dataRequest) : mCCDBRequest(req), mTrackInput(trackInput), mCTPInput(ctpInput), mWriteUnbinnedResiduals(writeUnbinnedResiduals), mWriteBinnedResiduals(writeBinnedResiduals), mWriteTrackData(writeTrackData), mDataRequest(dataRequest) {}
49 int minEnt = ic.options().get<
int>(
"min-entries");
50 auto slotLength = ic.options().get<uint32_t>(
"sec-per-slot");
51 bool useInfiniteSlotLength =
false;
52 if (slotLength == 0) {
53 useInfiniteSlotLength =
true;
55 auto updateInterval = ic.options().get<uint32_t>(
"updateInterval");
56 auto delay = ic.options().get<uint32_t>(
"max-delay");
59 bool storeMetaFile =
false;
60 std::string metaFileDir = ic.options().get<std::string>(
"meta-output-dir");
61 if (metaFileDir !=
"/dev/null") {
68 bool writeOutput =
true;
69 std::string outputDir = ic.options().get<std::string>(
"output-dir");
70 if (outputDir !=
"/dev/null") {
74 storeMetaFile =
false;
77 LOGP(info,
"Creating aggregator with {} entries per voxel minimum. Output file writing enabled: {}, meta file writing enabled: {}",
78 minEnt, writeOutput, storeMetaFile);
79 mAggregator = std::make_unique<o2::tpc::ResidualAggregator>(minEnt);
81 mAggregator->setOutputDir(outputDir);
84 mAggregator->setMetaFileOutputDir(metaFileDir);
87 int autosave = ic.options().get<
int>(
"autosave-interval");
88 mAggregator->setAutosaveInterval(autosave);
90 mAggregator->setMaxSlotsDelay(delay);
91 if (useInfiniteSlotLength) {
93 mAggregator->setCheckIntervalInfiniteSlot(updateInterval);
95 mAggregator->setSlotLengthInSeconds(slotLength);
97 mAggregator->setWriteBinnedResiduals(mWriteBinnedResiduals);
98 mAggregator->setWriteUnbinnedResiduals(mWriteUnbinnedResiduals);
99 mAggregator->setWriteTrackData(mWriteTrackData);
100 mAggregator->setCompression(ic.options().get<
int>(
"compression"));
116 if (tinfo.globalRunNumberChanged) {
118 LOG(info) <<
"New run start detected";
119 mRunStopRequested =
false;
122 if (mRunStopRequested) {
125 auto runStartTime = std::chrono::high_resolution_clock::now();
128 updateTimeDependentParams(pc);
129 std::chrono::duration<double, std::milli> ccdbUpdateTime = std::chrono::high_resolution_clock::now() - runStartTime;
132 auto residualsData = pc.inputs().get<gsl::span<o2::tpc::UnbinnedResid>>(
"unbinnedRes");
133 auto trackRefs = pc.inputs().get<gsl::span<o2::tpc::TrackDataCompact>>(
"trackRefs");
136 const gsl::span<const o2::tpc::TrackData>* trkDataPtr =
nullptr;
137 using trkDataType = std::decay_t<
decltype(pc.inputs().get<gsl::span<o2::tpc::TrackData>>(
""))>;
138 std::optional<trkDataType> trkData;
140 trkData.emplace(pc.inputs().get<gsl::span<o2::tpc::TrackData>>(
"trkData"));
141 trkDataPtr = &trkData.value();
145 using lumiDataType = std::decay_t<
decltype(pc.inputs().get<
o2::ctp::LumiInfo>(
""))>;
146 std::optional<lumiDataType> lumiInput;
150 lumi = &lumiInput.value();
154 LOG(detail) <<
"Processing TF " << mAggregator->getCurrentTFInfo().tfCounter <<
" with " << trkData->size() <<
" tracks and " << residualsData.size() <<
" unbinned residuals associated to them";
155 mAggregator->process(residualsData, trackRefs, trkDataPtr,
lumi);
156 std::chrono::duration<double, std::milli> runDuration = std::chrono::high_resolution_clock::now() - runStartTime;
157 LOGP(
debug,
"Duration for run method: {} ms. From this taken for time dependent param update: {} ms",
158 std::chrono::duration_cast<std::chrono::milliseconds>(runDuration).
count(),
159 std::chrono::duration_cast<std::chrono::milliseconds>(ccdbUpdateTime).
count());
160 if (pc.transitionState() == TransitionHandlingState::Requested) {
161 LOG(info) <<
"Run stop requested, finalizing";
162 mRunStopRequested =
true;
163 mAggregator->checkSlotsToFinalize();
165 LOG(info) <<
"Finalizing done";
171 if (mRunStopRequested) {
174 LOG(info) <<
"Finalizing calibration for end of stream";
175 mAggregator->checkSlotsToFinalize();
177 LOG(info) <<
"Finalizing done for end of stream";
186 LOGP(info,
"Updating TPC fast transform map with new VDrift factor of {} wrt reference {} and DriftTimeOffset correction {} wrt {} from source {}",
200 std::unique_ptr<o2::tpc::ResidualAggregator> mAggregator;
201 std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
202 std::shared_ptr<o2::globaltracking::DataRequest> mDataRequest;
203 bool mTrackInput{
false};
204 bool mCTPInput{
false};
205 bool mWriteBinnedResiduals{
false};
206 bool mWriteUnbinnedResiduals{
false};
207 bool mWriteTrackData{
false};
208 bool mRunStopRequested{
false};
209 bool mInitDone{
false};
217DataProcessorSpec getTPCResidualAggregatorSpec(
bool trackInput,
bool ctpInput,
bool writeUnbinnedResiduals,
bool writeBinnedResiduals,
bool writeTrackData)
219 std::shared_ptr<o2::globaltracking::DataRequest> dataRequest = std::make_shared<o2::globaltracking::DataRequest>();
223 auto& inputs = dataRequest->inputs;
225 inputs.emplace_back(
"unbinnedRes",
"GLO",
"UNBINNEDRES");
226 inputs.emplace_back(
"trackRefs",
"GLO",
"TRKREFS");
228 inputs.emplace_back(
"trkData",
"GLO",
"TRKDATA");
230 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(
true,
239 "residual-aggregator",
242 AlgorithmSpec{adaptFromTask<o2::calibration::ResidualAggregatorDevice>(ccdbRequest, trackInput, ctpInput, writeUnbinnedResiduals, writeBinnedResiduals, writeTrackData, dataRequest)},
244 {
"sec-per-slot",
VariantType::UInt32, 600u, {
"number of seconds per calibration time slot (put 0 for infinite slot length)"}},
245 {
"updateInterval",
VariantType::UInt32, 6'000u, {
"update interval in number of TFs (only used in case slot length is infinite)"}},
247 {
"min-entries",
VariantType::Int, 0, {
"minimum number of entries on average per voxel"}},
248 {
"compression",
VariantType::Int, 505, {
"ROOT compression setting for output file (see TFile documentation for meaning of this number)"}},
249 {
"output-dir",
VariantType::String,
"none", {
"Output directory for residuals. Defaults to current working directory. Output is disabled in case set to /dev/null"}},
250 {
"meta-output-dir",
VariantType::String,
"/dev/null", {
"Residuals metadata output directory, must exist (if not /dev/null)"}},
251 {
"autosave-interval",
VariantType::Int, 0, {
"Write output to file for every n-th TF. 0 means this feature is OFF"}}}};
Utils and constants for calibration and related workflows.
Helper for geometry and GRP related CCDB requests.
Collects local TPC cluster residuals from EPNs.
Definition of the TrackResiduals class.
Helper class to extract VDrift from different sources.
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
void run(o2::framework::ProcessingContext &pc) final
void init(o2::framework::InitContext &ic) final
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
ResidualAggregatorDevice(std::shared_ptr< o2::base::GRPGeomRequest > req, bool trackInput, bool ctpInput, bool writeUnbinnedResiduals, bool writeBinnedResiduals, bool writeTrackData, std::shared_ptr< o2::globaltracking::DataRequest > dataRequest)
ServiceRegistryRef services()
The services registry associated with this processing context.
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
void extractCCDBInputs(o2::framework::ProcessingContext &pc, bool laser=true, bool itstpcTgl=true)
const VDriftCorrFact & getVDriftObject() const
bool accountCCDBInputs(const o2::framework::ConcreteDataMatcher &matcher, void *obj)
static std::string_view getSourceName(Source s)
constexpr TFType INFINITE_TF
Defining PrimaryVertex explicitly as messageable.
std::vector< OutputSpec > Outputs
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static void fillTFIDInfo(o2::framework::ProcessingContext &pc, o2::dataformats::TFIDInfo &ti)
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
const o2::ctp::LumiInfo & getCTPLumi() const
float refTimeOffset
additive time offset reference (\mus)
float refVDrift
reference vdrift for which factor was extracted
float timeOffsetCorr
additive time offset correction (\mus)
float corrFact
drift velocity correction factor (multiplicative)
static std::string rectifyDirectory(const std::string_view p)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"