42using std::chrono::duration_cast;
43using std::chrono::milliseconds;
44using std::chrono::system_clock;
54void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
59void customize(std::vector<ConfigParamSpec>& workflowOptions)
61 std::vector<o2::framework::ConfigParamSpec> options{
62 {
"jsons-folder", VariantType::String,
"jsons", {
"name of the folder to store json files"}},
63 {
"receiver-hostname", VariantType::String,
"arcbs04.cern.ch", {
"name of the host where visualisation data is transmitted (only eve format)"}},
64 {
"receiver-port", VariantType::Int, 8001, {
"port number of the host where visualisation data is transmitted (only eve format)"}},
65 {
"receiver-timeout", VariantType::Int, 300, {
"socket connection timeout (ms)"}},
66 {
"use-only-files", VariantType::Bool,
false, {
"do not transmit visualisation data using sockets (only eve format)"}},
67 {
"use-only-sockets", VariantType::Bool,
false, {
"do not store visualisation data using filesystem"}},
68 {
"use-json-format", VariantType::Bool,
false, {
"instead of eve format (default) use json format"}},
69 {
"use-root-format", VariantType::Bool,
false, {
"instead of eve format (default) use root format"}},
70 {
"eve-hostname", VariantType::String,
"", {
"name of the host allowed to produce files (empty means no limit)"}},
71 {
"eve-dds-collection-index", VariantType::Int, -1, {
"number of dpl collection allowed to produce files (-1 means no limit)"}},
72 {
"time-interval", VariantType::Int, 5000, {
"time interval in milliseconds between stored files"}},
73 {
"disable-mc", VariantType::Bool,
false, {
"disable visualization of MC data"}},
74 {
"disable-write", VariantType::Bool,
false, {
"disable writing output files"}},
75 {
"display-clusters", VariantType::String,
"ITS,TPC,TRD,TOF", {
"comma-separated list of clusters to display"}},
76 {
"display-tracks", VariantType::String,
"TPC,ITS,ITS-TPC,TPC-TRD,ITS-TPC-TRD,TPC-TOF,ITS-TPC-TOF", {
"comma-separated list of tracks to display"}},
77 {
"disable-root-input", VariantType::Bool,
false, {
"disable root-files input reader"}},
78 {
"configKeyValues", VariantType::String,
"", {
"semicolon separated key=value strings, e.g. EveConfParam content..."}},
79 {
"skipOnEmptyInput", VariantType::Bool,
false, {
"don't run the ED when no input is provided"}},
83 std::swap(workflowOptions, options);
89 LOGF(info,
"------------------------ O2DPLDisplay::init version ", o2_eve_version,
" ------------------------------------");
92 if (mEMCALCalibLoader) {
93 mEMCALCalibrator = std::make_unique<o2::emcal::CellRecalibrator>();
101 if (!mEveHostNameMatch) {
104 if (conf.onlyNthEvent > 1 && mEventCounter++ % conf.onlyNthEvent) {
107 LOGF(info,
"------------------------ O2DPLDisplay::run version ", o2_eve_version,
" ------------------------------------");
109 auto currentTime = std::chrono::high_resolution_clock::now();
110 std::chrono::duration<double> elapsed = currentTime - this->mTimeStamp;
111 if (elapsed < this->mTimeInterval) {
114 this->mTimeStamp = currentTime;
117 updateTimeDependentParams(pc);
118 if (mEMCALCalibLoader) {
119 mEMCALCalibLoader->checkUpdates(pc);
120 if (mEMCALCalibLoader->hasUpdateBadChannelMap()) {
121 mEMCALCalibrator->setBadChannelMap(mEMCALCalibLoader->getBadChannelMap());
123 if (mEMCALCalibLoader->hasUpdateTimeCalib()) {
124 mEMCALCalibrator->setTimeCalibration(mEMCALCalibLoader->getTimeCalibration());
126 if (mEMCALCalibLoader->hasUpdateGainCalib()) {
127 mEMCALCalibrator->setGainCalibration(mEMCALCalibLoader->getGainCalibration());
134 if (mEMCALCalibrator) {
146 std::size_t filesSaved = 0;
148 const std::string
marker =
"_";
149 const std::vector<std::string> exts = {
".json",
".root",
".eve"};
150 auto processData = [&](
const auto& dataMap) {
151 for (
const auto& keyVal : dataMap) {
152 if (conf.maxPVs > 0 && filesSaved >= conf.maxPVs) {
155 if (conf.maxBytes > 0) {
157 duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() - this->mTimeInterval.count();
159 LOGF(info,
"Already too much data (> %d) to transfer in this period - event will not be not saved ...", conf.maxBytes);
163 const auto pv = keyVal.first;
166 helper.
draw(pv, conf.trackSorting);
169 helper.
draw(pv, conf.trackSorting);
181 if (this->mDisableWrite) {
194 helper.
save(this->mJsonPath, this->mExt, conf.maxFiles, this->mReceiverHostname, this->mReceiverPort, this->mReceiverTimeout, this->mUseOnlyFiles, this->mUseOnlySockets);
196 currentTime = std::chrono::high_resolution_clock::now();
197 this->mTimeStamp = currentTime;
203 if (conf.PVTriggersMode) {
209 auto endTime = std::chrono::high_resolution_clock::now();
210 LOGP(info,
"Visualization of TF:{} at orbit {} took {} s.", tinfo.tfCounter, tinfo.firstTForbit, std::chrono::duration_cast<std::chrono::microseconds>(endTime - currentTime).count() * 1e-6);
214 LOGP(info,
"Data files saved: {}", filesSaved);
216 std::unordered_map<o2::dataformats::GlobalTrackID, std::size_t> savedDataTypes;
219 savedDataTypes[
i] = 0;
223 savedDataTypes[gid.getSource()] += 1;
226 std::vector<std::string> sourceStats;
229 const auto combinedMask = mTrkMask | mClMask;
232 if (combinedMask[
i]) {
248 static bool initOnceDone =
false;
256 mRunType = grpECS->getRunType();
268 if (mEMCALCalibLoader && mEMCALCalibLoader->finalizeCCDB(matcher, obj)) {
272 LOGF(info,
"ITS cluster dictionary updated");
277 LOGF(info,
"MFT cluster dictionary updated");
289 LOGF(info,
"------------------------ defineDataProcessing ", o2_eve_version,
" ------------------------------------");
293 auto jsonFolder = cfgc.
options().
get<std::string>(
"jsons-folder");
294 std::string ext =
".eve";
295 auto useJsonFormat = cfgc.
options().
get<
bool>(
"use-json-format");
299 auto useROOTFormat = cfgc.
options().
get<
bool>(
"use-root-format");
303 auto eveHostName = cfgc.
options().
get<std::string>(
"eve-hostname");
307 bool useMC = !cfgc.
options().
get<
bool>(
"disable-mc");
308 bool disableWrite = cfgc.
options().
get<
bool>(
"disable-write");
310 auto receiverHostname = cfgc.
options().
get<std::string>(
"receiver-hostname");
311 auto receiverPort = cfgc.
options().
get<
int>(
"receiver-port");
312 auto receiverTimeout = cfgc.
options().
get<
int>(
"receiver-timeout");
313 auto useOnlyFiles = cfgc.
options().
get<
bool>(
"use-only-files");
314 auto useOnlySockets = cfgc.
options().
get<
bool>(
"use-only-sockets");
316 char hostname[_POSIX_HOST_NAME_MAX];
317 gethostname(hostname, _POSIX_HOST_NAME_MAX);
318 bool eveHostNameMatch = eveHostName.empty() || eveHostName == hostname;
320 int eveDDSColIdx = cfgc.
options().
get<
int>(
"eve-dds-collection-index");
321 if (eveDDSColIdx != -1) {
322 char* colIdx = getenv(
"DDS_COLLECTION_INDEX");
323 int myIdx = colIdx ? atoi(colIdx) : -1;
324 if (myIdx == eveDDSColIdx) {
325 LOGF(important,
"Restricting DPL Display to collection index, my index ", myIdx,
", enabled ",
int(myIdx == eveDDSColIdx));
327 LOGF(info,
"Restricting DPL Display to collection index, my index ", myIdx,
", enabled ",
int(myIdx == eveDDSColIdx));
329 eveHostNameMatch &= myIdx == eveDDSColIdx;
332 std::chrono::milliseconds timeInterval(cfgc.
options().
get<
int>(
"time-interval"));
344 srcTrk &= allowedTracks;
345 srcCl &= allowedClusters;
347 if (!srcTrk.any() && !srcCl.any()) {
348 if (cfgc.
options().
get<
bool>(
"skipOnEmptyInput")) {
349 LOGF(info,
"No valid inputs for event display, disabling event display");
350 return std::move(specs);
352 throw std::runtime_error(
"No input configured");
355 auto isRangeEnabled = [&opts = cfgc.
options()](
const char* min_name,
const char* max_name) {
357 bool optEnabled =
false;
359 if (bracket.getMin() < 0 && bracket.getMax() < 0) {
361 }
else if (bracket.getMin() >= 0 && bracket.getMax() >= 0) {
364 if (bracket.isInvalid()) {
365 throw std::runtime_error(fmt::format(
"{}, {} bracket is invalid", min_name, max_name));
368 throw std::runtime_error(fmt::format(
"Both boundaries, {} and {}, have to be specified at the same time", min_name, max_name));
371 return std::make_tuple(optEnabled, bracket);
374 std::shared_ptr<DataRequest> dataRequest = std::make_shared<DataRequest>();
375 dataRequest->requestTracks(srcTrk, useMC);
376 dataRequest->requestClusters(srcCl, useMC);
378 if (conf.filterITSROF) {
379 dataRequest->requestIRFramesITS();
385 dataRequest->requestPrimaryVertices(useMC);
390 auto ggRequest = std::make_shared<o2::base::GRPGeomRequest>(
false,
399 std::shared_ptr<o2::emcal::CalibLoader> emcalCalibLoader;
400 if (conf.calibrateEMC) {
401 emcalCalibLoader = std::make_shared<o2::emcal::CalibLoader>();
402 emcalCalibLoader->enableTimeCalib(
true);
403 emcalCalibLoader->enableBadChannelMap(
true);
404 emcalCalibLoader->enableGainCalib(
true);
405 emcalCalibLoader->defineInputSpecs(dataRequest->inputs);
412 AlgorithmSpec{adaptFromTask<O2DPLDisplaySpec>(disableWrite, useMC, srcTrk, srcCl, dataRequest, ggRequest,
413 emcalCalibLoader, jsonFolder, ext, timeInterval, eveHostNameMatch,
414 receiverHostname, receiverPort, receiverTimeout, useOnlyFiles, useOnlySockets)}});
419 return std::move(specs);
Loading content of the Folder and returning sorted.
Global index for barrel track: provides provenance (detectors combination), index in respective array...
Definition of the Names Generator class.
void customize(std::vector< o2::framework::CallbacksPolicy > &policies)
Definition of the MCH track.
Helper class to obtain TPC clusters / digits / labels from DPL.
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static const EveConfParam & Instance()
static void updateFromString(std::string const &)
const o2::itsmft::TopologyDictionary * mMFTDict
const o2::itsmft::TopologyDictionary * mITSDict
void setITSDict(const o2::itsmft::TopologyDictionary *d)
void setMFTDict(const o2::itsmft::TopologyDictionary *d)
static bool canCreateNextFile(const std::vector< std::string > &paths, const std::string &marker, const std::vector< std::string > &ext, long long millisec, long capacityAllowed)
static std::vector< std::string > allFolders(const std::string &location)
std::unordered_map< std::size_t, std::vector< GID > > mPrimaryVertexTriggerGIDs
void save(const std::string &jsonPath, const std::string &ext, int numberOfFiles, const std::string &receiverHostname, int receiverPort, int receiverTimeout, bool useOnlyFiles, bool useOnlySockets)
o2::event_visualisation::VisualisationEvent mEvent
void setRecoContainer(const o2::globaltracking::RecoContainer *rc)
void setTPCVDrift(const o2::tpc::VDriftCorrFact *v)
void draw(std::size_t primaryVertexIdx, bool sortTracks)
void selectTracks(const CalibObjectsConst *calib, GID::mask_t maskCl, GID::mask_t maskTrk, GID::mask_t maskMatch)
void setEMCALCellRecalibrator(o2::emcal::CellRecalibrator *calibrator)
void prepareITSClusters(const o2::itsmft::TopologyDictionary *dict)
void prepareMFTClusters(const o2::itsmft::TopologyDictionary *dict)
std::unordered_map< std::size_t, std::vector< GID > > mPrimaryVertexTrackGIDs
std::unordered_set< GID > mTotalAcceptedDataTypes
std::unordered_map< GID, std::size_t > mTotalDataTypes
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj) final
void run(o2::framework::ProcessingContext &pc) final
static constexpr auto allowedClusters
void init(o2::framework::InitContext &ic) final
static constexpr auto allowedTracks
size_t getDetectorTrackCount(o2::detectors::DetID::ID id) const
void setFirstTForbit(o2::header::DataHeader::TForbitType value)
void setRunNumber(o2::header::DataHeader::RunNumberType runNumber)
void setCreationTime(o2::framework::DataProcessingHeader::CreationTime creationTime)
void setTrkMask(int value)
void setRunType(o2::parameters::GRPECS::RunType runType)
void setTfCounter(o2::header::DataHeader::TFCounterType value)
size_t getTrackCount() const
void setPrimaryVertex(std::size_t pv)
void setClMask(int value)
ConfigParamRegistry & options() const
T get(const char *key) const
ServiceRegistryRef services()
The services registry associated with this processing context.
static void requestCCDBInputs(std::vector< o2::framework::InputSpec > &inputs, bool laser=true, bool itstpcTgl=true)
void extractCCDBInputs(o2::framework::ProcessingContext &pc, bool laser=true, bool itstpcTgl=true)
const VDriftCorrFact & getVDriftObject() const
bool accountCCDBInputs(const o2::framework::ConcreteDataMatcher &matcher, void *obj)
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
Global TPC definitions and constants.
SettingsProcessing configProcessing
CalibObjectsConst configCalib
void collectData(o2::framework::ProcessingContext &pc, const DataRequest &request)
static void addNewTimeSliceCallback(std::vector< o2::framework::CallbacksPolicy > &policies)
static void addConfigOption(std::vector< o2::framework::ConfigParamSpec > &opts, const std::string &defOpt=std::string(o2::base::NameConf::DIGITIZATIONCONFIGFILE))