Project
Loading...
Searching...
No Matches
ctf-reader-workflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <string>
13#include <vector>
14#include "Framework/Logger.h"
18#include "Framework/InputSpec.h"
25
26// Specific detectors specs
42#ifdef WITH_OPENMP
43#include <omp.h>
44#endif
45
46using namespace o2::framework;
48
49// we need to add workflow options before including Framework/runDataProcessing
50void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
51{
52 // option allowing to set parameters
53 std::vector<o2::framework::ConfigParamSpec> options;
54 options.push_back(ConfigParamSpec{"ctf-input", VariantType::String, "none", {"comma-separated list CTF input files"}});
55 options.push_back(ConfigParamSpec{"onlyDet", VariantType::String, std::string{DetID::ALL}, {"comma-separated list of detectors to accept. Overrides skipDet"}});
56 options.push_back(ConfigParamSpec{"skipDet", VariantType::String, std::string{DetID::NONE}, {"comma-separate list of detectors to skip"}});
57 options.push_back(ConfigParamSpec{"loop", VariantType::Int, 0, {"loop N times (infinite for N<0)"}});
58 options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}});
59 options.push_back(ConfigParamSpec{"shuffle", VariantType::Bool, false, {"shuffle TF sending order (for debug)"}});
60 options.push_back(ConfigParamSpec{"copy-cmd", VariantType::String, "alien_cp ?src file://?dst", {"copy command for remote files or no-copy to avoid copying"}}); // Use "XrdSecPROTOCOL=sss,unix xrdcp -N root://eosaliceo2.cern.ch/?src ?dst" for direct EOS access
61 options.push_back(ConfigParamSpec{"ctf-file-regex", VariantType::String, ".*o2_ctf_run.+\\.root$", {"regex string to identify CTF files"}});
62 options.push_back(ConfigParamSpec{"remote-regex", VariantType::String, "^(alien://|)/alice/data/.+", {"regex string to identify remote files"}}); // Use "^/eos/aliceo2/.+" for direct EOS access
63 options.push_back(ConfigParamSpec{"max-cached-files", VariantType::Int, 3, {"max CTF files queued (copied for remote source)"}});
64 options.push_back(ConfigParamSpec{"allow-missing-detectors", VariantType::Bool, false, {"send empty message if detector is missing in the CTF (otherwise throw)"}});
65 options.push_back(ConfigParamSpec{"send-diststf-0xccdb", VariantType::Bool, false, {"send explicit FLP/DISTSUBTIMEFRAME/0xccdb output"}});
66 options.push_back(ConfigParamSpec{"ctf-reader-verbosity", VariantType::Int, 0, {"verbosity level (0: summary per detector, 1: summary per block"}});
67 options.push_back(ConfigParamSpec{"ctf-data-subspec", VariantType::Int, 0, {"subspec to use for decoded CTF messages (use non-0 if CTF writer will be attached downstream)"}});
68 options.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}});
69 options.push_back(ConfigParamSpec{"ir-frames-files", VariantType::String, "", {"If non empty, inject selected IRFrames from this file"}});
70 options.push_back(ConfigParamSpec{"run-time-span-file", VariantType::String, "", {"If non empty, inject selected IRFrames from this text file (run, min/max orbit or unix time)"}});
71 options.push_back(ConfigParamSpec{"skip-skimmed-out-tf", VariantType::Bool, false, {"Do not process TFs with empty IR-Frame coverage"}});
72 options.push_back(ConfigParamSpec{"invert-irframe-selection", VariantType::Bool, false, {"Select only frames mentioned in ir-frames-file (skip-skimmed-out-tf applied to TF not selected!)"}});
73 //
74 options.push_back(ConfigParamSpec{"its-digits", VariantType::Bool, false, {"convert ITS clusters to digits"}});
75 options.push_back(ConfigParamSpec{"mft-digits", VariantType::Bool, false, {"convert MFT clusters to digits"}});
76 //
77 options.push_back(ConfigParamSpec{"emcal-decoded-subspec", VariantType::Int, 0, {"subspec to use for decoded EMCAL data"}});
78 //
79 options.push_back(ConfigParamSpec{"timeframes-shm-limit", VariantType::String, "0", {"Minimum amount of SHM required in order to publish data"}});
80 options.push_back(ConfigParamSpec{"metric-feedback-channel-format", VariantType::String, "name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", {"format for the metric-feedback channel for TF rate limiting"}});
81 options.push_back(ConfigParamSpec{"combine-devices", VariantType::Bool, false, {"combine multiple DPL devices (entropy decoders)"}});
82 std::swap(workflowOptions, options);
83}
84
85// ------------------------------------------------------------------
87
89{
 // Body of the workflow definition: builds the CTF reader spec from the
 // command-line options and appends one entropy-decoder spec per selected
 // detector, optionally combining the decoders into fewer devices.
 // NOTE(review): the signature line and several per-detector addSpecs(...)
 // lines are not present in this listing (gaps in the embedded numbering);
 // the comments below describe only the code that is visible.

 // Apply "key=value;..." overrides before any configurable is read.
90 o2::conf::ConfigurableParam::updateFromString(configcontext.options().get<std::string>("configKeyValues"));
91 o2::ctf::CTFReaderInp ctfInput;
92
93 WorkflowSpec specs;
94 std::string allowedDetectors = "ITS,TPC,TRD,TOF,PHS,CPV,EMC,HMP,MFT,MCH,MID,ZDC,FT0,FV0,FDD,CTP"; // FIXME: explicit list to avoid problem with upgrade detectors
95 auto mskOnly = DetID::getMask(configcontext.options().get<std::string>("onlyDet"));
96 auto mskSkip = DetID::getMask(configcontext.options().get<std::string>("skipDet"));
 // A non-empty onlyDet mask takes precedence over skipDet.
 // NOTE(review): the XOR removes the skipped detectors only if their bits are
 // already set in ctfInput.detMask — presumably the default mask has all
 // detectors enabled; confirm against CTFReaderInp.
97 if (mskOnly.any()) {
98 ctfInput.detMask &= mskOnly;
99 } else {
100 ctfInput.detMask ^= mskSkip;
101 }
 // Whatever was requested, never go beyond the explicitly allowed detectors.
102 ctfInput.detMask &= DetID::getMask(allowedDetectors);
103 ctfInput.inpdata = configcontext.options().get<std::string>("ctf-input");
104 ctfInput.subspec = (unsigned int)configcontext.options().get<int>("ctf-data-subspec");
105 ctfInput.decSSpecEMC = (unsigned int)configcontext.options().get<int>("emcal-decoded-subspec");
 // A missing input ("none" or empty) is an error, except when only the help
 // is requested, in which case the input is just reset to an empty string.
106 if (ctfInput.inpdata.empty() || ctfInput.inpdata == "none") {
107 if (!configcontext.helpOnCommandLine()) {
108 throw std::runtime_error("--ctf-input <file,...> is not provided");
109 }
110 ctfInput.inpdata = "";
111 }
112
 // A negative loop count means "infinite": stored as the largest 32-bit signed value.
113 ctfInput.maxLoops = configcontext.options().get<int>("loop");
114 if (ctfInput.maxLoops < 0) {
115 ctfInput.maxLoops = 0x7fffffff;
116 }
 // The option is given in seconds; negative delays are clamped to 0.
117 ctfInput.delay_us = int32_t(1e6 * configcontext.options().get<float>("delay")); // delay in microseconds
118 if (ctfInput.delay_us < 0) {
119 ctfInput.delay_us = 0;
120 }
121
 // At least one file must be allowed in the cache.
122 ctfInput.maxFileCache = std::max(1, configcontext.options().get<int>("max-cached-files"));
123
124 ctfInput.shuffle = configcontext.options().get<bool>("shuffle");
125 ctfInput.copyCmd = configcontext.options().get<std::string>("copy-cmd");
126 ctfInput.tffileRegex = configcontext.options().get<std::string>("ctf-file-regex");
127 ctfInput.remoteRegex = configcontext.options().get<std::string>("remote-regex");
128 ctfInput.allowMissingDetectors = configcontext.options().get<bool>("allow-missing-detectors");
 // The stored flag is the inverse of the option: it is set unless the output was explicitly requested.
129 ctfInput.sup0xccdb = !configcontext.options().get<bool>("send-diststf-0xccdb");
 // std::stoul throws if the string does not start with a number.
130 ctfInput.minSHM = std::stoul(configcontext.options().get<std::string>("timeframes-shm-limit"));
131 ctfInput.fileIRFrames = configcontext.options().get<std::string>("ir-frames-files");
132 ctfInput.fileRunTimeSpans = configcontext.options().get<std::string>("run-time-span-file");
133 ctfInput.skipSkimmedOutTF = configcontext.options().get<bool>("skip-skimmed-out-tf");
134 ctfInput.invertIRFramesSelection = configcontext.options().get<bool>("invert-irframe-selection");
135 int verbosity = configcontext.options().get<int>("ctf-reader-verbosity");
136
 // NOTE(review): "timeframes-rate-limit-ipcid" is not declared in customize()
 // of this file — presumably a framework-provided option; confirm.
137 int rateLimitingIPCID = std::stoi(configcontext.options().get<std::string>("timeframes-rate-limit-ipcid"));
138 std::string chanFmt = configcontext.options().get<std::string>("metric-feedback-channel-format");
 // The metric channel is configured only for a non-negative IPC id and a
 // non-empty format; the two "{}" placeholders of the format receive the
 // default IPC folder and the IPC id.
139 if (rateLimitingIPCID > -1 && !chanFmt.empty()) {
140 ctfInput.metricChannel = fmt::format(fmt::runtime(chanFmt), o2::framework::ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
141 }
 // Providing a run-time-span file forces skipping of skimmed-out TFs,
 // regardless of the skip-skimmed-out-tf option.
142 if (!ctfInput.fileRunTimeSpans.empty()) {
143 ctfInput.skipSkimmedOutTF = true;
144 }
 // The two selection sources are mutually exclusive.
145 if (!ctfInput.fileIRFrames.empty() && !ctfInput.fileRunTimeSpans.empty()) {
146 LOGP(fatal, "One cannot provide --ir-frames-files and --run-time-span-file options simultaneously");
147 }
148
 // The reader itself always comes first in the workflow.
149 specs.push_back(o2::ctf::getCTFReaderSpec(ctfInput));
150
 // Parse the "pipeline" option, a comma-separated list of <processor>:<N>
 // entries, into a name -> multiplicity map. Only multiplicities above 1 are
 // recorded.
151 auto pipes = configcontext.options().get<std::string>("pipeline");
152 std::unordered_map<std::string, int> plines;
153 auto ptokens = o2::utils::Str::tokenize(pipes, ',');
154 for (auto& token : ptokens) {
155 auto split = token.find(":");
156 if (split == std::string::npos) {
157 throw std::runtime_error("bad pipeline definition. Syntax <processor>:<pipeline>");
158 }
159 auto key = token.substr(0, split);
 // After this erase the token holds only the part following the ':'.
160 token.erase(0, split + 1);
161 size_t error;
 // `error` receives the index of the first character not consumed by the
 // conversion; anything but the end of the string means trailing garbage.
162 auto value = std::stoll(token, &error, 10);
163 if (token[error] != '\0') {
164 throw std::runtime_error("Bad pipeline definition. Expecting integer");
165 }
166 if (value > 1) {
167 plines[key] = value;
168 }
169 }
170
 // Decoder specs grouped by pipeline multiplicity: element [m - 1] collects
 // the specs requested with multiplicity m (element [0]: no pipelining).
171 std::vector<WorkflowSpec> decSpecsV;
172
 // Files a decoder spec into the group matching its multiplicity from
 // `plines` (1 if the spec name is not listed), growing the outer vector as
 // needed.
173 auto addSpecs = [&decSpecsV, &plines](DataProcessorSpec&& s) {
174 auto entry = plines.find(s.name);
175 size_t mult = (entry == plines.end() || entry->second < 2) ? 1 : entry->second;
176 if (mult > decSpecsV.size()) {
177 decSpecsV.resize(mult);
178 }
179 decSpecsV[mult - 1].push_back(s);
180 };
181
182 // add decoders for all allowed detectors.
 // NOTE(review): for the detectors below whose `if` body is empty, the
 // addSpecs(...) line is missing from this listing rather than from the code.
183 if (ctfInput.detMask[DetID::ITS]) {
184 addSpecs(o2::itsmft::getEntropyDecoderSpec(DetID::getDataOrigin(DetID::ITS), verbosity, configcontext.options().get<bool>("its-digits"), ctfInput.subspec));
185 }
186 if (ctfInput.detMask[DetID::MFT]) {
187 addSpecs(o2::itsmft::getEntropyDecoderSpec(DetID::getDataOrigin(DetID::MFT), verbosity, configcontext.options().get<bool>("mft-digits"), ctfInput.subspec));
188 }
189 if (ctfInput.detMask[DetID::TPC]) {
191 }
192 if (ctfInput.detMask[DetID::TRD]) {
194 }
195 if (ctfInput.detMask[DetID::TOF]) {
197 }
198 if (ctfInput.detMask[DetID::FT0]) {
200 }
201 if (ctfInput.detMask[DetID::FV0]) {
203 }
204 if (ctfInput.detMask[DetID::FDD]) {
206 }
207 if (ctfInput.detMask[DetID::MID]) {
209 }
210 if (ctfInput.detMask[DetID::MCH]) {
211 addSpecs(o2::mch::getEntropyDecoderSpec(verbosity, "mch-entropy-decoder", ctfInput.subspec));
212 }
213 if (ctfInput.detMask[DetID::EMC]) {
214 addSpecs(o2::emcal::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.decSSpecEMC));
215 }
216 if (ctfInput.detMask[DetID::PHS]) {
218 }
219 if (ctfInput.detMask[DetID::CPV]) {
221 }
222 if (ctfInput.detMask[DetID::ZDC]) {
224 }
225 if (ctfInput.detMask[DetID::HMP]) {
227 }
228 if (ctfInput.detMask[DetID::CTP]) {
230 }
231
232 bool combine = configcontext.options().get<bool>("combine-devices");
 // Without combining, every decoder becomes its own entry in the workflow.
233 if (!combine) {
234 for (auto& decSpecs : decSpecsV) {
235 for (auto& s : decSpecs) {
236 specs.push_back(s);
237 }
238 }
239 } else {
 // Filled by specCombiner; its content is appended to the workflow after
 // the combined specs.
240 std::vector<DataProcessorSpec> remaining;
241 if (decSpecsV.size() && decSpecsV[0].size()) {
242 specs.push_back(specCombiner("EntropyDecoders", decSpecsV[0], remaining)); // processing w/o pipelining
243 }
 // Set once a combined pipelined spec is created, since the "pipeline"
 // option must then also name the new combined processor.
244 bool updatePipelines = false;
245 for (size_t i = 1; i < decSpecsV.size(); i++) { // add pipelined processes separately, consider combining them to separate groups (need to have modify argument of pipeline option)
 // Only groups with more than one spec are combined; a lone spec (or an
 // empty group) is passed through unchanged.
246 if (decSpecsV[i].size() > 1) {
247 specs.push_back(specCombiner(fmt::format("EntropyDecodersP{}", i + 1), decSpecsV[i], remaining)); // processing pipelining multiplicity i+1
248 updatePipelines = true;
249 pipes += fmt::format(",EntropyDecodersP{}:{}", i + 1, i + 1);
250 } else {
251 for (auto& s : decSpecsV[i]) {
252 specs.push_back(s);
253 }
254 }
255 }
256 for (auto& s : remaining) {
257 specs.push_back(s);
258 }
 // Write the extended pipeline definition back into the options.
259 if (updatePipelines) {
260 configcontext.options().override("pipeline", pipes);
261 }
262 }
263
264 return std::move(specs);
265}
#define verbosity
Convert CTF (EncodedBlocks) to CPV digit/channels stream.
Convert CTF (EncodedBlocks) to CTP digit stream.
Convert CTF (EncodedBlocks) to EMCAL digit/channels stream.
Convert CTF (EncodedBlocks) to FDD digit/channels stream.
Convert CTF (EncodedBlocks) to FT0 digit/channels stream.
Convert CTF (EncodedBlocks) to FV0 digit/channels stream.
int32_t i
Convert CTF (EncodedBlocks) to HMP digit/tracklets stream.
Convert CTF (EncodedBlocks) to clusters streams.
Convert CTF (EncodedBlocks) to MCH {Digit,ROFRecord} stream.
Convert CTF (EncodedBlocks) to MID ROFRecords/ColumnData stream.
Definition of the Names Generator class.
Convert CTF (EncodedBlocks) to PHOS digit/channels stream.
Helper function to tokenize sequences and ranges of integral numbers.
Convert CTF (EncodedBlocks) to FT0 digit/channels stream.
Convert CTF (EncodedBlocks) to TRD digit/tracklets stream.
Convert CTF (EncodedBlocks) to ZDC BCData/ChannelData/OrbitData stream.
StringRef key
static void updateFromString(std::string const &)
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr std::string_view NONE
keyword for no-detector
Definition DetID.h:103
static constexpr ID CTP
Definition DetID.h:79
static constexpr ID FV0
Definition DetID.h:76
static constexpr std::string_view ALL
keyword for all detectors
Definition DetID.h:104
static constexpr ID PHS
Definition DetID.h:67
static constexpr ID MID
Definition DetID.h:73
static constexpr ID ITS
Definition DetID.h:63
static constexpr ID MFT
Definition DetID.h:71
static constexpr ID ZDC
Definition DetID.h:74
static constexpr ID FT0
Definition DetID.h:75
static constexpr ID CPV
Definition DetID.h:68
static constexpr ID TRD
Definition DetID.h:65
static constexpr ID TPC
Definition DetID.h:64
static constexpr o2h::DataOrigin getDataOrigin(ID id)
Definition DetID.h:147
static constexpr ID EMC
Definition DetID.h:69
static mask_t getMask(const std::string_view detList)
detector masks from any non-alpha-num delimiter-separated list (empty if NONE is supplied)
Definition DetID.cxx:42
static constexpr ID FDD
Definition DetID.h:77
static constexpr ID MCH
Definition DetID.h:72
static constexpr ID HMP
Definition DetID.h:70
static constexpr ID TOF
Definition DetID.h:66
ConfigParamRegistry & options() const
void override(const char *key, ConfigValueType auto const &val) const
void customize(std::vector< o2::framework::ConfigParamSpec > &workflowOptions)
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLsizei const GLfloat * value
Definition glcorearb.h:819
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getCTFReaderSpec(const o2::ctf::CTFReaderInp &inp)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspecInp, unsigned int sspecOut=0)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
o2::framework::DataProcessorSpec specCombiner(std::string const &name, std::vector< DataProcessorSpec > const &speccollection, std::vector< DataProcessorSpec > &remaining)
std::vector< DataProcessorSpec > WorkflowSpec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(o2::header::DataOrigin orig, int verbosity, bool getDigits, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, const char *specName, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
std::vector< std::string > split(const std::string &str, char delimiter=',')
std::string fileRunTimeSpans
unsigned int decSSpecEMC
std::string metricChannel
o2::detectors::DetID::mask_t detMask
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)