Project
Loading...
Searching...
No Matches
O2DPLDisplay.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14
27#include "TOFBase/Geo.h"
28#include "TPCFastTransform.h"
29#include "TRDBase/Geometry.h"
40#include <unistd.h>
41
42using std::chrono::duration_cast;
43using std::chrono::milliseconds;
44using std::chrono::system_clock;
45
46using namespace o2::event_visualisation;
47using namespace o2::framework;
48using namespace o2::dataformats;
49using namespace o2::globaltracking;
50using namespace o2::tpc;
51using namespace o2::trd;
52
53// ------------------------------------------------------------------
// Workflow customization hook receiving the list of callbacks policies.
// NOTE(review): the only body statement (listing line 56) is missing from this extract, so
// what gets registered on `policies` cannot be confirmed from here.
54void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
55{
57}
58
// Workflow customization hook declaring the command line options of this workflow.
// The options are collected in a local vector which is then swapped into `workflowOptions`,
// i.e. whatever `workflowOptions` held on entry is replaced, not appended to.
59void customize(std::vector<ConfigParamSpec>& workflowOptions)
60{
 // each entry: option name, value type, default value, help text
61 std::vector<o2::framework::ConfigParamSpec> options{
62 {"jsons-folder", VariantType::String, "jsons", {"name of the folder to store json files"}},
63 {"use-json-format", VariantType::Bool, false, {"instead of eve format (default) use json format"}},
64 {"use-root-format", VariantType::Bool, false, {"instead of eve format (default) use root format"}},
65 {"eve-hostname", VariantType::String, "", {"name of the host allowed to produce files (empty means no limit)"}},
66 {"eve-dds-collection-index", VariantType::Int, -1, {"number of dpl collection allowed to produce files (-1 means no limit)"}},
67 {"time-interval", VariantType::Int, 5000, {"time interval in milliseconds between stored files"}},
68 {"disable-mc", VariantType::Bool, false, {"disable visualization of MC data"}},
69 {"disable-write", VariantType::Bool, false, {"disable writing output files"}},
70 {"display-clusters", VariantType::String, "ITS,TPC,TRD,TOF", {"comma-separated list of clusters to display"}},
71 {"display-tracks", VariantType::String, "TPC,ITS,ITS-TPC,TPC-TRD,ITS-TPC-TRD,TPC-TOF,ITS-TPC-TOF", {"comma-separated list of tracks to display"}},
72 {"disable-root-input", VariantType::Bool, false, {"disable root-files input reader"}},
73 {"configKeyValues", VariantType::String, "", {"semicolon separated key=value strings, e.g. EveConfParam content..."}},
74 {"skipOnEmptyInput", VariantType::Bool, false, {"don't run the ED when no input is provided"}},
75 };
76
 // NOTE(review): listing line 77 is missing from this extract (presumably a further addition to
 // `options`) - confirm against the repository before touching this function.
78 std::swap(workflowOptions, options);
79}
80
81#include "Framework/runDataProcessing.h" // main method must be included here (otherwise customize not used)
83{
 // NOTE(review): the signature line (listing line 82) and listing line 86 are missing from this
 // extract; the log text suggests this is the body of the task's init() - confirm.
 //
 // Logs the event-display version, propagates the MC flag into the processing configuration and,
 // when an EMCAL calibration loader was provided, creates the cell recalibrator.
 //
 // LOGP with a "{}" placeholder is used on purpose: LOGF is printf-style and the previous message
 // contained no conversion specifier, so the version and the trailing dashes were never printed.
84 LOGP(info, "------------------------ O2DPLDisplay::init version {} ------------------------------------", o2_eve_version);
85 mData.mConfig.configProcessing.runMC = mUseMC;
87 if (mEMCALCalibLoader) {
88 mEMCALCalibrator = std::make_unique<o2::emcal::CellRecalibrator>();
89 }
90}
91
93{
 // NOTE(review): the signature line (listing line 92) is missing from this extract; the log text and
 // the use of `pc` suggest this is the body of the task's run(ProcessingContext& pc) - confirm.
 //
 // Per-timeframe processing: after the host / N-th event / time-interval filters it collects the
 // reconstructed data, refreshes calibration objects, and then draws (and, unless vetoed, saves) one
 // visualisation event per primary vertex entry. Finally it logs timing and bookkeeping numbers.
94 const auto& conf = EveConfParam::Instance();
95
96 if (!mEveHostNameMatch) {
97 return;
98 }
 // the counter is only advanced when onlyNthEvent > 1 (short-circuit evaluation)
99 if (conf.onlyNthEvent > 1 && mEventCounter++ % conf.onlyNthEvent) {
100 return;
101 }
 // LOGP with "{}": the printf-style LOGF had no conversion specifier, so the version was never printed
102 LOGP(info, "------------------------ O2DPLDisplay::run version {} ------------------------------------", o2_eve_version);
103 // filtering out any run which occur before reaching next time interval
104 auto currentTime = std::chrono::high_resolution_clock::now();
105 std::chrono::duration<double> elapsed = currentTime - this->mTimeStamp;
106 if (elapsed < this->mTimeInterval) {
107 return; // skip this run - it is too often
108 }
109 this->mTimeStamp = currentTime; // next run AFTER period counted from last run, even if there will be not any save
 // NOTE(review): listing line 110 (presumably the declaration of `recoCont`) is missing from this extract
111 recoCont.collectData(pc, *mDataRequest);
112 updateTimeDependentParams(pc); // Make sure that this is called after the RecoContainer collect data, since some condition objects are fetched there
113 if (mEMCALCalibLoader) {
114 mEMCALCalibLoader->checkUpdates(pc);
115 if (mEMCALCalibLoader->hasUpdateBadChannelMap()) {
116 mEMCALCalibrator->setBadChannelMap(mEMCALCalibLoader->getBadChannelMap());
117 }
118 if (mEMCALCalibLoader->hasUpdateTimeCalib()) {
119 mEMCALCalibrator->setTimeCalibration(mEMCALCalibLoader->getTimeCalibration());
120 }
121 if (mEMCALCalibLoader->hasUpdateGainCalib()) {
122 mEMCALCalibrator->setGainCalibration(mEMCALCalibLoader->getGainCalibration());
123 }
124 }
125
126 EveWorkflowHelper helper;
127 helper.setTPCVDrift(&mTPCVDriftHelper.getVDriftObject());
128 helper.setRecoContainer(&recoCont);
129 if (mEMCALCalibrator) {
130 helper.setEMCALCellRecalibrator(mEMCALCalibrator.get());
131 }
132
133 helper.setITSROFs();
134 helper.selectTracks(&(mData.mConfig.configCalib), mClMask, mTrkMask, mTrkMask);
135 helper.selectTowers();
136 helper.prepareITSClusters(mData.mITSDict);
137 helper.prepareMFTClusters(mData.mMFTDict);
138
139 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
140
141 std::size_t filesSaved = 0;
142 const std::vector<std::string> dirs = o2::event_visualisation::DirectoryLoader::allFolders(this->mJsonPath);
143 const std::string marker = "_";
144 const std::vector<std::string> exts = {".json", ".root", ".eve"};
 // draws every entry of the given primary-vertex map and saves it unless one of the vetoes applies
145 auto processData = [&](const auto& dataMap) {
146 for (const auto& keyVal : dataMap) {
147 if (conf.maxPVs > 0 && filesSaved >= conf.maxPVs) {
148 break;
149 }
150 if (conf.maxBytes > 0) {
 // start of the current period: now minus the configured time interval, in ms since epoch
151 auto periodStart =
152 duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() - this->mTimeInterval.count();
153 if (!DirectoryLoader::canCreateNextFile(dirs, marker, exts, periodStart, conf.maxBytes)) {
154 LOGF(info, "Already too much data (> %d) to transfer in this period - event will not be saved ...", conf.maxBytes);
155 break;
156 }
157 }
158 const auto pv = keyVal.first;
 // both branches of the former `if (conf.PVMode)` were identical, so the event is drawn unconditionally
159 bool save = true;
161 helper.draw(pv, conf.trackSorting);
167
168 if (conf.minITSTracks > -1 && helper.mEvent.getDetectorTrackCount(detectors::DetID::ITS) < conf.minITSTracks) {
169 save = false;
170 }
171
172 if (conf.minTracks > -1 && helper.mEvent.getTrackCount() < conf.minTracks) {
173 save = false;
174 }
175
176 if (this->mDisableWrite) {
177 save = false;
178 }
179
180 if (save) {
181 helper.mEvent.setClMask(this->mClMask.to_ulong());
182 helper.mEvent.setTrkMask(this->mTrkMask.to_ulong());
183 helper.mEvent.setRunNumber(tinfo.runNumber);
184 helper.mEvent.setTfCounter(tinfo.tfCounter);
185 helper.mEvent.setFirstTForbit(tinfo.firstTForbit);
186 helper.mEvent.setRunType(this->mRunType);
187 helper.mEvent.setPrimaryVertex(pv);
188 helper.mEvent.setCreationTime(tinfo.creation);
189 helper.save(this->mJsonPath, this->mExt, conf.maxFiles);
190 filesSaved++;
 // next run AFTER period counted from last save; `currentTime` is deliberately left untouched so
 // that the duration logged below covers the whole call and not only the time since the last save
192 this->mTimeStamp = std::chrono::high_resolution_clock::now();
193 }
194
195 helper.clear();
196 }
197 };
198 if (conf.PVTriggersMode) {
199 processData(helper.mPrimaryVertexTriggerGIDs);
200 } else {
201 processData(helper.mPrimaryVertexTrackGIDs);
202 }
203
204 auto endTime = std::chrono::high_resolution_clock::now();
205 LOGP(info, "Visualization of TF:{} at orbit {} took {} s.", tinfo.tfCounter, tinfo.firstTForbit, std::chrono::duration_cast<std::chrono::microseconds>(endTime - currentTime).count() * 1e-6);
206
207 LOGP(info, "PVs with tracks: {}", helper.mPrimaryVertexTrackGIDs.size());
208 LOGP(info, "PVs with triggers: {}", helper.mPrimaryVertexTriggerGIDs.size());
209 LOGP(info, "Data files saved: {}", filesSaved);
210
 // per-source count of accepted data types, used only for the (currently disabled) summary log below
211 std::unordered_map<o2::dataformats::GlobalTrackID, std::size_t> savedDataTypes;
212
213 for (int i = 0; i < GID::Source::NSources; i++) {
214 savedDataTypes[i] = 0;
215 }
216
217 for (const auto& gid : helper.mTotalAcceptedDataTypes) {
218 savedDataTypes[gid.getSource()] += 1;
219 }
220
221 std::vector<std::string> sourceStats;
222 sourceStats.reserve(GID::Source::NSources);
223
224 const auto combinedMask = mTrkMask | mClMask;
225
226 for (int i = 0; i < GID::Source::NSources; i++) {
227 if (combinedMask[i]) {
228 sourceStats.emplace_back(fmt::format("{}/{} {}", savedDataTypes.at(i), helper.mTotalDataTypes.at(i), GID::getSourceName(i)));
229 }
230 }
231
232 // FIXME: find out why this does not work with 11.1.1
233// LOGP(info, "Tracks: {}", fmt::join(sourceStats, ", "));
234}
235
239
// Refreshes the condition objects that may change with time and performs the one-time setup.
// On every call the TPC VDrift helper extracts its CCDB inputs and a pending update is acknowledged.
// On the first call only, the run type is read from the GRPECS object and mData.init() is invoked.
240void O2DPLDisplaySpec::updateTimeDependentParams(ProcessingContext& pc)
241{
 // NOTE(review): listing line 242 is missing from this extract - confirm what runs before this point.
 // function-local static: the flag is shared by every instance of the class within the process
243 static bool initOnceDone = false;
244 mTPCVDriftHelper.extractCCDBInputs(pc);
245 if (mTPCVDriftHelper.isUpdated()) {
246 mTPCVDriftHelper.acknowledgeUpdate();
247 }
248 if (!initOnceDone) { // this params need to be queried only once
249 initOnceDone = true;
250 auto grpECS = o2::base::GRPGeomHelper::instance().getGRPECS(); // RS
 // NOTE(review): grpECS is dereferenced without a null check - presumably guaranteed non-null here; confirm
251 mRunType = grpECS->getRunType();
252 mData.init();
253 }
254 // pc.inputs().get<o2::itsmft::TopologyDictionary*>("cldictITS"); // called by the RecoContainer
255 // pc.inputs().get<o2::itsmft::TopologyDictionary*>("cldictMFT"); // called by the RecoContainer
256}
257
259{
 // NOTE(review): the signature line (listing line 258) is missing from this extract; the use of
 // `matcher` and `obj` suggests this is the body of finaliseCCDB(ConcreteDataMatcher&, void*) - confirm.
 // The visible code is a chain of handlers: the first one that recognises `matcher` consumes the
 // object and returns.
 // NOTE(review): the `if (...) {` opening at listing line 260 is missing, so the condition guarding
 // the next return is not visible here.
261 return;
262 }
263 if (mEMCALCalibLoader && mEMCALCalibLoader->finalizeCCDB(matcher, obj)) {
264 return;
265 }
266 if (matcher == ConcreteDataMatcher("ITS", "CLUSDICT", 0)) {
267 LOGF(info, "ITS cluster dictionary updated");
 // NOTE(review): listing line 268 is missing - presumably it stores `obj` as the ITS dictionary; confirm
269 return;
270 }
271 if (matcher == ConcreteDataMatcher("MFT", "CLUSDICT", 0)) {
272 LOGF(info, "MFT cluster dictionary updated");
 // NOTE(review): listing line 273 is missing - presumably it stores `obj` as the MFT dictionary; confirm
274 return;
275 }
276 if (mTPCVDriftHelper.accountCCDBInputs(matcher, obj)) {
277
278 return;
279 }
280}
281
283{
 // NOTE(review): the signature line (listing line 282) is missing from this extract; the log text and
 // the use of `cfgc` suggest this is the body of defineDataProcessing(ConfigContext const& cfgc) - confirm.
 //
 // Builds the workflow: reads the command line options, decides whether this host / DDS collection
 // is allowed to produce files, restricts the requested track and cluster sources, requests the
 // needed inputs and CCDB objects and finally adds the "o2-eve-export" data processor.
 //
 // All banner/diagnostic logs use LOGP with "{}" placeholders: the printf-style LOGF calls had no
 // conversion specifiers, so their extra arguments were never printed.
284 LOGP(info, "------------------------ defineDataProcessing {} ------------------------------------", o2_eve_version);
285
286 WorkflowSpec specs;
287
288 auto jsonFolder = cfgc.options().get<std::string>("jsons-folder");
289 std::string ext = ".eve"; // eve files are the default format
290 auto useJsonFormat = cfgc.options().get<bool>("use-json-format");
291 if (useJsonFormat) {
292 ext = ".json";
293 }
 // when both format flags are given, the root format wins because it is applied last
294 auto useROOTFormat = cfgc.options().get<bool>("use-root-format");
295 if (useROOTFormat) {
296 ext = ".root";
297 }
298 auto eveHostName = cfgc.options().get<std::string>("eve-hostname");
299 o2::conf::ConfigurableParam::updateFromString(cfgc.options().get<std::string>("configKeyValues"));
300 const auto& conf = EveConfParam::Instance();
301
302 bool useMC = !cfgc.options().get<bool>("disable-mc");
303 bool disableWrite = cfgc.options().get<bool>("disable-write");
304
 // zero-initialised buffer with one spare byte: the name stays null-terminated even if gethostname
 // truncates it, and it is a well-defined empty string if the call fails
305 char hostname[_POSIX_HOST_NAME_MAX + 1] = {};
306 gethostname(hostname, sizeof(hostname) - 1);
307 bool eveHostNameMatch = eveHostName.empty() || eveHostName == hostname;
308
309 int eveDDSColIdx = cfgc.options().get<int>("eve-dds-collection-index");
310 if (eveDDSColIdx != -1) {
311 char* colIdx = getenv("DDS_COLLECTION_INDEX");
312 int myIdx = colIdx ? atoi(colIdx) : -1;
 // same message in both cases, only the severity differs
313 if (myIdx == eveDDSColIdx) {
314 LOGP(important, "Restricting DPL Display to collection index, my index {}, enabled {}", myIdx, int(myIdx == eveDDSColIdx));
315 } else {
316 LOGP(info, "Restricting DPL Display to collection index, my index {}, enabled {}", myIdx, int(myIdx == eveDDSColIdx));
317 }
318 eveHostNameMatch &= myIdx == eveDDSColIdx;
319 }
320
321 std::chrono::milliseconds timeInterval(cfgc.options().get<int>("time-interval"));
322
323 GlobalTrackID::mask_t srcTrk = GlobalTrackID::getSourcesMask(cfgc.options().get<std::string>("display-tracks"));
324 GlobalTrackID::mask_t srcCl = GlobalTrackID::getSourcesMask(cfgc.options().get<std::string>("display-clusters"));
325
326 if (srcTrk[GID::MFTMCH] && srcTrk[GID::MCHMID]) {
327 srcTrk |= GID::getSourceMask(GID::MFTMCHMID);
328 }
329
 // NOTE(review): listing lines 330-331 (presumably the definitions of `allowedTracks` and
 // `allowedClusters`) are missing from this extract
332
333 srcTrk &= allowedTracks;
334 srcCl &= allowedClusters;
335
336 if (!srcTrk.any() && !srcCl.any()) {
337 if (cfgc.options().get<bool>("skipOnEmptyInput")) {
338 LOGF(info, "No valid inputs for event display, disabling event display");
 // plain return of the local: std::move here would only inhibit copy elision
339 return specs;
340 }
341 throw std::runtime_error("No input configured");
342 }
343
 // reads a (min, max) option pair: both negative -> disabled, both non-negative -> enabled (and the
 // bracket must be valid), anything else is a configuration error
344 auto isRangeEnabled = [&opts = cfgc.options()](const char* min_name, const char* max_name) {
345 EveWorkflowHelper::Bracket bracket{opts.get<float>(min_name), opts.get<float>(max_name)};
346 bool optEnabled = false;
347
348 if (bracket.getMin() < 0 && bracket.getMax() < 0) {
349 optEnabled = false;
350 } else if (bracket.getMin() >= 0 && bracket.getMax() >= 0) {
351 optEnabled = true;
352
353 if (bracket.isInvalid()) {
354 throw std::runtime_error(fmt::format("{}, {} bracket is invalid", min_name, max_name));
355 }
356 } else {
357 throw std::runtime_error(fmt::format("Both boundaries, {} and {}, have to be specified at the same time", min_name, max_name));
358 }
359
360 return std::make_tuple(optEnabled, bracket);
361 };
362
363 std::shared_ptr<DataRequest> dataRequest = std::make_shared<DataRequest>();
364 dataRequest->requestTracks(srcTrk, useMC);
365 dataRequest->requestClusters(srcCl, useMC);
366
367 if (conf.filterITSROF) {
368 dataRequest->requestIRFramesITS();
 // NOTE(review): listing line 369 is missing from this extract
370 }
371
372 InputHelper::addInputSpecs(cfgc, specs, srcCl, srcTrk, srcTrk, useMC);
373 if (conf.PVMode) {
374 dataRequest->requestPrimaryVertices(useMC);
375 InputHelper::addInputSpecsPVertex(cfgc, specs, useMC);
376 }
377 o2::tpc::VDriftHelper::requestCCDBInputs(dataRequest->inputs);
378
379 auto ggRequest = std::make_shared<o2::base::GRPGeomRequest>(false, // orbitResetTime
380 true, // GRPECS=true
381 false, // GRPLHCIF
382 true, // GRPMagField
383 true, // askMatLUT
 // NOTE(review): listing line 384 (one constructor argument) is missing from this extract
385 dataRequest->inputs,
386 true); // query only once all objects except mag.field
387
388 std::shared_ptr<o2::emcal::CalibLoader> emcalCalibLoader;
389 if (conf.calibrateEMC) {
390 emcalCalibLoader = std::make_shared<o2::emcal::CalibLoader>();
391 emcalCalibLoader->enableTimeCalib(true);
392 emcalCalibLoader->enableBadChannelMap(true);
393 emcalCalibLoader->enableGainCalib(true);
394 emcalCalibLoader->defineInputSpecs(dataRequest->inputs);
395 }
396
397 specs.emplace_back(DataProcessorSpec{
398 "o2-eve-export",
399 dataRequest->inputs,
400 {},
401 AlgorithmSpec{adaptFromTask<O2DPLDisplaySpec>(disableWrite, useMC, srcTrk, srcCl, dataRequest, ggRequest, emcalCalibLoader, jsonFolder, ext, timeInterval, eveHostNameMatch)}});
402
403 // configure dpl timer to inject correct firstTForbit: start from the 1st orbit of TF containing 1st sampled orbit
404 o2::raw::HBFUtilsInitializer hbfIni(cfgc, specs);
405
406 return specs;
407}
Wrapper container for different reconstructed object types.
Definition of the MCH cluster minimal structure.
Loading content of the Folder and returning sorted.
int32_t i
Global index for barrel track: provides provenance (detectors combination), index in respective array...
Definition of the MCH ROFrame record.
Definition of the Names Generator class.
void customize(std::vector< o2::framework::CallbacksPolicy > &policies)
Definition of TPCFastTransform class.
Definition of the MCH track.
Helper class to obtain TPC clusters / digits / labels from DPL.
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static void updateFromString(std::string const &)
static mask_t getSourcesMask(const std::string_view srcList)
static constexpr ID ITS
Definition DetID.h:63
const o2::itsmft::TopologyDictionary * mMFTDict
const o2::itsmft::TopologyDictionary * mITSDict
void setITSDict(const o2::itsmft::TopologyDictionary *d)
void setMFTDict(const o2::itsmft::TopologyDictionary *d)
static bool canCreateNextFile(const std::vector< std::string > &paths, const std::string &marker, const std::vector< std::string > &ext, long long millisec, long capacityAllowed)
static std::vector< std::string > allFolders(const std::string &location)
std::unordered_map< std::size_t, std::vector< GID > > mPrimaryVertexTriggerGIDs
void save(const std::string &jsonPath, const std::string &ext, int numberOfFiles)
o2::event_visualisation::VisualisationEvent mEvent
void setRecoContainer(const o2::globaltracking::RecoContainer *rc)
void setTPCVDrift(const o2::tpc::VDriftCorrFact *v)
void draw(std::size_t primaryVertexIdx, bool sortTracks)
void selectTracks(const CalibObjectsConst *calib, GID::mask_t maskCl, GID::mask_t maskTrk, GID::mask_t maskMatch)
void setEMCALCellRecalibrator(o2::emcal::CellRecalibrator *calibrator)
void prepareITSClusters(const o2::itsmft::TopologyDictionary *dict)
void prepareMFTClusters(const o2::itsmft::TopologyDictionary *dict)
std::unordered_map< std::size_t, std::vector< GID > > mPrimaryVertexTrackGIDs
std::unordered_set< GID > mTotalAcceptedDataTypes
std::unordered_map< GID, std::size_t > mTotalDataTypes
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
void run(o2::framework::ProcessingContext &pc) final
void init(o2::framework::InitContext &ic) final
size_t getDetectorTrackCount(o2::detectors::DetID::ID id) const
void setFirstTForbit(o2::header::DataHeader::TForbitType value)
void setRunNumber(o2::header::DataHeader::RunNumberType runNumber)
void setCreationTime(o2::framework::DataProcessingHeader::CreationTime creationTime)
void setRunType(o2::parameters::GRPECS::RunType runType)
void setTfCounter(o2::header::DataHeader::TFCounterType value)
ConfigParamRegistry & options() const
ServiceRegistryRef services()
The services registry associated with this processing context.
static int addInputSpecs(const o2::framework::ConfigContext &configcontext, o2::framework::WorkflowSpec &specs, GID::mask_t maskClusters, GID::mask_t maskMatches, GID::mask_t maskTracks, bool useMC=true, GID::mask_t maskClustersMC=GID::getSourcesMask(GID::ALL), GID::mask_t maskTracksMC=GID::getSourcesMask(GID::ALL), bool subSpecStrict=false)
static int addInputSpecsIRFramesITS(const o2::framework::ConfigContext &configcontext, o2::framework::WorkflowSpec &specs)
static int addInputSpecsPVertex(const o2::framework::ConfigContext &configcontext, o2::framework::WorkflowSpec &specs, bool mc)
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
void extractCCDBInputs(o2::framework::ProcessingContext &pc, bool laser=true, bool itstpcTgl=true)
const VDriftCorrFact & getVDriftObject() const
bool accountCCDBInputs(const o2::framework::ConcreteDataMatcher &matcher, void *obj)
bool isUpdated() const
Definition of a container to keep/associate and arbitrary number of labels associated to an index wit...
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
Global TPC definitions and constants.
Definition SimTraits.h:167
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
static void addNewTimeSliceCallback(std::vector< o2::framework::CallbacksPolicy > &policies)
static void addConfigOption(std::vector< o2::framework::ConfigParamSpec > &opts, const std::string &defOpt=std::string(o2::base::NameConf::DIGITIZATIONCONFIGFILE))