50void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
53 std::vector<o2::framework::ConfigParamSpec> options;
54 options.push_back(
ConfigParamSpec{
"ctf-input", VariantType::String,
"none", {
"comma-separated list CTF input files"}});
55 options.push_back(
ConfigParamSpec{
"onlyDet", VariantType::String, std::string{
DetID::ALL}, {
"comma-separated list of detectors to accept. Overrides skipDet"}});
56 options.push_back(
ConfigParamSpec{
"skipDet", VariantType::String, std::string{
DetID::NONE}, {
"comma-separate list of detectors to skip"}});
57 options.push_back(
ConfigParamSpec{
"loop", VariantType::Int, 0, {
"loop N times (infinite for N<0)"}});
58 options.push_back(
ConfigParamSpec{
"delay", VariantType::Float, 0.f, {
"delay in seconds between consecutive TFs sending"}});
59 options.push_back(
ConfigParamSpec{
"shuffle", VariantType::Bool,
false, {
"shuffle TF sending order (for debug)"}});
60 options.push_back(
ConfigParamSpec{
"copy-cmd", VariantType::String,
"alien_cp ?src file://?dst", {
"copy command for remote files or no-copy to avoid copying"}});
61 options.push_back(
ConfigParamSpec{
"ctf-file-regex", VariantType::String,
".*o2_ctf_run.+\\.root$", {
"regex string to identify CTF files"}});
62 options.push_back(
ConfigParamSpec{
"remote-regex", VariantType::String,
"^(alien://|)/alice/data/.+", {
"regex string to identify remote files"}});
63 options.push_back(
ConfigParamSpec{
"max-cached-files", VariantType::Int, 3, {
"max CTF files queued (copied for remote source)"}});
64 options.push_back(
ConfigParamSpec{
"allow-missing-detectors", VariantType::Bool,
false, {
"send empty message if detector is missing in the CTF (otherwise throw)"}});
65 options.push_back(
ConfigParamSpec{
"send-diststf-0xccdb", VariantType::Bool,
false, {
"send explicit FLP/DISTSUBTIMEFRAME/0xccdb output"}});
66 options.push_back(
ConfigParamSpec{
"ctf-reader-verbosity", VariantType::Int, 0, {
"verbosity level (0: summary per detector, 1: summary per block"}});
67 options.push_back(
ConfigParamSpec{
"ctf-data-subspec", VariantType::Int, 0, {
"subspec to use for decoded CTF messages (use non-0 if CTF writer will be attached downstream)"}});
68 options.push_back(
ConfigParamSpec{
"configKeyValues", VariantType::String,
"", {
"Semicolon separated key=value strings"}});
69 options.push_back(
ConfigParamSpec{
"ir-frames-files", VariantType::String,
"", {
"If non empty, inject selected IRFrames from this file"}});
70 options.push_back(
ConfigParamSpec{
"run-time-span-file", VariantType::String,
"", {
"If non empty, inject selected IRFrames from this text file (run, min/max orbit or unix time)"}});
71 options.push_back(
ConfigParamSpec{
"skip-skimmed-out-tf", VariantType::Bool,
false, {
"Do not process TFs with empty IR-Frame coverage"}});
72 options.push_back(
ConfigParamSpec{
"invert-irframe-selection", VariantType::Bool,
false, {
"Select only frames mentioned in ir-frames-file (skip-skimmed-out-tf applied to TF not selected!)"}});
74 options.push_back(
ConfigParamSpec{
"its-digits", VariantType::Bool,
false, {
"convert ITS clusters to digits"}});
75 options.push_back(
ConfigParamSpec{
"mft-digits", VariantType::Bool,
false, {
"convert MFT clusters to digits"}});
77 options.push_back(
ConfigParamSpec{
"emcal-decoded-subspec", VariantType::Int, 0, {
"subspec to use for decoded EMCAL data"}});
79 options.push_back(
ConfigParamSpec{
"timeframes-shm-limit", VariantType::String,
"0", {
"Minimum amount of SHM required in order to publish data"}});
80 options.push_back(
ConfigParamSpec{
"metric-feedback-channel-format", VariantType::String,
"name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", {
"format for the metric-feedback channel for TF rate limiting"}});
81 options.push_back(
ConfigParamSpec{
"combine-devices", VariantType::Bool,
false, {
"combine multiple DPL devices (entropy decoders)"}});
82 std::swap(workflowOptions, options);
94 std::string allowedDetectors =
"ITS,TPC,TRD,TOF,PHS,CPV,EMC,HMP,MFT,MCH,MID,ZDC,FT0,FV0,FDD,CTP";
108 throw std::runtime_error(
"--ctf-input <file,...> is not provided");
130 ctfInput.
minSHM = std::stoul(configcontext.
options().
get<std::string>(
"timeframes-shm-limit"));
137 int rateLimitingIPCID = std::stoi(configcontext.
options().
get<std::string>(
"timeframes-rate-limit-ipcid"));
138 std::string chanFmt = configcontext.
options().
get<std::string>(
"metric-feedback-channel-format");
139 if (rateLimitingIPCID > -1 && !chanFmt.empty()) {
146 LOGP(fatal,
"One cannot provide --ir-frames-files and --run-time-span-file options simultaneously");
151 auto pipes = configcontext.
options().
get<std::string>(
"pipeline");
152 std::unordered_map<std::string, int> plines;
154 for (
auto& token : ptokens) {
155 auto split = token.find(
":");
156 if (
split == std::string::npos) {
157 throw std::runtime_error(
"bad pipeline definition. Syntax <processor>:<pipeline>");
160 token.erase(0,
split + 1);
162 auto value = std::stoll(token, &error, 10);
163 if (token[error] !=
'\0') {
164 throw std::runtime_error(
"Bad pipeline definition. Expecting integer");
171 std::vector<WorkflowSpec> decSpecsV;
174 auto entry = plines.find(s.name);
175 size_t mult = (
entry == plines.end() ||
entry->second < 2) ? 1 :
entry->second;
176 if (mult > decSpecsV.size()) {
177 decSpecsV.resize(mult);
179 decSpecsV[mult - 1].push_back(s);
232 bool combine = configcontext.
options().
get<
bool>(
"combine-devices");
234 for (
auto& decSpecs : decSpecsV) {
235 for (
auto& s : decSpecs) {
240 std::vector<DataProcessorSpec> remaining;
241 if (decSpecsV.size() && decSpecsV[0].size()) {
242 specs.push_back(
specCombiner(
"EntropyDecoders", decSpecsV[0], remaining));
244 bool updatePipelines =
false;
245 for (
size_t i = 1;
i < decSpecsV.size();
i++) {
246 if (decSpecsV[
i].
size() > 1) {
247 specs.push_back(
specCombiner(fmt::format(
"EntropyDecodersP{}",
i + 1), decSpecsV[
i], remaining));
248 updatePipelines =
true;
249 pipes += fmt::format(
",EntropyDecodersP{}:{}",
i + 1,
i + 1);
251 for (
auto& s : decSpecsV[
i]) {
256 for (
auto& s : remaining) {
259 if (updatePipelines) {
264 return std::move(specs);
Convert CTF (EncodedBlocks) to CPV digit/channels strean.
Convert CTF (EncodedBlocks) to CTP digit stream.
Convert CTF (EncodedBlocks) to EMCAL digit/channels strean.
Convert CTF (EncodedBlocks) to FDD digit/channels strean.
Convert CTF (EncodedBlocks) to FT0 digit/channels strean.
Convert CTF (EncodedBlocks) to FV0 digit/channels strean.
Convert CTF (EncodedBlocks) to HMP digit/tracklets stream.
Convert CTF (EncodedBlocks) to clusters streams.
Convert CTF (EncodedBlocks) to MCH {Digit,ROFRecord} stream.
Convert CTF (EncodedBlocks) to MID ROFRecords/ColumnData stream.
Definition of the Names Generator class.
Convert CTF (EncodedBlocks) to PHOS digit/channels strean.
Helper function to tokenize sequences and ranges of integral numbers.
Convert CTF (EncodedBlocks) to FT0 digit/channels strean.
Convert CTF (EncodedBlocks) to TRD digit/tracklets stream.
Convert CTF (EncodedBlocks) to ZDC BCData/ChannelData/OrbitData stream.
static void updateFromString(std::string const &)
Static class with identifiers, bitmasks and names for ALICE detectors.
static constexpr std::string_view NONE
keywork for no-detector
static constexpr std::string_view ALL
keywork for all detectors
static constexpr o2h::DataOrigin getDataOrigin(ID id)
static mask_t getMask(const std::string_view detList)
detector masks from any non-alpha-num delimiter-separated list (empty if NONE is supplied)
bool helpOnCommandLine() const
ConfigParamRegistry & options() const
void override(const char *key, ConfigValueType auto const &val) const
T get(const char *key) const
void customize(std::vector< o2::framework::ConfigParamSpec > &workflowOptions)
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
GLsizei const GLfloat * value
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getCTFReaderSpec(const o2::ctf::CTFReaderInp &inp)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspecInp, unsigned int sspecOut=0)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
Defining PrimaryVertex explicitly as messageable.
o2::framework::DataProcessorSpec specCombiner(std::string const &name, std::vector< DataProcessorSpec > const &speccollection, std::vector< DataProcessorSpec > &remaining)
std::vector< DataProcessorSpec > WorkflowSpec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(o2::header::DataOrigin orig, int verbosity, bool getDigits, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, const char *specName, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
framework::DataProcessorSpec getEntropyDecoderSpec(int verbosity, unsigned int sspec)
create a processor spec
std::vector< std::string > split(const std::string &str, char delimiter=',')
std::string fileRunTimeSpans
std::string metricChannel
o2::detectors::DetID::mask_t detMask
bool allowMissingDetectors
bool invertIRFramesSelection
static std::string defaultIPCFolder()
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)