Project
Loading...
Searching...
No Matches
ctf-reader-workflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <string>
13#include <vector>
14#include "Framework/Logger.h"
18#include "Framework/InputSpec.h"
25
26// Specific detectors specs
42#ifdef WITH_OPENMP
43#include <omp.h>
44#endif
45
46using namespace o2::framework;
48
49// we need to add workflow options before including Framework/runDataProcessing
50void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
51{
52 // option allowing to set parameters
53 std::vector<o2::framework::ConfigParamSpec> options;
54 options.push_back(ConfigParamSpec{"ctf-input", VariantType::String, "none", {"comma-separated list CTF input files"}});
55 options.push_back(ConfigParamSpec{"onlyDet", VariantType::String, std::string{DetID::ALL}, {"comma-separated list of detectors to accept. Overrides skipDet"}});
56 options.push_back(ConfigParamSpec{"skipDet", VariantType::String, std::string{DetID::NONE}, {"comma-separate list of detectors to skip"}});
57 options.push_back(ConfigParamSpec{"loop", VariantType::Int, 0, {"loop N times (infinite for N<0)"}});
58 options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}});
59 options.push_back(ConfigParamSpec{"copy-cmd", VariantType::String, "alien_cp ?src file://?dst", {"copy command for remote files or no-copy to avoid copying"}}); // Use "XrdSecPROTOCOL=sss,unix xrdcp -N root://eosaliceo2.cern.ch/?src ?dst" for direct EOS access
60 options.push_back(ConfigParamSpec{"ctf-file-regex", VariantType::String, ".*o2_ctf_run.+\\.root$", {"regex string to identify CTF files"}});
61 options.push_back(ConfigParamSpec{"remote-regex", VariantType::String, "^(alien://|)/alice/data/.+", {"regex string to identify remote files"}}); // Use "^/eos/aliceo2/.+" for direct EOS access
62 options.push_back(ConfigParamSpec{"max-cached-files", VariantType::Int, 3, {"max CTF files queued (copied for remote source)"}});
63 options.push_back(ConfigParamSpec{"allow-missing-detectors", VariantType::Bool, false, {"send empty message if detector is missing in the CTF (otherwise throw)"}});
64 options.push_back(ConfigParamSpec{"send-diststf-0xccdb", VariantType::Bool, false, {"send explicit FLP/DISTSUBTIMEFRAME/0xccdb output"}});
65 options.push_back(ConfigParamSpec{"ctf-reader-verbosity", VariantType::Int, 0, {"verbosity level (0: summary per detector, 1: summary per block"}});
66 options.push_back(ConfigParamSpec{"ctf-data-subspec", VariantType::Int, 0, {"subspec to use for decoded CTF messages (use non-0 if CTF writer will be attached downstream)"}});
67 options.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}});
68 options.push_back(ConfigParamSpec{"ir-frames-files", VariantType::String, "", {"If non empty, inject selected IRFrames from this file"}});
69 options.push_back(ConfigParamSpec{"run-time-span-file", VariantType::String, "", {"If non empty, inject selected IRFrames from this text file (run, min/max orbit or unix time)"}});
70 options.push_back(ConfigParamSpec{"skip-skimmed-out-tf", VariantType::Bool, false, {"Do not process TFs with empty IR-Frame coverage"}});
71 options.push_back(ConfigParamSpec{"invert-irframe-selection", VariantType::Bool, false, {"Select only frames mentioned in ir-frames-file (skip-skimmed-out-tf applied to TF not selected!)"}});
72 //
73 options.push_back(ConfigParamSpec{"its-digits", VariantType::Bool, false, {"convert ITS clusters to digits"}});
74 options.push_back(ConfigParamSpec{"mft-digits", VariantType::Bool, false, {"convert MFT clusters to digits"}});
75 //
76 options.push_back(ConfigParamSpec{"emcal-decoded-subspec", VariantType::Int, 0, {"subspec to use for decoded EMCAL data"}});
77 //
78 options.push_back(ConfigParamSpec{"timeframes-shm-limit", VariantType::String, "0", {"Minimum amount of SHM required in order to publish data"}});
79 options.push_back(ConfigParamSpec{"metric-feedback-channel-format", VariantType::String, "name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", {"format for the metric-feedback channel for TF rate limiting"}});
80 options.push_back(ConfigParamSpec{"combine-devices", VariantType::Bool, false, {"combine multiple DPL devices (entropy decoders)"}});
81 std::swap(workflowOptions, options);
82}
83
84// ------------------------------------------------------------------
86
88{
89 o2::conf::ConfigurableParam::updateFromString(configcontext.options().get<std::string>("configKeyValues"));
90 o2::ctf::CTFReaderInp ctfInput;
91
92 WorkflowSpec specs;
93 std::string allowedDetectors = "ITS,TPC,TRD,TOF,PHS,CPV,EMC,HMP,MFT,MCH,MID,ZDC,FT0,FV0,FDD,CTP"; // FIXME: explicit list to avoid problem with upgrade detectors
94 auto mskOnly = DetID::getMask(configcontext.options().get<std::string>("onlyDet"));
95 auto mskSkip = DetID::getMask(configcontext.options().get<std::string>("skipDet"));
96 if (mskOnly.any()) {
97 ctfInput.detMask &= mskOnly;
98 } else {
99 ctfInput.detMask ^= mskSkip;
100 }
101 ctfInput.detMask &= DetID::getMask(allowedDetectors);
102 ctfInput.inpdata = configcontext.options().get<std::string>("ctf-input");
103 ctfInput.subspec = (unsigned int)configcontext.options().get<int>("ctf-data-subspec");
104 ctfInput.decSSpecEMC = (unsigned int)configcontext.options().get<int>("emcal-decoded-subspec");
105 if (ctfInput.inpdata.empty() || ctfInput.inpdata == "none") {
106 if (!configcontext.helpOnCommandLine()) {
107 throw std::runtime_error("--ctf-input <file,...> is not provided");
108 }
109 ctfInput.inpdata = "";
110 }
111
112 ctfInput.maxLoops = configcontext.options().get<int>("loop");
113 if (ctfInput.maxLoops < 0) {
114 ctfInput.maxLoops = 0x7fffffff;
115 }
116 ctfInput.delay_us = int32_t(1e6 * configcontext.options().get<float>("delay")); // delay in microseconds
117 if (ctfInput.delay_us < 0) {
118 ctfInput.delay_us = 0;
119 }
120
121 ctfInput.maxFileCache = std::max(1, configcontext.options().get<int>("max-cached-files"));
122
123 ctfInput.copyCmd = configcontext.options().get<std::string>("copy-cmd");
124 ctfInput.tffileRegex = configcontext.options().get<std::string>("ctf-file-regex");
125 ctfInput.remoteRegex = configcontext.options().get<std::string>("remote-regex");
126 ctfInput.allowMissingDetectors = configcontext.options().get<bool>("allow-missing-detectors");
127 ctfInput.sup0xccdb = !configcontext.options().get<bool>("send-diststf-0xccdb");
128 ctfInput.minSHM = std::stoul(configcontext.options().get<std::string>("timeframes-shm-limit"));
129 ctfInput.fileIRFrames = configcontext.options().get<std::string>("ir-frames-files");
130 ctfInput.fileRunTimeSpans = configcontext.options().get<std::string>("run-time-span-file");
131 ctfInput.skipSkimmedOutTF = configcontext.options().get<bool>("skip-skimmed-out-tf");
132 ctfInput.invertIRFramesSelection = configcontext.options().get<bool>("invert-irframe-selection");
133 int verbosity = configcontext.options().get<int>("ctf-reader-verbosity");
134
135 int rateLimitingIPCID = std::stoi(configcontext.options().get<std::string>("timeframes-rate-limit-ipcid"));
136 std::string chanFmt = configcontext.options().get<std::string>("metric-feedback-channel-format");
137 if (rateLimitingIPCID > -1 && !chanFmt.empty()) {
138 ctfInput.metricChannel = fmt::format(fmt::runtime(chanFmt), o2::framework::ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
139 }
140 if (!ctfInput.fileRunTimeSpans.empty()) {
141 ctfInput.skipSkimmedOutTF = true;
142 }
143 if (!ctfInput.fileIRFrames.empty() && !ctfInput.fileRunTimeSpans.empty()) {
144 LOGP(fatal, "One cannot provide --ir-frames-files and --run-time-span-file options simultaneously");
145 }
146
147 specs.push_back(o2::ctf::getCTFReaderSpec(ctfInput));
148
149 auto pipes = configcontext.options().get<std::string>("pipeline");
150 std::unordered_map<std::string, int> plines;
151 auto ptokens = o2::utils::Str::tokenize(pipes, ',');
152 for (auto& token : ptokens) {
153 auto split = token.find(":");
154 if (split == std::string::npos) {
155 throw std::runtime_error("bad pipeline definition. Syntax <processor>:<pipeline>");
156 }
157 auto key = token.substr(0, split);
158 token.erase(0, split + 1);
159 size_t error;
160 auto value = std::stoll(token, &error, 10);
161 if (token[error] != '\0') {
162 throw std::runtime_error("Bad pipeline definition. Expecting integer");
163 }
164 if (value > 1) {
165 plines[key] = value;
166 }
167 }
168
169 std::vector<WorkflowSpec> decSpecsV;
170
171 auto addSpecs = [&decSpecsV, &plines](DataProcessorSpec&& s) {
172 auto entry = plines.find(s.name);
173 size_t mult = (entry == plines.end() || entry->second < 2) ? 1 : entry->second;
174 if (mult > decSpecsV.size()) {
175 decSpecsV.resize(mult);
176 }
177 decSpecsV[mult - 1].push_back(s);
178 };
179
180 // add decoders for all allowed detectors.
181 if (ctfInput.detMask[DetID::ITS]) {
182 addSpecs(o2::itsmft::getEntropyDecoderSpec(DetID::getDataOrigin(DetID::ITS), verbosity, configcontext.options().get<bool>("its-digits"), ctfInput.subspec));
183 }
184 if (ctfInput.detMask[DetID::MFT]) {
185 addSpecs(o2::itsmft::getEntropyDecoderSpec(DetID::getDataOrigin(DetID::MFT), verbosity, configcontext.options().get<bool>("mft-digits"), ctfInput.subspec));
186 }
187 if (ctfInput.detMask[DetID::TPC]) {
189 }
190 if (ctfInput.detMask[DetID::TRD]) {
192 }
193 if (ctfInput.detMask[DetID::TOF]) {
195 }
196 if (ctfInput.detMask[DetID::FT0]) {
198 }
199 if (ctfInput.detMask[DetID::FV0]) {
201 }
202 if (ctfInput.detMask[DetID::FDD]) {
204 }
205 if (ctfInput.detMask[DetID::MID]) {
207 }
208 if (ctfInput.detMask[DetID::MCH]) {
209 addSpecs(o2::mch::getEntropyDecoderSpec(verbosity, "mch-entropy-decoder", ctfInput.subspec));
210 }
211 if (ctfInput.detMask[DetID::EMC]) {
212 addSpecs(o2::emcal::getEntropyDecoderSpec(verbosity, ctfInput.subspec, ctfInput.decSSpecEMC));
213 }
214 if (ctfInput.detMask[DetID::PHS]) {
216 }
217 if (ctfInput.detMask[DetID::CPV]) {
219 }
220 if (ctfInput.detMask[DetID::ZDC]) {
222 }
223 if (ctfInput.detMask[DetID::HMP]) {
225 }
226 if (ctfInput.detMask[DetID::CTP]) {
228 }
229
230 bool combine = configcontext.options().get<bool>("combine-devices");
231 if (!combine) {
232 for (auto& decSpecs : decSpecsV) {
233 for (auto& s : decSpecs) {
234 specs.push_back(s);
235 }
236 }
237 } else {
238 std::vector<DataProcessorSpec> remaining;
239 if (decSpecsV.size() && decSpecsV[0].size()) {
240 specs.push_back(specCombiner("EntropyDecoders", decSpecsV[0], remaining)); // processing w/o pipelining
241 }
242 bool updatePipelines = false;
243 for (size_t i = 1; i < decSpecsV.size(); i++) { // add pipelined processes separately, consider combining them to separate groups (need to have modify argument of pipeline option)
244 if (decSpecsV[i].size() > 1) {
245 specs.push_back(specCombiner(fmt::format("EntropyDecodersP{}", i + 1), decSpecsV[i], remaining)); // processing pipelining multiplicity i+1
246 updatePipelines = true;
247 pipes += fmt::format(",EntropyDecodersP{}:{}", i + 1, i + 1);
248 } else {
249 for (auto& s : decSpecsV[i]) {
250 specs.push_back(s);
251 }
252 }
253 }
254 for (auto& s : remaining) {
255 specs.push_back(s);
256 }
257 if (updatePipelines) {
258 configcontext.options().override("pipeline", pipes);
259 }
260 }
261
262 return std::move(specs);
263}
#define verbosity
Convert CTF (EncodedBlocks) to CPV digit/channels strean.
Convert CTF (EncodedBlocks) to CTP digit stream.
Convert CTF (EncodedBlocks) to EMCAL digit/channels strean.
Convert CTF (EncodedBlocks) to FDD digit/channels strean.
Convert CTF (EncodedBlocks) to FT0 digit/channels strean.
Convert CTF (EncodedBlocks) to FV0 digit/channels strean.
int32_t i
Convert CTF (EncodedBlocks) to HMP digit/tracklets stream.
Convert CTF (EncodedBlocks) to clusters streams.
Convert CTF (EncodedBlocks) to MCH {Digit,ROFRecord} stream.
Convert CTF (EncodedBlocks) to MID ROFRecords/ColumnData stream.
Definition of the Names Generator class.
Convert CTF (EncodedBlocks) to PHOS digit/channels strean.
Helper function to tokenize sequences and ranges of integral numbers.
Convert CTF (EncodedBlocks) to FT0 digit/channels strean.
Convert CTF (EncodedBlocks) to TRD digit/tracklets stream.
Convert CTF (EncodedBlocks) to ZDC BCData/ChannelData/OrbitData stream.
StringRef key
static void updateFromString(std::string const &)
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr std::string_view NONE
keywork for no-detector
Definition DetID.h:103
static constexpr ID CTP
Definition DetID.h:79
static constexpr ID FV0
Definition DetID.h:76
static constexpr std::string_view ALL
keywork for all detectors
Definition DetID.h:104
static constexpr ID PHS
Definition DetID.h:67
static constexpr ID MID
Definition DetID.h:73
static constexpr ID ITS
Definition DetID.h:63
static constexpr ID MFT
Definition DetID.h:71
static constexpr ID ZDC
Definition DetID.h:74
static constexpr ID FT0
Definition DetID.h:75
static constexpr ID CPV
Definition DetID.h:68
static constexpr ID TRD
Definition DetID.h:65
static constexpr ID TPC
Definition DetID.h:64
static constexpr o2h::DataOrigin getDataOrigin(ID id)
Definition DetID.h:147
static constexpr ID EMC
Definition DetID.h:69
static mask_t getMask(const std::string_view detList)
detector masks from any non-alpha-num delimiter-separated list (empty if NONE is supplied)
Definition DetID.cxx:42
static constexpr ID FDD
Definition DetID.h:77
static constexpr ID MCH
Definition DetID.h:72
static constexpr ID HMP
Definition DetID.h:70
static constexpr ID TOF
Definition DetID.h:66
ConfigParamRegistry & options() const
void override(const char *key, ConfigValueType auto const &val) const
void customize(std::vector< o2::framework::ConfigParamSpec > &workflowOptions)
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLsizei const GLfloat * value
Definition glcorearb.h:819
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getCTFReaderSpec(const o2::ctf::CTFReaderInp &inp)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspecInp, unsigned int sspecOut=0)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
o2::framework::DataProcessorSpec specCombiner(std::string const &name, std::vector< DataProcessorSpec > const &speccollection, std::vector< DataProcessorSpec > &remaining)
std::vector< DataProcessorSpec > WorkflowSpec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(o2::header::DataOrigin orig, int verbosity, bool getDigits, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, const char *specName, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
std::vector< std::string > split(const std::string &str, char delimiter=',')
std::string fileRunTimeSpans
unsigned int decSSpecEMC
std::string metricChannel
o2::detectors::DetID::mask_t detMask
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)