// Builds the list of command-line options for this workflow and hands it to
// the caller through `workflowOptions`.
//
// @param workflowOptions  in/out vector; on return it holds the option table
//                         built below (see the swap at the end).
//
// NOTE(review): `sectorDefault`, used as default of the "sectors" option, is
// not defined in the visible lines - confirm where it comes from.
58void customize(std::vector<ConfigParamSpec>& workflowOptions)
// Default number of lanes: half of the value reported by
// std::thread::hardware_concurrency(), but never less than 1 (that function
// may return 0). The unsigned result is converted to int here.
61 int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
// Option table. Each entry is: option name, value type, default value, help text.
63 std::vector<ConfigParamSpec> options{
64 {
"configKeyValues", VariantType::String,
"", {
"Semicolon separated key=value strings (e.g.: 'TPCCalibPedestal.FirstTimeBin=10;...')"}},
65 {
"configFile", VariantType::String,
"", {
"configuration file for configurable parameters"}},
// NOTE(review): the help text speaks of a "krypton" file while the default
// file name is "occupancy-filtered-digits.root" - presumably copied from
// another workflow; confirm the intended wording.
66 {
"outputFile", VariantType::String,
"./occupancy-filtered-digits.root", {
"output file name for the filtered krypton file"}},
// Defaults to the hardware-derived `defaultlanes` computed above.
67 {
"lanes", VariantType::Int, defaultlanes, {
"Number of parallel processing lanes."}},
68 {
"sectors", VariantType::String, sectorDefault.c_str(), {
"List of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15"}},
69 {
"writer-type", VariantType::String,
"local", {
"Writer type (local, EPN, none)"}},
70 {
"ccdb-path", VariantType::String,
"http://ccdb-test.cern.ch:8080", {
"Path to CCDB"}},
// Exchange contents: `workflowOptions` receives the table built above, and
// whatever it held on entry ends up in the local `options`.
73 std::swap(workflowOptions, options);
103 const std::string outputFile = config.
options().
get<std::string>(
"outputFile");
105 const auto tpcSectors = o2::RangeTokenizer::tokenize<int>(config.
options().
get<std::string>(
"sectors"));
106 const auto nSectors = (
int)tpcSectors.size();
107 const auto nLanes = std::min(config.
options().
get<
int>(
"lanes"), nSectors);
112 }
catch (std::out_of_range&) {
113 throw std::invalid_argument(std::string(
"invalid writer-type type: ") + config.
options().
get<std::string>(
"writer-type"));
122 std::vector<int> laneConfiguration = tpcSectors;
125 for (
auto s : tpcSectors) {
134 parallelProcessors, nLanes,
135 [&laneConfiguration]() {
return laneConfiguration.size(); },
136 [&laneConfiguration](
size_t index) {
return laneConfiguration[
index]; });
137 workflow.insert(workflow.end(), parallelProcessors.begin(), parallelProcessors.end());
152 auto const* tpcSectorHeader = o2::framework::DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(
ref);
153 if (!tpcSectorHeader) {
154 throw std::runtime_error(
"TPC sector header missing in header stack");
156 if (tpcSectorHeader->sector() < 0) {
161 for (
auto const& sector : tpcSectors) {
162 if (sector == tpcSectorHeader->sector()) {
167 throw std::runtime_error(
"sector " +
std::to_string(tpcSectorHeader->sector()) +
" not configured for writing");
169 auto getName = [tpcSectors](std::string base,
size_t index) {
173 auto makeWriterSpec = [tpcSectors, laneConfiguration, getIndex,
getName](
const char* processName,
174 const char* defaultFileName,
175 const char* defaultTreeName,
177 bool singleBranch =
false) {
178 if (tpcSectors.size() == 0) {
179 throw std::invalid_argument(std::string(
"writer process configuration needs list of TPC sectors"));
182 auto amendInput = [tpcSectors, laneConfiguration](
InputSpec& input,
size_t index) {
186 auto amendBranchDef = [laneConfiguration, amendInput, tpcSectors, getIndex,
getName, singleBranch](
auto&& def,
bool enableMC =
true) {
188 def.keys =
mergeInputs(def.keys, laneConfiguration.size(), amendInput);
190 def.nofBranches = enableMC ? tpcSectors.size() : 0;
191 def.getIndex = getIndex;
195 def.nofBranches = enableMC ? 1 : 0;
197 return std::move(def);
201 std::move(amendBranchDef(databranch)))());
209 using OutputType = std::vector<o2::tpc::Digit>;
210 workflow.push_back(makeWriterSpec(
"tpc-occupancy-writer",
215 "digit-branch-name"}));
217 workflow.push_back(getFileWriterSpec<Digit>(
"data:TPC/FILTERDIG", BranchType::Digits));
ConfigParamRegistry & options() const