Project
Loading...
Searching...
No Matches
O2DPLDisplay.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14
27#include "TOFBase/Geo.h"
28#include "TPCFastTransform.h"
29#include "TRDBase/Geometry.h"
40#include <unistd.h>
41
42using std::chrono::duration_cast;
43using std::chrono::milliseconds;
44using std::chrono::system_clock;
45
46using namespace o2::event_visualisation;
47using namespace o2::framework;
48using namespace o2::dataformats;
49using namespace o2::globaltracking;
50using namespace o2::tpc;
51using namespace o2::trd;
52
53// ------------------------------------------------------------------
// Workflow customization overload that receives the framework's callback policies.
// NOTE(review): listing line 56 (the only statement of the body) is absent from this
// extract, so what is registered on `policies` cannot be confirmed from this view.
54void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
55{
57}
58
// Workflow customization overload that declares the command-line options of this workflow.
// Each entry is {name, type, default value, {help text}}. The locally built list is swapped
// into `workflowOptions`, so whatever the caller passed in is replaced by this list.
// NOTE(review): listing line 82 (between the option table and the swap) is absent from this
// extract; restore it from the upstream file before relying on this block.
59void customize(std::vector<ConfigParamSpec>& workflowOptions)
60{
61 std::vector<o2::framework::ConfigParamSpec> options{
62 {"jsons-folder", VariantType::String, "jsons", {"name of the folder to store json files"}},
63 {"receiver-hostname", VariantType::String, "arcbs04.cern.ch", {"name of the host where visualisation data is transmitted (only eve format)"}},
64 {"receiver-port", VariantType::Int, 8001, {"port number of the host where visualisation data is transmitted (only eve format)"}},
65 {"receiver-timeout", VariantType::Int, 300, {"socket connection timeout (ms)"}},
66 {"use-only-files", VariantType::Bool, false, {"do not transmit visualisation data using sockets (only eve format)"}},
67 {"use-only-sockets", VariantType::Bool, false, {"do not store visualisation data using filesystem"}},
68 {"use-json-format", VariantType::Bool, false, {"instead of eve format (default) use json format"}},
69 {"use-root-format", VariantType::Bool, false, {"instead of eve format (default) use root format"}},
70 {"eve-hostname", VariantType::String, "", {"name of the host allowed to produce files (empty means no limit)"}},
71 {"eve-dds-collection-index", VariantType::Int, -1, {"number of dpl collection allowed to produce files (-1 means no limit)"}},
72 {"time-interval", VariantType::Int, 5000, {"time interval in milliseconds between stored files"}},
73 {"disable-mc", VariantType::Bool, false, {"disable visualization of MC data"}},
74 {"disable-write", VariantType::Bool, false, {"disable writing output files"}},
75 {"display-clusters", VariantType::String, "ITS,TPC,TRD,TOF", {"comma-separated list of clusters to display"}},
76 {"display-tracks", VariantType::String, "TPC,ITS,ITS-TPC,TPC-TRD,ITS-TPC-TRD,TPC-TOF,ITS-TPC-TOF", {"comma-separated list of tracks to display"}},
77 {"disable-root-input", VariantType::Bool, false, {"disable root-files input reader"}},
78 {"configKeyValues", VariantType::String, "", {"semicolon separated key=value strings, e.g. EveConfParam content..."}},
79 {"skipOnEmptyInput", VariantType::Bool, false, {"don't run the ED when no input is provided"}},
80 };
81
// Hand the option list over to the framework.
83 std::swap(workflowOptions, options);
84}
85
86#include "Framework/runDataProcessing.h" // main method must be included here (otherwise customize not used)
88{
89 LOGF(info, "------------------------ O2DPLDisplay::init version ", o2_eve_version, " ------------------------------------");
90 mData.mConfig.configProcessing.runMC = mUseMC;
92 if (mEMCALCalibLoader) {
93 mEMCALCalibrator = std::make_unique<o2::emcal::CellRecalibrator>();
94 }
95}
96
98{
99 const auto& conf = EveConfParam::Instance();
100
101 if (!mEveHostNameMatch) {
102 return;
103 }
104 if (conf.onlyNthEvent > 1 && mEventCounter++ % conf.onlyNthEvent) {
105 return;
106 }
107 LOGF(info, "------------------------ O2DPLDisplay::run version ", o2_eve_version, " ------------------------------------");
108 // filtering out any run which occur before reaching next time interval
109 auto currentTime = std::chrono::high_resolution_clock::now();
110 std::chrono::duration<double> elapsed = currentTime - this->mTimeStamp;
111 if (elapsed < this->mTimeInterval) {
112 return; // skip this run - it is too often
113 }
114 this->mTimeStamp = currentTime; // next run AFTER period counted from last run, even if there will be not any save
116 recoCont.collectData(pc, *mDataRequest);
117 updateTimeDependentParams(pc); // Make sure that this is called after the RecoContainer collect data, since some condition objects are fetched there
118 if (mEMCALCalibLoader) {
119 mEMCALCalibLoader->checkUpdates(pc);
120 if (mEMCALCalibLoader->hasUpdateBadChannelMap()) {
121 mEMCALCalibrator->setBadChannelMap(mEMCALCalibLoader->getBadChannelMap());
122 }
123 if (mEMCALCalibLoader->hasUpdateTimeCalib()) {
124 mEMCALCalibrator->setTimeCalibration(mEMCALCalibLoader->getTimeCalibration());
125 }
126 if (mEMCALCalibLoader->hasUpdateGainCalib()) {
127 mEMCALCalibrator->setGainCalibration(mEMCALCalibLoader->getGainCalibration());
128 }
129 }
130
131 EveWorkflowHelper helper;
132 helper.setTPCVDrift(&mTPCVDriftHelper.getVDriftObject());
133 helper.setRecoContainer(&recoCont);
134 if (mEMCALCalibrator) {
135 helper.setEMCALCellRecalibrator(mEMCALCalibrator.get());
136 }
137
138 helper.setITSROFs();
139 helper.selectTracks(&(mData.mConfig.configCalib), mClMask, mTrkMask, mTrkMask);
140 helper.selectTowers();
141 helper.prepareITSClusters(mData.mITSDict);
142 helper.prepareMFTClusters(mData.mMFTDict);
143
144 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
145
146 std::size_t filesSaved = 0;
147 const std::vector<std::string> dirs = o2::event_visualisation::DirectoryLoader::allFolders(this->mJsonPath);
148 const std::string marker = "_";
149 const std::vector<std::string> exts = {".json", ".root", ".eve"};
150 auto processData = [&](const auto& dataMap) {
151 for (const auto& keyVal : dataMap) {
152 if (conf.maxPVs > 0 && filesSaved >= conf.maxPVs) {
153 break;
154 }
155 if (conf.maxBytes > 0) {
156 auto periodStart =
157 duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() - this->mTimeInterval.count();
158 if (!DirectoryLoader::canCreateNextFile(dirs, marker, exts, periodStart, conf.maxBytes)) {
159 LOGF(info, "Already too much data (> %d) to transfer in this period - event will not be not saved ...", conf.maxBytes);
160 break;
161 }
162 }
163 const auto pv = keyVal.first;
164 bool save = false;
165 if (conf.PVMode) {
166 helper.draw(pv, conf.trackSorting);
167 save = true;
168 } else {
169 helper.draw(pv, conf.trackSorting);
170 save = true;
171 }
172
173 if (conf.minITSTracks > -1 && helper.mEvent.getDetectorTrackCount(detectors::DetID::ITS) < conf.minITSTracks) {
174 save = false;
175 }
176
177 if (conf.minTracks > -1 && helper.mEvent.getTrackCount() < conf.minTracks) {
178 save = false;
179 }
180
181 if (this->mDisableWrite) {
182 save = false;
183 }
184
185 if (save) {
186 helper.mEvent.setClMask(this->mClMask.to_ulong());
187 helper.mEvent.setTrkMask(this->mTrkMask.to_ulong());
188 helper.mEvent.setRunNumber(tinfo.runNumber);
189 helper.mEvent.setTfCounter(tinfo.tfCounter);
190 helper.mEvent.setFirstTForbit(tinfo.firstTForbit);
191 helper.mEvent.setRunType(this->mRunType);
192 helper.mEvent.setPrimaryVertex(pv);
193 helper.mEvent.setCreationTime(tinfo.creation);
194 helper.save(this->mJsonPath, this->mExt, conf.maxFiles, this->mReceiverHostname, this->mReceiverPort, this->mReceiverTimeout, this->mUseOnlyFiles, this->mUseOnlySockets);
195 filesSaved++;
196 currentTime = std::chrono::high_resolution_clock::now(); // time AFTER save
197 this->mTimeStamp = currentTime; // next run AFTER period counted from last save
198 }
199
200 helper.clear();
201 }
202 };
203 if (conf.PVTriggersMode) {
204 processData(helper.mPrimaryVertexTriggerGIDs);
205 } else {
206 processData(helper.mPrimaryVertexTrackGIDs);
207 }
208
209 auto endTime = std::chrono::high_resolution_clock::now();
210 LOGP(info, "Visualization of TF:{} at orbit {} took {} s.", tinfo.tfCounter, tinfo.firstTForbit, std::chrono::duration_cast<std::chrono::microseconds>(endTime - currentTime).count() * 1e-6);
211
212 LOGP(info, "PVs with tracks: {}", helper.mPrimaryVertexTrackGIDs.size());
213 LOGP(info, "PVs with triggers: {}", helper.mPrimaryVertexTriggerGIDs.size());
214 LOGP(info, "Data files saved: {}", filesSaved);
215
216 std::unordered_map<o2::dataformats::GlobalTrackID, std::size_t> savedDataTypes;
217
218 for (int i = 0; i < GID::Source::NSources; i++) {
219 savedDataTypes[i] = 0;
220 }
221
222 for (const auto& gid : helper.mTotalAcceptedDataTypes) {
223 savedDataTypes[gid.getSource()] += 1;
224 }
225
226 std::vector<std::string> sourceStats;
227 sourceStats.reserve(GID::Source::NSources);
228
229 const auto combinedMask = mTrkMask | mClMask;
230
231 for (int i = 0; i < GID::Source::NSources; i++) {
232 if (combinedMask[i]) {
233 sourceStats.emplace_back(fmt::format("{}/{} {}", savedDataTypes.at(i), helper.mTotalDataTypes.at(i), GID::getSourceName(i)));
234 }
235 }
236
237 // FIXME: find out why this does not work with 11.1.1
238// LOGP(info, "Tracks: {}", fmt::join(sourceStats, ", "));
239}
240
244
// Refreshes the conditions which may change between timeframes and performs the one-time
// initialisation (run type and mData.init()) on the first call.
// NOTE(review): listing line 247 (first statement of the body) is absent from this extract.
245void O2DPLDisplaySpec::updateTimeDependentParams(ProcessingContext& pc)
246{
// Function-local static: a single flag shared by every call of this method, i.e. also by
// all instances of the class living in the same process.
248 static bool initOnceDone = false;
249 mTPCVDriftHelper.extractCCDBInputs(pc);
// Nothing else is done with the update here apart from acknowledging it.
250 if (mTPCVDriftHelper.isUpdated()) {
251 mTPCVDriftHelper.acknowledgeUpdate();
252 }
253 if (!initOnceDone) { // these params need to be queried only once
254 initOnceDone = true;
255 auto grpECS = o2::base::GRPGeomHelper::instance().getGRPECS(); // RS
256 mRunType = grpECS->getRunType();
257 mData.init();
258 }
259 // pc.inputs().get<o2::itsmft::TopologyDictionary*>("cldictITS"); // called by the RecoContainer
260 // pc.inputs().get<o2::itsmft::TopologyDictionary*>("cldictMFT"); // called by the RecoContainer
261}
262
264{
// Dispatches a freshly received condition object: each handler that recognises `matcher`
// consumes `obj` and the function returns immediately afterwards.
// NOTE(review): the signature (listing line 263) and listing lines 265, 273 and 278 are
// absent from this extract. The `return; }` right below closes an `if` whose opening line
// is one of them, and the two CLUSDICT branches presumably store the dictionary on the
// missing lines - restore them from the upstream file.
266 return;
267 }
// The EMCAL loader is optional, hence the null check before delegating to it.
268 if (mEMCALCalibLoader && mEMCALCalibLoader->finalizeCCDB(matcher, obj)) {
269 return;
270 }
271 if (matcher == ConcreteDataMatcher("ITS", "CLUSDICT", 0)) {
272 LOGF(info, "ITS cluster dictionary updated");
274 return;
275 }
276 if (matcher == ConcreteDataMatcher("MFT", "CLUSDICT", 0)) {
277 LOGF(info, "MFT cluster dictionary updated");
279 return;
280 }
// Last candidate: the TPC drift-velocity helper; objects nobody recognises are ignored.
281 if (mTPCVDriftHelper.accountCCDBInputs(matcher, obj)) {
282
283 return;
284 }
285}
286
288{
289 LOGF(info, "------------------------ defineDataProcessing ", o2_eve_version, " ------------------------------------");
290
291 WorkflowSpec specs;
292
293 auto jsonFolder = cfgc.options().get<std::string>("jsons-folder");
294 std::string ext = ".eve"; // root files are default format
295 auto useJsonFormat = cfgc.options().get<bool>("use-json-format");
296 if (useJsonFormat) {
297 ext = ".json";
298 }
299 auto useROOTFormat = cfgc.options().get<bool>("use-root-format");
300 if (useROOTFormat) {
301 ext = ".root";
302 }
303 auto eveHostName = cfgc.options().get<std::string>("eve-hostname");
304 o2::conf::ConfigurableParam::updateFromString(cfgc.options().get<std::string>("configKeyValues"));
305 const auto& conf = EveConfParam::Instance();
306
307 bool useMC = !cfgc.options().get<bool>("disable-mc");
308 bool disableWrite = cfgc.options().get<bool>("disable-write");
309
310 auto receiverHostname = cfgc.options().get<std::string>("receiver-hostname");
311 auto receiverPort = cfgc.options().get<int>("receiver-port");
312 auto receiverTimeout = cfgc.options().get<int>("receiver-timeout");
313 auto useOnlyFiles = cfgc.options().get<bool>("use-only-files");
314 auto useOnlySockets = cfgc.options().get<bool>("use-only-sockets");
315
316 char hostname[_POSIX_HOST_NAME_MAX];
317 gethostname(hostname, _POSIX_HOST_NAME_MAX);
318 bool eveHostNameMatch = eveHostName.empty() || eveHostName == hostname;
319
320 int eveDDSColIdx = cfgc.options().get<int>("eve-dds-collection-index");
321 if (eveDDSColIdx != -1) {
322 char* colIdx = getenv("DDS_COLLECTION_INDEX");
323 int myIdx = colIdx ? atoi(colIdx) : -1;
324 if (myIdx == eveDDSColIdx) {
325 LOGF(important, "Restricting DPL Display to collection index, my index ", myIdx, ", enabled ", int(myIdx == eveDDSColIdx));
326 } else {
327 LOGF(info, "Restricting DPL Display to collection index, my index ", myIdx, ", enabled ", int(myIdx == eveDDSColIdx));
328 }
329 eveHostNameMatch &= myIdx == eveDDSColIdx;
330 }
331
332 std::chrono::milliseconds timeInterval(cfgc.options().get<int>("time-interval"));
333
334 GlobalTrackID::mask_t srcTrk = GlobalTrackID::getSourcesMask(cfgc.options().get<std::string>("display-tracks"));
335 GlobalTrackID::mask_t srcCl = GlobalTrackID::getSourcesMask(cfgc.options().get<std::string>("display-clusters"));
336
337 if (srcTrk[GID::MFTMCH] && srcTrk[GID::MCHMID]) {
338 srcTrk |= GID::getSourceMask(GID::MFTMCHMID);
339 }
340
343
344 srcTrk &= allowedTracks;
345 srcCl &= allowedClusters;
346
347 if (!srcTrk.any() && !srcCl.any()) {
348 if (cfgc.options().get<bool>("skipOnEmptyInput")) {
349 LOGF(info, "No valid inputs for event display, disabling event display");
350 return std::move(specs);
351 }
352 throw std::runtime_error("No input configured");
353 }
354
355 auto isRangeEnabled = [&opts = cfgc.options()](const char* min_name, const char* max_name) {
356 EveWorkflowHelper::Bracket bracket{opts.get<float>(min_name), opts.get<float>(max_name)};
357 bool optEnabled = false;
358
359 if (bracket.getMin() < 0 && bracket.getMax() < 0) {
360 optEnabled = false;
361 } else if (bracket.getMin() >= 0 && bracket.getMax() >= 0) {
362 optEnabled = true;
363
364 if (bracket.isInvalid()) {
365 throw std::runtime_error(fmt::format("{}, {} bracket is invalid", min_name, max_name));
366 }
367 } else {
368 throw std::runtime_error(fmt::format("Both boundaries, {} and {}, have to be specified at the same time", min_name, max_name));
369 }
370
371 return std::make_tuple(optEnabled, bracket);
372 };
373
374 std::shared_ptr<DataRequest> dataRequest = std::make_shared<DataRequest>();
375 dataRequest->requestTracks(srcTrk, useMC);
376 dataRequest->requestClusters(srcCl, useMC);
377
378 if (conf.filterITSROF) {
379 dataRequest->requestIRFramesITS();
381 }
382
383 InputHelper::addInputSpecs(cfgc, specs, srcCl, srcTrk, srcTrk, useMC);
384 if (conf.PVMode) {
385 dataRequest->requestPrimaryVertices(useMC);
386 InputHelper::addInputSpecsPVertex(cfgc, specs, useMC);
387 }
388 o2::tpc::VDriftHelper::requestCCDBInputs(dataRequest->inputs);
389
390 auto ggRequest = std::make_shared<o2::base::GRPGeomRequest>(false, // orbitResetTime
391 true, // GRPECS=true
392 false, // GRPLHCIF
393 true, // GRPMagField
394 true, // askMatLUT
396 dataRequest->inputs,
397 true); // query only once all objects except mag.field
398
399 std::shared_ptr<o2::emcal::CalibLoader> emcalCalibLoader;
400 if (conf.calibrateEMC) {
401 emcalCalibLoader = std::make_shared<o2::emcal::CalibLoader>();
402 emcalCalibLoader->enableTimeCalib(true);
403 emcalCalibLoader->enableBadChannelMap(true);
404 emcalCalibLoader->enableGainCalib(true);
405 emcalCalibLoader->defineInputSpecs(dataRequest->inputs);
406 }
407
408 specs.emplace_back(DataProcessorSpec{
409 "o2-eve-export",
410 dataRequest->inputs,
411 {},
412 AlgorithmSpec{adaptFromTask<O2DPLDisplaySpec>(disableWrite, useMC, srcTrk, srcCl, dataRequest, ggRequest,
413 emcalCalibLoader, jsonFolder, ext, timeInterval, eveHostNameMatch,
414 receiverHostname, receiverPort, receiverTimeout, useOnlyFiles, useOnlySockets)}});
415
416 // configure dpl timer to inject correct firstTForbit: start from the 1st orbit of TF containing 1st sampled orbit
417 o2::raw::HBFUtilsInitializer hbfIni(cfgc, specs);
418
419 return std::move(specs);
420}
Wrapper container for different reconstructed object types.
Definition of the MCH cluster minimal structure.
Loading content of the Folder and returning sorted.
int32_t i
Global index for barrel track: provides provenance (detectors combination), index in respective array...
Definition of the MCH ROFrame record.
Definition of the Names Generator class.
void customize(std::vector< o2::framework::CallbacksPolicy > &policies)
Definition of TPCFastTransform class.
Definition of the MCH track.
Helper class to obtain TPC clusters / digits / labels from DPL.
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static void updateFromString(std::string const &)
static mask_t getSourcesMask(const std::string_view srcList)
static constexpr ID ITS
Definition DetID.h:63
const o2::itsmft::TopologyDictionary * mMFTDict
const o2::itsmft::TopologyDictionary * mITSDict
void setITSDict(const o2::itsmft::TopologyDictionary *d)
void setMFTDict(const o2::itsmft::TopologyDictionary *d)
static bool canCreateNextFile(const std::vector< std::string > &paths, const std::string &marker, const std::vector< std::string > &ext, long long millisec, long capacityAllowed)
static std::vector< std::string > allFolders(const std::string &location)
std::unordered_map< std::size_t, std::vector< GID > > mPrimaryVertexTriggerGIDs
void save(const std::string &jsonPath, const std::string &ext, int numberOfFiles, const std::string &receiverHostname, int receiverPort, int receiverTimeout, bool useOnlyFiles, bool useOnlySockets)
o2::event_visualisation::VisualisationEvent mEvent
void setRecoContainer(const o2::globaltracking::RecoContainer *rc)
void setTPCVDrift(const o2::tpc::VDriftCorrFact *v)
void draw(std::size_t primaryVertexIdx, bool sortTracks)
void selectTracks(const CalibObjectsConst *calib, GID::mask_t maskCl, GID::mask_t maskTrk, GID::mask_t maskMatch)
void setEMCALCellRecalibrator(o2::emcal::CellRecalibrator *calibrator)
void prepareITSClusters(const o2::itsmft::TopologyDictionary *dict)
void prepareMFTClusters(const o2::itsmft::TopologyDictionary *dict)
std::unordered_map< std::size_t, std::vector< GID > > mPrimaryVertexTrackGIDs
std::unordered_set< GID > mTotalAcceptedDataTypes
std::unordered_map< GID, std::size_t > mTotalDataTypes
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
void run(o2::framework::ProcessingContext &pc) final
void init(o2::framework::InitContext &ic) final
size_t getDetectorTrackCount(o2::detectors::DetID::ID id) const
void setFirstTForbit(o2::header::DataHeader::TForbitType value)
void setRunNumber(o2::header::DataHeader::RunNumberType runNumber)
void setCreationTime(o2::framework::DataProcessingHeader::CreationTime creationTime)
void setRunType(o2::parameters::GRPECS::RunType runType)
void setTfCounter(o2::header::DataHeader::TFCounterType value)
ConfigParamRegistry & options() const
ServiceRegistryRef services()
The services registry associated with this processing context.
static int addInputSpecs(const o2::framework::ConfigContext &configcontext, o2::framework::WorkflowSpec &specs, GID::mask_t maskClusters, GID::mask_t maskMatches, GID::mask_t maskTracks, bool useMC=true, GID::mask_t maskClustersMC=GID::getSourcesMask(GID::ALL), GID::mask_t maskTracksMC=GID::getSourcesMask(GID::ALL), bool subSpecStrict=false)
static int addInputSpecsIRFramesITS(const o2::framework::ConfigContext &configcontext, o2::framework::WorkflowSpec &specs)
static int addInputSpecsPVertex(const o2::framework::ConfigContext &configcontext, o2::framework::WorkflowSpec &specs, bool mc)
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
void extractCCDBInputs(o2::framework::ProcessingContext &pc, bool laser=true, bool itstpcTgl=true)
const VDriftCorrFact & getVDriftObject() const
bool accountCCDBInputs(const o2::framework::ConcreteDataMatcher &matcher, void *obj)
bool isUpdated() const
Definition of a container to keep/associate and arbitrary number of labels associated to an index wit...
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
Global TPC definitions and constants.
Definition SimTraits.h:167
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
static void addNewTimeSliceCallback(std::vector< o2::framework::CallbacksPolicy > &policies)
static void addConfigOption(std::vector< o2::framework::ConfigParamSpec > &opts, const std::string &defOpt=std::string(o2::base::NameConf::DIGITIZATIONCONFIGFILE))