50void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
53 std::vector<o2::framework::ConfigParamSpec> options;
54 options.push_back(
ConfigParamSpec{
"ctf-input", VariantType::String,
"none", {
"comma-separated list CTF input files"}});
55 options.push_back(
ConfigParamSpec{
"onlyDet", VariantType::String, std::string{
DetID::ALL}, {
"comma-separated list of detectors to accept. Overrides skipDet"}});
56 options.push_back(
ConfigParamSpec{
"skipDet", VariantType::String, std::string{
DetID::NONE}, {
"comma-separate list of detectors to skip"}});
57 options.push_back(
ConfigParamSpec{
"loop", VariantType::Int, 0, {
"loop N times (infinite for N<0)"}});
58 options.push_back(
ConfigParamSpec{
"delay", VariantType::Float, 0.f, {
"delay in seconds between consecutive TFs sending"}});
59 options.push_back(
ConfigParamSpec{
"copy-cmd", VariantType::String,
"alien_cp ?src file://?dst", {
"copy command for remote files or no-copy to avoid copying"}});
60 options.push_back(
ConfigParamSpec{
"ctf-file-regex", VariantType::String,
".*o2_ctf_run.+\\.root$", {
"regex string to identify CTF files"}});
61 options.push_back(
ConfigParamSpec{
"remote-regex", VariantType::String,
"^(alien://|)/alice/data/.+", {
"regex string to identify remote files"}});
62 options.push_back(
ConfigParamSpec{
"max-cached-files", VariantType::Int, 3, {
"max CTF files queued (copied for remote source)"}});
63 options.push_back(
ConfigParamSpec{
"allow-missing-detectors", VariantType::Bool,
false, {
"send empty message if detector is missing in the CTF (otherwise throw)"}});
64 options.push_back(
ConfigParamSpec{
"send-diststf-0xccdb", VariantType::Bool,
false, {
"send explicit FLP/DISTSUBTIMEFRAME/0xccdb output"}});
65 options.push_back(
ConfigParamSpec{
"ctf-reader-verbosity", VariantType::Int, 0, {
"verbosity level (0: summary per detector, 1: summary per block"}});
66 options.push_back(
ConfigParamSpec{
"ctf-data-subspec", VariantType::Int, 0, {
"subspec to use for decoded CTF messages (use non-0 if CTF writer will be attached downstream)"}});
67 options.push_back(
ConfigParamSpec{
"configKeyValues", VariantType::String,
"", {
"Semicolon separated key=value strings"}});
68 options.push_back(
ConfigParamSpec{
"ir-frames-files", VariantType::String,
"", {
"If non empty, inject selected IRFrames from this file"}});
69 options.push_back(
ConfigParamSpec{
"run-time-span-file", VariantType::String,
"", {
"If non empty, inject selected IRFrames from this text file (run, min/max orbit or unix time)"}});
70 options.push_back(
ConfigParamSpec{
"skip-skimmed-out-tf", VariantType::Bool,
false, {
"Do not process TFs with empty IR-Frame coverage"}});
71 options.push_back(
ConfigParamSpec{
"invert-irframe-selection", VariantType::Bool,
false, {
"Select only frames mentioned in ir-frames-file (skip-skimmed-out-tf applied to TF not selected!)"}});
73 options.push_back(
ConfigParamSpec{
"its-digits", VariantType::Bool,
false, {
"convert ITS clusters to digits"}});
74 options.push_back(
ConfigParamSpec{
"mft-digits", VariantType::Bool,
false, {
"convert MFT clusters to digits"}});
76 options.push_back(
ConfigParamSpec{
"emcal-decoded-subspec", VariantType::Int, 0, {
"subspec to use for decoded EMCAL data"}});
78 options.push_back(
ConfigParamSpec{
"timeframes-shm-limit", VariantType::String,
"0", {
"Minimum amount of SHM required in order to publish data"}});
79 options.push_back(
ConfigParamSpec{
"metric-feedback-channel-format", VariantType::String,
"name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", {
"format for the metric-feedback channel for TF rate limiting"}});
80 options.push_back(
ConfigParamSpec{
"combine-devices", VariantType::Bool,
false, {
"combine multiple DPL devices (entropy decoders)"}});
81 std::swap(workflowOptions, options);
93 std::string allowedDetectors =
"ITS,TPC,TRD,TOF,PHS,CPV,EMC,HMP,MFT,MCH,MID,ZDC,FT0,FV0,FDD,CTP";
107 throw std::runtime_error(
"--ctf-input <file,...> is not provided");
128 ctfInput.
minSHM = std::stoul(configcontext.
options().
get<std::string>(
"timeframes-shm-limit"));
135 int rateLimitingIPCID = std::stoi(configcontext.
options().
get<std::string>(
"timeframes-rate-limit-ipcid"));
136 std::string chanFmt = configcontext.
options().
get<std::string>(
"metric-feedback-channel-format");
137 if (rateLimitingIPCID > -1 && !chanFmt.empty()) {
144 LOGP(fatal,
"One cannot provide --ir-frames-files and --run-time-span-file options simultaneously");
149 auto pipes = configcontext.
options().
get<std::string>(
"pipeline");
150 std::unordered_map<std::string, int> plines;
152 for (
auto& token : ptokens) {
153 auto split = token.find(
":");
154 if (
split == std::string::npos) {
155 throw std::runtime_error(
"bad pipeline definition. Syntax <processor>:<pipeline>");
158 token.erase(0,
split + 1);
160 auto value = std::stoll(token, &error, 10);
161 if (token[error] !=
'\0') {
162 throw std::runtime_error(
"Bad pipeline definition. Expecting integer");
169 std::vector<WorkflowSpec> decSpecsV;
172 auto entry = plines.find(s.name);
173 size_t mult = (
entry == plines.end() ||
entry->second < 2) ? 1 :
entry->second;
174 if (mult > decSpecsV.size()) {
175 decSpecsV.resize(mult);
177 decSpecsV[mult - 1].push_back(s);
230 bool combine = configcontext.
options().
get<
bool>(
"combine-devices");
232 for (
auto& decSpecs : decSpecsV) {
233 for (
auto& s : decSpecs) {
238 std::vector<DataProcessorSpec> remaining;
239 if (decSpecsV.size() && decSpecsV[0].size()) {
240 specs.push_back(
specCombiner(
"EntropyDecoders", decSpecsV[0], remaining));
242 bool updatePipelines =
false;
243 for (
size_t i = 1;
i < decSpecsV.size();
i++) {
244 if (decSpecsV[
i].
size() > 1) {
245 specs.push_back(
specCombiner(fmt::format(
"EntropyDecodersP{}",
i + 1), decSpecsV[
i], remaining));
246 updatePipelines =
true;
247 pipes += fmt::format(
",EntropyDecodersP{}:{}",
i + 1,
i + 1);
249 for (
auto& s : decSpecsV[
i]) {
254 for (
auto& s : remaining) {
257 if (updatePipelines) {
262 return std::move(specs);
Convert CTF (EncodedBlocks) to CPV digit/channels strean.
Convert CTF (EncodedBlocks) to CTP digit stream.
Convert CTF (EncodedBlocks) to EMCAL digit/channels strean.
Convert CTF (EncodedBlocks) to FDD digit/channels strean.
Convert CTF (EncodedBlocks) to FT0 digit/channels strean.
Convert CTF (EncodedBlocks) to FV0 digit/channels strean.
Convert CTF (EncodedBlocks) to HMP digit/tracklets stream.
Convert CTF (EncodedBlocks) to clusters streams.
Convert CTF (EncodedBlocks) to MCH {Digit,ROFRecord} stream.
Convert CTF (EncodedBlocks) to MID ROFRecords/ColumnData stream.
Definition of the Names Generator class.
Convert CTF (EncodedBlocks) to PHOS digit/channels strean.
Helper function to tokenize sequences and ranges of integral numbers.
Convert CTF (EncodedBlocks) to FT0 digit/channels strean.
Convert CTF (EncodedBlocks) to TRD digit/tracklets stream.
Convert CTF (EncodedBlocks) to ZDC BCData/ChannelData/OrbitData stream.
static void updateFromString(std::string const &)
Static class with identifiers, bitmasks and names for ALICE detectors.
static constexpr std::string_view NONE
keywork for no-detector
static constexpr std::string_view ALL
keywork for all detectors
static constexpr o2h::DataOrigin getDataOrigin(ID id)
static mask_t getMask(const std::string_view detList)
detector masks from any non-alpha-num delimiter-separated list (empty if NONE is supplied)
bool helpOnCommandLine() const
ConfigParamRegistry & options() const
void override(const char *key, ConfigValueType auto const &val) const
T get(const char *key) const
void customize(std::vector< o2::framework::ConfigParamSpec > &workflowOptions)
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
GLsizei const GLfloat * value
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getCTFReaderSpec(const o2::ctf::CTFReaderInp &inp)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspecInp, unsigned int sspecOut=0)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
Defining PrimaryVertex explicitly as messageable.
o2::framework::DataProcessorSpec specCombiner(std::string const &name, std::vector< DataProcessorSpec > const &speccollection, std::vector< DataProcessorSpec > &remaining)
std::vector< DataProcessorSpec > WorkflowSpec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(o2::header::DataOrigin orig, int verbosity, bool getDigits, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, const char *specName, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
std::vector< std::string > split(const std::string &str, char delimiter=',')
std::string fileRunTimeSpans
std::string metricChannel
o2::detectors::DetID::mask_t detMask
bool allowMissingDetectors
bool invertIRFramesSelection
static std::string defaultIPCFolder()
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)