Project
Loading...
Searching...
No Matches
ctf-reader-workflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <string>
13#include <vector>
14#include "Framework/Logger.h"
18#include "Framework/InputSpec.h"
25
26// Specific detectors specs
42#ifdef WITH_OPENMP
43#include <omp.h>
44#endif
45
46using namespace o2::framework;
48
49// we need to add workflow options before including Framework/runDataProcessing
50void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
51{
52 // option allowing to set parameters
53 std::vector<o2::framework::ConfigParamSpec> options;
54 options.push_back(ConfigParamSpec{"ctf-input", VariantType::String, "none", {"comma-separated list CTF input files"}});
55 options.push_back(ConfigParamSpec{"ctf-dict", VariantType::String, "ccdb", {"CTF dictionary: empty or ccdb=CCDB, none=no external dictionary otherwise: local filename"}});
56 options.push_back(ConfigParamSpec{"onlyDet", VariantType::String, std::string{DetID::ALL}, {"comma-separated list of detectors to accept. Overrides skipDet"}});
57 options.push_back(ConfigParamSpec{"skipDet", VariantType::String, std::string{DetID::NONE}, {"comma-separate list of detectors to skip"}});
58 options.push_back(ConfigParamSpec{"loop", VariantType::Int, 0, {"loop N times (infinite for N<0)"}});
59 options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}});
60 options.push_back(ConfigParamSpec{"shuffle", VariantType::Bool, false, {"shuffle TF sending order (for debug)"}});
61 options.push_back(ConfigParamSpec{"copy-cmd", VariantType::String, "alien_cp ?src file://?dst", {"copy command for remote files or no-copy to avoid copying"}}); // Use "XrdSecPROTOCOL=sss,unix xrdcp -N root://eosaliceo2.cern.ch/?src ?dst" for direct EOS access
62 options.push_back(ConfigParamSpec{"ctf-file-regex", VariantType::String, ".*o2_ctf_run.+\\.root$", {"regex string to identify CTF files"}});
63 options.push_back(ConfigParamSpec{"remote-regex", VariantType::String, "^(alien://|)/alice/data/.+", {"regex string to identify remote files"}}); // Use "^/eos/aliceo2/.+" for direct EOS access
64 options.push_back(ConfigParamSpec{"max-cached-files", VariantType::Int, 3, {"max CTF files queued (copied for remote source)"}});
65 options.push_back(ConfigParamSpec{"allow-missing-detectors", VariantType::Bool, false, {"send empty message if detector is missing in the CTF (otherwise throw)"}});
66 options.push_back(ConfigParamSpec{"send-diststf-0xccdb", VariantType::Bool, false, {"send explicit FLP/DISTSUBTIMEFRAME/0xccdb output"}});
67 options.push_back(ConfigParamSpec{"ctf-reader-verbosity", VariantType::Int, 0, {"verbosity level (0: summary per detector, 1: summary per block"}});
68 options.push_back(ConfigParamSpec{"ctf-data-subspec", VariantType::Int, 0, {"subspec to use for decoded CTF messages (use non-0 if CTF writer will be attached downstream)"}});
69 options.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}});
70 options.push_back(ConfigParamSpec{"ir-frames-files", VariantType::String, "", {"If non empty, inject selected IRFrames from this file"}});
71 options.push_back(ConfigParamSpec{"run-time-span-file", VariantType::String, "", {"If non empty, inject selected IRFrames from this text file (run, min/max orbit or unix time)"}});
72 options.push_back(ConfigParamSpec{"skip-skimmed-out-tf", VariantType::Bool, false, {"Do not process TFs with empty IR-Frame coverage"}});
73 options.push_back(ConfigParamSpec{"invert-irframe-selection", VariantType::Bool, false, {"Select only frames mentioned in ir-frames-file (skip-skimmed-out-tf applied to TF not selected!)"}});
74 //
75 options.push_back(ConfigParamSpec{"its-digits", VariantType::Bool, false, {"convert ITS clusters to digits"}});
76 options.push_back(ConfigParamSpec{"mft-digits", VariantType::Bool, false, {"convert MFT clusters to digits"}});
77 //
78 options.push_back(ConfigParamSpec{"emcal-decoded-subspec", VariantType::Int, 0, {"subspec to use for decoded EMCAL data"}});
79 //
80 options.push_back(ConfigParamSpec{"timeframes-shm-limit", VariantType::String, "0", {"Minimum amount of SHM required in order to publish data"}});
81 options.push_back(ConfigParamSpec{"metric-feedback-channel-format", VariantType::String, "name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", {"format for the metric-feedback channel for TF rate limiting"}});
82 options.push_back(ConfigParamSpec{"combine-devices", VariantType::Bool, false, {"combine multiple DPL devices (entropy decoders)"}});
83 std::swap(workflowOptions, options);
84}
85
86// ------------------------------------------------------------------
88
90{
91 o2::conf::ConfigurableParam::updateFromString(configcontext.options().get<std::string>("configKeyValues"));
92 o2::ctf::CTFReaderInp ctfInput;
93
94 WorkflowSpec specs;
95 std::string allowedDetectors = "ITS,TPC,TRD,TOF,PHS,CPV,EMC,HMP,MFT,MCH,MID,ZDC,FT0,FV0,FDD,CTP"; // FIXME: explicit list to avoid problem with upgrade detectors
96 auto mskOnly = DetID::getMask(configcontext.options().get<std::string>("onlyDet"));
97 auto mskSkip = DetID::getMask(configcontext.options().get<std::string>("skipDet"));
98 if (mskOnly.any()) {
99 ctfInput.detMask &= mskOnly;
100 } else {
101 ctfInput.detMask ^= mskSkip;
102 }
103 ctfInput.detMask &= DetID::getMask(allowedDetectors);
104 ctfInput.inpdata = configcontext.options().get<std::string>("ctf-input");
105 ctfInput.subspec = (unsigned int)configcontext.options().get<int>("ctf-data-subspec");
106 ctfInput.decSSpecEMC = (unsigned int)configcontext.options().get<int>("emcal-decoded-subspec");
107 if (ctfInput.inpdata.empty() || ctfInput.inpdata == "none") {
108 if (!configcontext.helpOnCommandLine()) {
109 throw std::runtime_error("--ctf-input <file,...> is not provided");
110 }
111 ctfInput.inpdata = "";
112 }
113
114 ctfInput.maxLoops = configcontext.options().get<int>("loop");
115 if (ctfInput.maxLoops < 0) {
116 ctfInput.maxLoops = 0x7fffffff;
117 }
118 ctfInput.delay_us = int32_t(1e6 * configcontext.options().get<float>("delay")); // delay in microseconds
119 if (ctfInput.delay_us < 0) {
120 ctfInput.delay_us = 0;
121 }
122
123 ctfInput.maxFileCache = std::max(1, configcontext.options().get<int>("max-cached-files"));
124
125 ctfInput.shuffle = configcontext.options().get<bool>("shuffle");
126 ctfInput.copyCmd = configcontext.options().get<std::string>("copy-cmd");
127 ctfInput.tffileRegex = configcontext.options().get<std::string>("ctf-file-regex");
128 ctfInput.remoteRegex = configcontext.options().get<std::string>("remote-regex");
129 ctfInput.allowMissingDetectors = configcontext.options().get<bool>("allow-missing-detectors");
130 ctfInput.sup0xccdb = !configcontext.options().get<bool>("send-diststf-0xccdb");
131 ctfInput.minSHM = std::stoul(configcontext.options().get<std::string>("timeframes-shm-limit"));
132 ctfInput.fileIRFrames = configcontext.options().get<std::string>("ir-frames-files");
133 ctfInput.fileRunTimeSpans = configcontext.options().get<std::string>("run-time-span-file");
134 ctfInput.skipSkimmedOutTF = configcontext.options().get<bool>("skip-skimmed-out-tf");
135 ctfInput.invertIRFramesSelection = configcontext.options().get<bool>("invert-irframe-selection");
136 ctfInput.dictOpt = configcontext.options().get<std::string>("ctf-dict");
137 int verbosity = configcontext.options().get<int>("ctf-reader-verbosity");
138
139 int rateLimitingIPCID = std::stoi(configcontext.options().get<std::string>("timeframes-rate-limit-ipcid"));
140 std::string chanFmt = configcontext.options().get<std::string>("metric-feedback-channel-format");
141 if (rateLimitingIPCID > -1 && !chanFmt.empty()) {
142 ctfInput.metricChannel = fmt::format(fmt::runtime(chanFmt), o2::framework::ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
143 }
144 if (!ctfInput.fileRunTimeSpans.empty()) {
145 ctfInput.skipSkimmedOutTF = true;
146 }
147 if (!ctfInput.fileIRFrames.empty() && !ctfInput.fileRunTimeSpans.empty()) {
148 LOGP(fatal, "One cannot provide --ir-frames-files and --run-time-span-file options simultaneously");
149 }
150
151 specs.push_back(o2::ctf::getCTFReaderSpec(ctfInput));
152
153 auto pipes = configcontext.options().get<std::string>("pipeline");
154 std::unordered_map<std::string, int> plines;
155 auto ptokens = o2::utils::Str::tokenize(pipes, ',');
156 for (auto& token : ptokens) {
157 auto split = token.find(":");
158 if (split == std::string::npos) {
159 throw std::runtime_error("bad pipeline definition. Syntax <processor>:<pipeline>");
160 }
161 auto key = token.substr(0, split);
162 token.erase(0, split + 1);
163 size_t error;
164 auto value = std::stoll(token, &error, 10);
165 if (token[error] != '\0') {
166 throw std::runtime_error("Bad pipeline definition. Expecting integer");
167 }
168 if (value > 1) {
169 plines[key] = value;
170 }
171 }
172
173 std::vector<WorkflowSpec> decSpecsV;
174
175 auto addSpecs = [&decSpecsV, &plines](DataProcessorSpec&& s) {
176 auto entry = plines.find(s.name);
177 size_t mult = (entry == plines.end() || entry->second < 2) ? 1 : entry->second;
178 if (mult > decSpecsV.size()) {
179 decSpecsV.resize(mult);
180 }
181 decSpecsV[mult - 1].push_back(s);
182 };
183
184 // add decoders for all allowed detectors.
185 if (ctfInput.detMask[DetID::ITS]) {
186 addSpecs(o2::itsmft::getEntropyDecoderSpec(DetID::getDataOrigin(DetID::ITS), verbosity, configcontext.options().get<bool>("its-digits"), ctfInput.subspec, ctfInput.dictOpt));
187 }
188 if (ctfInput.detMask[DetID::MFT]) {
189 addSpecs(o2::itsmft::getEntropyDecoderSpec(DetID::getDataOrigin(DetID::MFT), verbosity, configcontext.options().get<bool>("mft-digits"), ctfInput.subspec, ctfInput.dictOpt));
190 }
191 if (ctfInput.detMask[DetID::TPC]) {
192 addSpecs(o2::tpc::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
193 }
194 if (ctfInput.detMask[DetID::TRD]) {
195 addSpecs(o2::trd::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
196 }
197 if (ctfInput.detMask[DetID::TOF]) {
198 addSpecs(o2::tof::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
199 }
200 if (ctfInput.detMask[DetID::FT0]) {
201 addSpecs(o2::ft0::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
202 }
203 if (ctfInput.detMask[DetID::FV0]) {
204 addSpecs(o2::fv0::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
205 }
206 if (ctfInput.detMask[DetID::FDD]) {
207 addSpecs(o2::fdd::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
208 }
209 if (ctfInput.detMask[DetID::MID]) {
210 addSpecs(o2::mid::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
211 }
212 if (ctfInput.detMask[DetID::MCH]) {
213 addSpecs(o2::mch::getEntropyDecoderSpec(verbosity, "mch-entropy-decoder", ctfInput.subspec, ctfInput.dictOpt));
214 }
215 if (ctfInput.detMask[DetID::EMC]) {
216 addSpecs(o2::emcal::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.decSSpecEMC, ctfInput.dictOpt));
217 }
218 if (ctfInput.detMask[DetID::PHS]) {
219 addSpecs(o2::phos::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
220 }
221 if (ctfInput.detMask[DetID::CPV]) {
222 addSpecs(o2::cpv::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
223 }
224 if (ctfInput.detMask[DetID::ZDC]) {
225 addSpecs(o2::zdc::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
226 }
227 if (ctfInput.detMask[DetID::HMP]) {
228 addSpecs(o2::hmpid::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
229 }
230 if (ctfInput.detMask[DetID::CTP]) {
231 addSpecs(o2::ctp::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.dictOpt));
232 }
233
234 bool combine = configcontext.options().get<bool>("combine-devices");
235 if (!combine) {
236 for (auto& decSpecs : decSpecsV) {
237 for (auto& s : decSpecs) {
238 specs.push_back(s);
239 }
240 }
241 } else {
242 std::vector<DataProcessorSpec> remaining;
243 if (decSpecsV.size() && decSpecsV[0].size()) {
244 specs.push_back(specCombiner("EntropyDecoders", decSpecsV[0], remaining)); // processing w/o pipelining
245 }
246 bool updatePipelines = false;
247 for (size_t i = 1; i < decSpecsV.size(); i++) { // add pipelined processes separately, consider combining them to separate groups (need to have modify argument of pipeline option)
248 if (decSpecsV[i].size() > 1) {
249 specs.push_back(specCombiner(fmt::format("EntropyDecodersP{}", i + 1), decSpecsV[i], remaining)); // processing pipelining multiplicity i+1
250 updatePipelines = true;
251 pipes += fmt::format(",EntropyDecodersP{}:{}", i + 1, i + 1);
252 } else {
253 for (auto& s : decSpecsV[i]) {
254 specs.push_back(s);
255 }
256 }
257 }
258 for (auto& s : remaining) {
259 specs.push_back(s);
260 }
261 if (updatePipelines) {
262 configcontext.options().override("pipeline", pipes);
263 }
264 }
265
266 return std::move(specs);
267}
#define verbosity
Convert CTF (EncodedBlocks) to CPV digit/channels stream.
Convert CTF (EncodedBlocks) to CTP digit stream.
Convert CTF (EncodedBlocks) to EMCAL digit/channels stream.
Convert CTF (EncodedBlocks) to FDD digit/channels stream.
Convert CTF (EncodedBlocks) to FT0 digit/channels stream.
Convert CTF (EncodedBlocks) to FV0 digit/channels stream.
int32_t i
Convert CTF (EncodedBlocks) to HMP digit/tracklets stream.
Convert CTF (EncodedBlocks) to clusters streams.
Convert CTF (EncodedBlocks) to MCH {Digit,ROFRecord} stream.
Convert CTF (EncodedBlocks) to MID ROFRecords/ColumnData stream.
Definition of the Names Generator class.
Convert CTF (EncodedBlocks) to PHOS digit/channels stream.
Helper function to tokenize sequences and ranges of integral numbers.
Convert CTF (EncodedBlocks) to FT0 digit/channels stream.
Convert CTF (EncodedBlocks) to TRD digit/tracklets stream.
Convert CTF (EncodedBlocks) to ZDC BCData/ChannelData/OrbitData stream.
StringRef key
static void updateFromString(std::string const &)
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr std::string_view NONE
keyword for no-detector
Definition DetID.h:104
static constexpr ID CTP
Definition DetID.h:79
static constexpr ID FV0
Definition DetID.h:76
static constexpr std::string_view ALL
keyword for all detectors
Definition DetID.h:105
static constexpr ID PHS
Definition DetID.h:67
static constexpr ID MID
Definition DetID.h:73
static constexpr ID ITS
Definition DetID.h:63
static constexpr ID MFT
Definition DetID.h:71
static constexpr ID ZDC
Definition DetID.h:74
static constexpr ID FT0
Definition DetID.h:75
static constexpr ID CPV
Definition DetID.h:68
static constexpr ID TRD
Definition DetID.h:65
static constexpr ID TPC
Definition DetID.h:64
static constexpr o2h::DataOrigin getDataOrigin(ID id)
Definition DetID.h:148
static constexpr ID EMC
Definition DetID.h:69
static mask_t getMask(const std::string_view detList)
detector masks from any non-alpha-num delimiter-separated list (empty if NONE is supplied)
Definition DetID.cxx:42
static constexpr ID FDD
Definition DetID.h:77
static constexpr ID MCH
Definition DetID.h:72
static constexpr ID HMP
Definition DetID.h:70
static constexpr ID TOF
Definition DetID.h:66
ConfigParamRegistry & options() const
void override(const char *key, ConfigValueType auto const &val) const
void customize(std::vector< o2::framework::ConfigParamSpec > &workflowOptions)
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLsizei const GLfloat * value
Definition glcorearb.h:819
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
framework::DataProcessorSpec getCTFReaderSpec(const o2::ctf::CTFReaderInp &inp)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspecInp, unsigned int sspecOut=0, const std::string &ctfdictOpt="none")
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
Defining PrimaryVertex explicitly as messageable.
o2::framework::DataProcessorSpec specCombiner(std::string const &name, std::vector< DataProcessorSpec > const &speccollection, std::vector< DataProcessorSpec > &remaining)
std::vector< DataProcessorSpec > WorkflowSpec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt="none")
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(o2::header::DataOrigin orig, int verbosity, bool getDigits, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, const char *specName, unsigned int sspec, const std::string &ctfdictOpt="none")
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt="none")
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt="none")
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec, const std::string &ctfdictOpt)
create a processor spec
std::vector< std::string > split(const std::string &str, char delimiter=',')
std::string fileRunTimeSpans
unsigned int decSSpecEMC
std::string metricChannel
o2::detectors::DetID::mask_t detMask
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)