Project
Loading...
Searching...
No Matches
ctf-reader-workflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <string>
13#include <vector>
14#include "Framework/Logger.h"
18#include "Framework/InputSpec.h"
25
26// Specific detectors specs
43#ifdef WITH_OPENMP
44#include <omp.h>
45#endif
46
47using namespace o2::framework;
49
50// we need to add workflow options before including Framework/runDataProcessing
51void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
52{
53 // option allowing to set parameters
54 std::vector<o2::framework::ConfigParamSpec> options;
55 options.push_back(ConfigParamSpec{"ctf-input", VariantType::String, "none", {"comma-separated list CTF input files"}});
56 options.push_back(ConfigParamSpec{"ctf-dict", VariantType::String, "ccdb", {"CTF dictionary: empty or ccdb=CCDB, none=no external dictionary otherwise: local filename"}});
57 options.push_back(ConfigParamSpec{"onlyDet", VariantType::String, std::string{DetID::ALL}, {"comma-separated list of detectors to accept. Overrides skipDet"}});
58 options.push_back(ConfigParamSpec{"skipDet", VariantType::String, std::string{DetID::NONE}, {"comma-separate list of detectors to skip"}});
59 options.push_back(ConfigParamSpec{"loop", VariantType::Int, 0, {"loop N times (infinite for N<0)"}});
60 options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}});
61 options.push_back(ConfigParamSpec{"shuffle", VariantType::Bool, false, {"shuffle TF sending order (for debug)"}});
62 options.push_back(ConfigParamSpec{"copy-cmd", VariantType::String, "alien_cp ?src file://?dst", {"copy command for remote files or no-copy to avoid copying"}}); // Use "XrdSecPROTOCOL=sss,unix xrdcp -N root://eosaliceo2.cern.ch/?src ?dst" for direct EOS access
63 options.push_back(ConfigParamSpec{"copy-dir", VariantType::String, "/tmp/", {"copy base directory for remote files"}});
64 options.push_back(ConfigParamSpec{"ctf-file-regex", VariantType::String, ".*o2_ctf_run.+\\.root$", {"regex string to identify CTF files"}});
65 options.push_back(ConfigParamSpec{"remote-regex", VariantType::String, "^(alien://|)/alice/data/.+", {"regex string to identify remote files"}}); // Use "^/eos/aliceo2/.+" for direct EOS access
66 options.push_back(ConfigParamSpec{"max-cached-files", VariantType::Int, 3, {"max CTF files queued (copied for remote source)"}});
67 options.push_back(ConfigParamSpec{"allow-missing-detectors", VariantType::Bool, false, {"send empty message if detector is missing in the CTF (otherwise throw)"}});
68 options.push_back(ConfigParamSpec{"send-diststf-0xccdb", VariantType::Bool, false, {"send explicit FLP/DISTSUBTIMEFRAME/0xccdb output"}});
69 options.push_back(ConfigParamSpec{"ctf-reader-verbosity", VariantType::Int, 0, {"verbosity level (0: summary per detector, 1: summary per block"}});
70 options.push_back(ConfigParamSpec{"ctf-data-subspec", VariantType::Int, 0, {"subspec to use for decoded CTF messages (use non-0 if CTF writer will be attached downstream)"}});
71 options.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}});
72 options.push_back(ConfigParamSpec{"ir-frames-files", VariantType::String, "", {"If non empty, inject selected IRFrames from this file"}});
73 options.push_back(ConfigParamSpec{"run-time-span-file", VariantType::String, "", {"If non empty, inject selected IRFrames from this text file (run, min/max orbit or unix time)"}});
74 options.push_back(ConfigParamSpec{"skip-skimmed-out-tf", VariantType::Bool, false, {"Do not process TFs with empty IR-Frame coverage"}});
75 options.push_back(ConfigParamSpec{"invert-irframe-selection", VariantType::Bool, false, {"Select only frames mentioned in ir-frames-file (skip-skimmed-out-tf applied to TF not selected!)"}});
76 //
77 options.push_back(ConfigParamSpec{"its-digits", VariantType::Bool, false, {"convert ITS clusters to digits"}});
78 options.push_back(ConfigParamSpec{"mft-digits", VariantType::Bool, false, {"convert MFT clusters to digits"}});
79 //
80 options.push_back(ConfigParamSpec{"emcal-decoded-subspec", VariantType::Int, 0, {"subspec to use for decoded EMCAL data"}});
81 //
82 options.push_back(ConfigParamSpec{"timeframes-shm-limit", VariantType::String, "0", {"Minimum amount of SHM required in order to publish data"}});
83 options.push_back(ConfigParamSpec{"metric-feedback-channel-format", VariantType::String, "name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", {"format for the metric-feedback channel for TF rate limiting"}});
84 options.push_back(ConfigParamSpec{"combine-devices", VariantType::Bool, false, {"combine multiple DPL devices (entropy decoders)"}});
86 std::swap(workflowOptions, options);
87}
88
89// ------------------------------------------------------------------
91
93{
94 o2::conf::ConfigurableParam::updateFromString(configcontext.options().get<std::string>("configKeyValues"));
95 o2::ctf::CTFReaderInp ctfInput;
96
97 WorkflowSpec specs;
98 std::string allowedDetectors = "ITS,TPC,TRD,TOF,PHS,CPV,EMC,HMP,MFT,MCH,MID,ZDC,FT0,FV0,FDD,CTP"; // FIXME: explicit list to avoid problem with upgrade detectors
99 auto mskOnly = DetID::getMask(configcontext.options().get<std::string>("onlyDet"));
100 auto mskSkip = DetID::getMask(configcontext.options().get<std::string>("skipDet"));
101 if (mskOnly.any()) {
102 ctfInput.detMask &= mskOnly;
103 } else {
104 ctfInput.detMask ^= mskSkip;
105 }
106 ctfInput.detMask &= DetID::getMask(allowedDetectors);
107 ctfInput.inpdata = configcontext.options().get<std::string>("ctf-input");
108 ctfInput.subspec = (unsigned int)configcontext.options().get<int>("ctf-data-subspec");
109 ctfInput.decSSpecEMC = (unsigned int)configcontext.options().get<int>("emcal-decoded-subspec");
110 if (ctfInput.inpdata.empty() || ctfInput.inpdata == "none") {
111 if (!configcontext.helpOnCommandLine()) {
112 throw std::runtime_error("--ctf-input <file,...> is not provided");
113 }
114 ctfInput.inpdata = "";
115 }
116
117 ctfInput.maxLoops = configcontext.options().get<int>("loop");
118 if (ctfInput.maxLoops < 0) {
119 ctfInput.maxLoops = 0x7fffffff;
120 }
121 ctfInput.delay_us = int32_t(1e6 * configcontext.options().get<float>("delay")); // delay in microseconds
122 if (ctfInput.delay_us < 0) {
123 ctfInput.delay_us = 0;
124 }
125
126 ctfInput.maxFileCache = std::max(1, configcontext.options().get<int>("max-cached-files"));
127
128 ctfInput.shuffle = configcontext.options().get<bool>("shuffle");
129 ctfInput.copyCmd = configcontext.options().get<std::string>("copy-cmd");
130 ctfInput.copyDir = configcontext.options().get<std::string>("copy-dir");
131 ctfInput.tffileRegex = configcontext.options().get<std::string>("ctf-file-regex");
132 ctfInput.remoteRegex = configcontext.options().get<std::string>("remote-regex");
133 ctfInput.allowMissingDetectors = configcontext.options().get<bool>("allow-missing-detectors");
134 ctfInput.sup0xccdb = !configcontext.options().get<bool>("send-diststf-0xccdb");
135 ctfInput.minSHM = std::stoul(configcontext.options().get<std::string>("timeframes-shm-limit"));
136 ctfInput.fileIRFrames = configcontext.options().get<std::string>("ir-frames-files");
137 ctfInput.fileRunTimeSpans = configcontext.options().get<std::string>("run-time-span-file");
138 ctfInput.skipSkimmedOutTF = configcontext.options().get<bool>("skip-skimmed-out-tf");
139 ctfInput.invertIRFramesSelection = configcontext.options().get<bool>("invert-irframe-selection");
140 ctfInput.dictOpt = configcontext.options().get<std::string>("ctf-dict");
141 int verbosity = configcontext.options().get<int>("ctf-reader-verbosity");
142
143 int rateLimitingIPCID = std::stoi(configcontext.options().get<std::string>("timeframes-rate-limit-ipcid"));
144 std::string chanFmt = configcontext.options().get<std::string>("metric-feedback-channel-format");
145 if (rateLimitingIPCID > -1 && !chanFmt.empty()) {
146 ctfInput.metricChannel = fmt::format(fmt::runtime(chanFmt), o2::framework::ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
147 }
148 if (!ctfInput.fileRunTimeSpans.empty()) {
149 ctfInput.skipSkimmedOutTF = true;
150 }
151 if (!ctfInput.fileIRFrames.empty() && !ctfInput.fileRunTimeSpans.empty()) {
152 LOGP(fatal, "One cannot provide --ir-frames-files and --run-time-span-file options simultaneously");
153 }
156
157 specs.push_back(o2::ctf::getCTFReaderSpec(ctfInput));
158
159 auto pipes = configcontext.options().get<std::string>("pipeline");
160 std::unordered_map<std::string, int> plines;
161 auto ptokens = o2::utils::Str::tokenize(pipes, ',');
162 for (auto& token : ptokens) {
163 auto split = token.find(":");
164 if (split == std::string::npos) {
165 throw std::runtime_error("bad pipeline definition. Syntax <processor>:<pipeline>");
166 }
167 auto key = token.substr(0, split);
168 token.erase(0, split + 1);
169 size_t error;
170 auto value = std::stoll(token, &error, 10);
171 if (token[error] != '\0') {
172 throw std::runtime_error("Bad pipeline definition. Expecting integer");
173 }
174 if (value > 1) {
175 plines[key] = value;
176 }
177 }
178
179 std::vector<WorkflowSpec> decSpecsV;
180
181 auto addSpecs = [&decSpecsV, &plines](DataProcessorSpec&& s) {
182 auto entry = plines.find(s.name);
183 size_t mult = (entry == plines.end() || entry->second < 2) ? 1 : entry->second;
184 if (mult > decSpecsV.size()) {
185 decSpecsV.resize(mult);
186 }
187 decSpecsV[mult - 1].push_back(s);
188 };
189
190 // add decoders for all allowed detectors.
191 if (ctfInput.detMask[DetID::ITS]) {
193 addSpecs(o2::itsmft::getITSEntropyDecoderSpec(verbosity, doStag, configcontext.options().get<bool>("its-digits"), ctfInput.subspec, ctfInput.dictOpt));
194 }
195 if (ctfInput.detMask[DetID::MFT]) {
197 addSpecs(o2::itsmft::getMFTEntropyDecoderSpec(verbosity, doStag, configcontext.options().get<bool>("mft-digits"), ctfInput.subspec, ctfInput.dictOpt));
198 }
199 if (ctfInput.detMask[DetID::TPC]) {
200 addSpecs(o2::tpc::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
201 }
202 if (ctfInput.detMask[DetID::TRD]) {
203 addSpecs(o2::trd::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
204 }
205 if (ctfInput.detMask[DetID::TOF]) {
206 addSpecs(o2::tof::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
207 }
208 if (ctfInput.detMask[DetID::FT0]) {
209 addSpecs(o2::ft0::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
210 }
211 if (ctfInput.detMask[DetID::FV0]) {
212 addSpecs(o2::fv0::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
213 }
214 if (ctfInput.detMask[DetID::FDD]) {
215 addSpecs(o2::fdd::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
216 }
217 if (ctfInput.detMask[DetID::MID]) {
218 addSpecs(o2::mid::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
219 }
220 if (ctfInput.detMask[DetID::MCH]) {
221 addSpecs(o2::mch::getEntropyDecoderSpec(verbosity, "mch-entropy-decoder", ctfInput.subspec, ctfInput.dictOpt));
222 }
223 if (ctfInput.detMask[DetID::EMC]) {
224 addSpecs(o2::emcal::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.decSSpecEMC, ctfInput.dictOpt));
225 }
226 if (ctfInput.detMask[DetID::PHS]) {
227 addSpecs(o2::phos::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
228 }
229 if (ctfInput.detMask[DetID::CPV]) {
230 addSpecs(o2::cpv::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
231 }
232 if (ctfInput.detMask[DetID::ZDC]) {
233 addSpecs(o2::zdc::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
234 }
235 if (ctfInput.detMask[DetID::HMP]) {
236 addSpecs(o2::hmpid::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
237 }
238 if (ctfInput.detMask[DetID::CTP]) {
239 addSpecs(o2::ctp::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
240 }
241
242 bool combine = configcontext.options().get<bool>("combine-devices");
243 if (!combine) {
244 for (auto& decSpecs : decSpecsV) {
245 for (auto& s : decSpecs) {
246 specs.push_back(s);
247 }
248 }
249 } else {
250 std::vector<DataProcessorSpec> remaining;
251 if (decSpecsV.size() && decSpecsV[0].size()) {
252 specs.push_back(specCombiner("EntropyDecoders", decSpecsV[0], remaining)); // processing w/o pipelining
253 }
254 bool updatePipelines = false;
255 for (size_t i = 1; i < decSpecsV.size(); i++) { // add pipelined processes separately, consider combining them to separate groups (need to have modify argument of pipeline option)
256 if (decSpecsV[i].size() > 1) {
257 specs.push_back(specCombiner(fmt::format("EntropyDecodersP{}", i + 1), decSpecsV[i], remaining)); // processing pipelining multiplicity i+1
258 updatePipelines = true;
259 pipes += fmt::format(",EntropyDecodersP{}:{}", i + 1, i + 1);
260 } else {
261 for (auto& s : decSpecsV[i]) {
262 specs.push_back(s);
263 }
264 }
265 }
266 for (auto& s : remaining) {
267 specs.push_back(s);
268 }
269 if (updatePipelines) {
270 configcontext.options().override("pipeline", pipes);
271 }
272 }
273
274 return std::move(specs);
275}
#define verbosity
Convert CTF (EncodedBlocks) to CPV digit/channels strean.
Convert CTF (EncodedBlocks) to CTP digit stream.
Convert CTF (EncodedBlocks) to EMCAL digit/channels strean.
Convert CTF (EncodedBlocks) to FDD digit/channels strean.
Convert CTF (EncodedBlocks) to FT0 digit/channels strean.
Convert CTF (EncodedBlocks) to FV0 digit/channels strean.
int32_t i
Convert CTF (EncodedBlocks) to HMP digit/tracklets stream.
Convert CTF (EncodedBlocks) to clusters streams.
Convert CTF (EncodedBlocks) to MCH {Digit,ROFRecord} stream.
Convert CTF (EncodedBlocks) to MID ROFRecords/ColumnData stream.
Definition of the Names Generator class.
Convert CTF (EncodedBlocks) to PHOS digit/channels strean.
Helper function to tokenize sequences and ranges of integral numbers.
Convert CTF (EncodedBlocks) to FT0 digit/channels strean.
Convert CTF (EncodedBlocks) to TRD digit/tracklets stream.
Convert CTF (EncodedBlocks) to ZDC BCData/ChannelData/OrbitData stream.
StringRef key
static void updateFromString(std::string const &)
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr std::string_view NONE
keywork for no-detector
Definition DetID.h:104
static constexpr ID CTP
Definition DetID.h:79
static constexpr ID FV0
Definition DetID.h:76
static constexpr std::string_view ALL
keywork for all detectors
Definition DetID.h:105
static constexpr ID PHS
Definition DetID.h:67
static constexpr ID MID
Definition DetID.h:73
static constexpr ID ITS
Definition DetID.h:63
static constexpr ID MFT
Definition DetID.h:71
static constexpr ID ZDC
Definition DetID.h:74
static constexpr ID FT0
Definition DetID.h:75
static constexpr ID CPV
Definition DetID.h:68
static constexpr ID TRD
Definition DetID.h:65
static constexpr ID TPC
Definition DetID.h:64
static constexpr ID EMC
Definition DetID.h:69
static mask_t getMask(const std::string_view detList)
detector masks from any non-alpha-num delimiter-separated list (empty if NONE is supplied)
Definition DetID.cxx:42
static constexpr ID FDD
Definition DetID.h:77
static constexpr ID MCH
Definition DetID.h:72
static constexpr ID HMP
Definition DetID.h:70
static constexpr ID TOF
Definition DetID.h:66
ConfigParamRegistry & options() const
void override(const char *key, ConfigValueType auto const &val) const
void customize(std::vector< o2::framework::ConfigParamSpec > &workflowOptions)
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLsizei const GLfloat * value
Definition glcorearb.h:819
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
framework::DataProcessorSpec getCTFReaderSpec(const o2::ctf::CTFReaderInp &inp)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspecInp, unsigned int sspecOut=0, const std::string &ctfdictOpt="none")
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
o2::framework::DataProcessorSpec specCombiner(std::string const &name, std::vector< DataProcessorSpec > const &speccollection, std::vector< DataProcessorSpec > &remaining)
std::vector< DataProcessorSpec > WorkflowSpec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt="none")
create a processor spec
framework::DataProcessorSpec getMFTEntropyDecoderSpec(int verbosity, bool doStag, bool getDigits, unsigned int sspec, const std::string &ctfdictOpt)
framework::DataProcessorSpec getITSEntropyDecoderSpec(int verbosity, bool doStag, bool getDigits, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, const char *specName, unsigned int sspec, const std::string &ctfdictOpt="none")
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt="none")
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt="none")
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
std::vector< std::string > split(const std::string &str, char delimiter=',')
std::string fileRunTimeSpans
unsigned int decSSpecEMC
std::string metricChannel
o2::detectors::DetID::mask_t detMask
static bool isITSStaggeringEnabled(o2::framework::ConfigContext const &cfgc)
static bool isMFTStaggeringEnabled(o2::framework::ConfigContext const &cfgc)
static void addConfigOption(std::vector< o2::framework::ConfigParamSpec > &opts)
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)