Project
Loading...
Searching...
No Matches
tf-reader-workflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15#include "Framework/Logger.h"
16#include <string>
17#include <bitset>
18#include "TFReaderSpec.h"
19
20using namespace o2::framework;
21
22// we need to add workflow options before including Framework/runDataProcessing
23void customize(std::vector<ConfigParamSpec>& workflowOptions)
24{
25 // option allowing to set parameters
26 std::vector<ConfigParamSpec> options;
27 options.push_back(ConfigParamSpec{"input-data", VariantType::String, "", {"input data (obligatory)"}});
28 options.push_back(ConfigParamSpec{"onlyDet", VariantType::String, "all", {"list of dectors"}});
29 options.push_back(ConfigParamSpec{"raw-only-det", VariantType::String, "none", {"do not open non-raw channel for these detectors"}});
30 options.push_back(ConfigParamSpec{"non-raw-only-det", VariantType::String, "none", {"do not open raw channel for these detectors"}});
31 options.push_back(ConfigParamSpec{"loop", VariantType::Int, 0, {"loop N times (-1 = infinite)"}});
32 options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}});
33 options.push_back(ConfigParamSpec{"copy-cmd", VariantType::String, "alien_cp ?src file://?dst", {"copy command for remote files"}}); // Use "XrdSecPROTOCOL=sss,unix xrdcp -N root://eosaliceo2.cern.ch/?src ?dst" for direct EOS access
34 options.push_back(ConfigParamSpec{"tf-file-regex", VariantType::String, ".+\\.tf$", {"regex string to identify TF files"}});
35 options.push_back(ConfigParamSpec{"remote-regex", VariantType::String, "^(alien://|)/alice/data/.+", {"regex string to identify remote files"}}); // Use "^/eos/aliceo2/.+" for direct EOS access
36 options.push_back(ConfigParamSpec{"tf-reader-verbosity", VariantType::Int, 0, {"verbosity level (1 or 2: check RDH, print DH/DPH for 1st or all slices, >2 print RDH)"}});
37 options.push_back(ConfigParamSpec{"raw-channel-config", VariantType::String, "", {"optional raw FMQ channel for non-DPL output"}});
38 options.push_back(ConfigParamSpec{"send-diststf-0xccdb", VariantType::Bool, false, {"send explicit FLP/DISTSUBTIMEFRAME/0xccdb output"}});
39 options.push_back(ConfigParamSpec{"disable-dummy-output", VariantType::Bool, false, {"Disable sending empty output if corresponding data is not found in the data"}});
40 options.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"semicolon separated key=value strings"}});
41 options.push_back(ConfigParamSpec{"timeframes-shm-limit", VariantType::String, "0", {"Minimum amount of SHM required in order to publish data"}});
42 options.push_back(ConfigParamSpec{"metric-feedback-channel-format", VariantType::String, "name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", {"format for the metric-feedback channel for TF rate limiting"}});
43
44 // options for error-check suppression
45
46 std::swap(workflowOptions, options);
47}
48
49// ------------------------------------------------------------------
50
52
54{
56 rinp.inpdata = configcontext.options().get<std::string>("input-data");
57 rinp.maxLoops = configcontext.options().get<int>("loop");
58 auto detlistSelect = configcontext.options().get<std::string>("onlyDet");
59 if (detlistSelect == "all") {
60 // Exclude FOCAL from default detlist (must be selected on request)
61 auto msk = o2::detectors::DetID::getMask("all") & ~o2::detectors::DetID::getMask("FOC");
63 } else {
64 rinp.detList = detlistSelect;
65 }
66 rinp.detListRawOnly = configcontext.options().get<std::string>("raw-only-det");
67 rinp.detListNonRawOnly = configcontext.options().get<std::string>("non-raw-only-det");
68 rinp.rawChannelConfig = configcontext.options().get<std::string>("raw-channel-config");
69 rinp.delay_us = uint64_t(1e6 * configcontext.options().get<float>("delay")); // delay in microseconds
70 rinp.verbosity = configcontext.options().get<int>("tf-reader-verbosity");
71 rinp.copyCmd = configcontext.options().get<std::string>("copy-cmd");
72 rinp.tffileRegex = configcontext.options().get<std::string>("tf-file-regex");
73 rinp.remoteRegex = configcontext.options().get<std::string>("remote-regex");
74 rinp.sendDummyForMissing = !configcontext.options().get<bool>("disable-dummy-output");
75 rinp.sup0xccdb = !configcontext.options().get<bool>("send-diststf-0xccdb");
76 o2::conf::ConfigurableParam::updateFromString(configcontext.options().get<std::string>("configKeyValues"));
77 rinp.minSHM = std::stoul(configcontext.options().get<std::string>("timeframes-shm-limit"));
78 int rateLimitingIPCID = std::stoi(configcontext.options().get<std::string>("timeframes-rate-limit-ipcid"));
79 std::string chanFmt = configcontext.options().get<std::string>("metric-feedback-channel-format");
80 if (rateLimitingIPCID > -1 && !chanFmt.empty()) {
81 rinp.metricChannel = fmt::format(fmt::runtime(chanFmt), o2::framework::ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
82 }
83
84 WorkflowSpec specs;
85 specs.emplace_back(o2::rawdd::getTFReaderSpec(rinp));
86 return specs;
87}
static void updateFromString(std::string const &)
static std::string getNames(mask_t mask, char delimiter=',')
Definition DetID.cxx:74
static mask_t getMask(const std::string_view detList)
detector masks from any non-alpha-num delimiter-separated list (empty if NONE is supplied)
Definition DetID.cxx:42
ConfigParamRegistry & options() const
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
o2::framework::DataProcessorSpec getTFReaderSpec(o2::rawdd::TFReaderInp &rinp)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string metricChannel
std::string detListNonRawOnly
std::string rawChannelConfig
std::string detListRawOnly
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
void customize(std::vector< ConfigParamSpec > &workflowOptions)