// Workflow option registration: builds a local table of ConfigParamSpec entries and swaps it
// into `workflowOptions`.
// NOTE(review): the integers at the start of some lines (45, 48, 50, ...) are not valid C++ and
// look like source line numbers fused into the text by an extraction step; the end of the
// initializer list and of the function body is also not visible here. Confirm against the real file.
45void customize(std::vector<ConfigParamSpec>& workflowOptions)
// Default lane count: half of the reported hardware threads, but never less than 1.
48 int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
// Each entry below appears to list: option name, variant type, default value, help text.
50 std::vector<ConfigParamSpec> options{
51 {
"configKeyValues", VariantType::String,
"", {
"Semicolon separated key=value strings (e.g.: 'TPCCalibPedestal.FirstTimeBin=10;...')"}},
52 {
"configFile", VariantType::String,
"", {
"configuration file for configurable parameters"}},
53 {
"outputFile", VariantType::String,
"./filtered-krypton-raw.root", {
"output file name for the filtered krypton file"}},
54 {
"lanes", VariantType::Int, defaultlanes, {
"Number of parallel processing lanes."}},
// `sectorDefault` is defined outside the visible lines.
55 {
"sectors", VariantType::String, sectorDefault.c_str(), {
"List of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15"}},
56 {
"writer-type", VariantType::String,
"local", {
"Writer type (local, EPN, none)"}},
57 {
"ccdb-path", VariantType::String,
"http://ccdb-test.cern.ch:8080", {
"Path to CCDB"}},
// Exchanges the caller's list with the table built above, so whatever `workflowOptions` held
// before ends up in the local `options` and is not appended to.
60 std::swap(workflowOptions, options);
// Reads the run-time options. `config` is declared outside the visible lines.
90 const std::string outputFile = config.
options().
get<std::string>(
"outputFile");
// Tokenizes the "sectors" option string into a list of ints via o2::RangeTokenizer.
92 const auto tpcSectors = o2::RangeTokenizer::tokenize<int>(config.
options().
get<std::string>(
"sectors"));
93 const auto nSectors = (
int)tpcSectors.size();
// The lane count is capped at the number of configured sectors.
94 const auto nLanes = std::min(config.
options().
get<
int>(
"lanes"), nSectors);
// NOTE(review): the try block this handler belongs to is not visible here.
// A std::out_of_range is rethrown as std::invalid_argument whose message carries the
// "writer-type" option value.
99 }
catch (std::out_of_range&) {
100 throw std::invalid_argument(std::string(
"invalid writer-type type: ") + config.
options().
get<std::string>(
"writer-type"));
// The lane configuration starts out as a copy of the configured sector list.
109 std::vector<int> laneConfiguration = tpcSectors;
// NOTE(review): the body of this loop over the configured sectors is not visible here.
112 for (
auto s : tpcSectors) {
// Noise calibration is only fetched when the CCDB host answers; `cdb`, `noise` and `ccdbPath`
// are declared outside the visible lines.
121 if (cdb.isHostReachable()) {
122 noise = cdb.get<
CalPad>(
"TPC/Calib/Noise");
// NOTE(review): the conditions selecting between the following log statements are not visible.
124 LOGP(info,
"Loaded noise from {}",
ccdbPath);
126 LOGP(error,
"Could not load noise from {}",
ccdbPath);
129 LOGP(error,
"Could not load noise from {}, host is not reachable",
ccdbPath);
// Tail of a call whose callee name is not visible here. The two lambdas report the number of
// lane-configuration entries and the entry at a given index.
// NOTE(review): both lambdas capture `laneConfiguration` by reference; this is only safe if the
// callee invokes them before `laneConfiguration` goes out of scope — confirm.
136 parallelProcessors, nLanes,
137 [&laneConfiguration]() {
return laneConfiguration.size(); },
138 [&laneConfiguration](
size_t index) {
return laneConfiguration[
index]; });
// Appends the processors to the workflow.
139 workflow.insert(workflow.end(), parallelProcessors.begin(), parallelProcessors.end());
// Extracts the TPC sector header from `ref` (declared outside the visible lines); a missing
// header is treated as a hard error.
154 auto const* tpcSectorHeader = o2::framework::DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(
ref);
155 if (!tpcSectorHeader) {
156 throw std::runtime_error(
"TPC sector header missing in header stack");
// NOTE(review): the handling of a negative sector number is not visible here.
158 if (tpcSectorHeader->sector() < 0) {
// Looks for the header's sector among the configured sectors.
// NOTE(review): the action taken on a match is not visible; the throw below is presumably
// reached only when no configured sector matches — confirm.
163 for (
auto const& sector : tpcSectors) {
164 if (sector == tpcSectorHeader->sector()) {
169 throw std::runtime_error(
"sector " +
std::to_string(tpcSectorHeader->sector()) +
" not configured for writing");
// NOTE(review): the body of `getName` is not visible here; it captures `tpcSectors` by value.
171 auto getName = [tpcSectors](std::string base,
size_t index) {
// Factory lambda for a writer spec. All captures are by value.
// NOTE(review): part of the parameter list (original line 178) is not visible here.
175 auto makeWriterSpec = [tpcSectors, laneConfiguration, getIndex,
getName](
const char* processName,
176 const char* defaultFileName,
177 const char* defaultTreeName,
179 bool singleBranch =
false) {
// An empty sector list is rejected up front.
180 if (tpcSectors.size() == 0) {
181 throw std::invalid_argument(std::string(
"writer process configuration needs list of TPC sectors"));
// NOTE(review): the body of `amendInput` is not visible here.
184 auto amendInput = [tpcSectors, laneConfiguration](
InputSpec& input,
size_t index) {
// Adjusts a branch definition: its input keys are rebuilt with `mergeInputs` using one slot per
// lane-configuration entry, then branch count and index getter are set.
// NOTE(review): the condition choosing between the two `nofBranches` assignments is not visible
// (presumably `singleBranch` — confirm).
188 auto amendBranchDef = [laneConfiguration, amendInput, tpcSectors, getIndex,
getName, singleBranch](
auto&& def,
bool enableMC =
true) {
190 def.keys =
mergeInputs(def.keys, laneConfiguration.size(), amendInput);
// One branch per configured sector, or none when `enableMC` is false.
192 def.nofBranches = enableMC ? tpcSectors.size() : 0;
193 def.getIndex = getIndex;
// A single branch, or none when `enableMC` is false.
197 def.nofBranches = enableMC ? 1 : 0;
// Moves out of the argument that was passed in; the caller's object is left moved-from.
199 return std::move(def);
// NOTE(review): no trailing return type is visible on `amendBranchDef`, so it presumably returns
// by value and the outer std::move here would be redundant — confirm.
203 std::move(amendBranchDef(databranch)))());
// Payload type alias: a vector of TPC digits.
211 using OutputType = std::vector<o2::tpc::Digit>;
// Registers writer specs with the workflow.
// NOTE(review): the remaining arguments of the first call, and whatever condition selects
// between the two writers, are not visible here.
212 workflow.push_back(makeWriterSpec(
"tpc-raw-krypton-writer",
217 "digit-branch-name"}));
219 workflow.push_back(getFileWriterSpec<Digit>(
"data:TPC/FILTERDIG", BranchType::Digits));
// NOTE(review): stray const member-function signature with no body or terminator; it matches the
// `options()` accessor called on `config` elsewhere in this file and may be an extraction
// artifact rather than part of this file — confirm.
ConfigParamRegistry & options() const