// Workflow option hook: builds the list of workflow-level options declared below
// and swaps it into `workflowOptions`.
// NOTE(review): this listing skips lines (the embedded numbers jump), so the
// function's braces and the definition of `sectorDefault` are not visible here.
43void customize(std::vector<ConfigParamSpec>& workflowOptions)
// Default lane count: half of the reported hardware threads, clamped to at least 1
// (std::thread::hardware_concurrency() is allowed to return 0).
46 int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
48 std::vector<ConfigParamSpec> options{
// Each entry is: option name, variant type, default value, help text.
49 {
"configKeyValues", VariantType::String,
"", {
"Semicolon separated key=value strings (e.g.: 'TPCCalibPedestal.FirstTimeBin=10;...')"}},
50 {
"configFile", VariantType::String,
"", {
"configuration file for configurable parameters"}},
51 {
"outputFile", VariantType::String,
"./tpcBoxClusters.root", {
"output file name for the box cluster root file"}},
52 {
"lanes", VariantType::Int, defaultlanes, {
"Number of parallel processing lanes."}},
53 {
"sectors", VariantType::String, sectorDefault.c_str(), {
"List of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15"}},
54 {
"writer-type", VariantType::String,
"local", {
"Writer type (local, EPN, none)"}},
// Exchange contents: `workflowOptions` now holds the list built above, and whatever
// it held before ends up in the local `options`.
57 std::swap(workflowOptions, options);
// Fragment of the workflow definition: reads the configured options, derives the
// lane layout and appends the parallel processors to `workflow`.
// NOTE(review): the enclosing function header and several statements are missing
// from this listing (the embedded numbers jump); comments cover only visible lines.
// Output file name taken from the "outputFile" option.
87 const std::string outputFile = config.
options().
get<std::string>(
"outputFile");
// The "sectors" option string is expanded into a list of integer sector ids.
89 const auto tpcSectors = o2::RangeTokenizer::tokenize<int>(config.
options().
get<std::string>(
"sectors"));
90 const auto nSectors = (
int)tpcSectors.size();
// Number of lanes is the "lanes" option capped at the number of configured sectors.
91 const auto nLanes = std::min(config.
options().
get<
int>(
"lanes"), nSectors);
// The matching `try` block is not visible here; presumably it looks up the
// "writer-type" string in a map — TODO confirm. An out_of_range from it is
// re-raised as invalid_argument carrying the offending option value.
96 }
catch (std::out_of_range&) {
97 throw std::invalid_argument(std::string(
"invalid writer-type type: ") + config.
options().
get<std::string>(
"writer-type"));
// The lane configuration starts out as a copy of the configured sector list.
106 std::vector<int> laneConfiguration = tpcSectors;
// Loop over the configured sectors (by value); the loop body is not visible.
109 for (
auto s : tpcSectors) {
// Arguments of a call whose callee is not visible in this listing: the processor
// list, the lane count, a callable returning the number of lane-configuration
// entries and a callable returning the entry at a given index. Both callables
// capture `laneConfiguration` by reference.
118 parallelProcessors, nLanes,
119 [&laneConfiguration]() {
return laneConfiguration.size(); },
120 [&laneConfiguration](
size_t index) {
return laneConfiguration[
index]; });
// Append all parallel processors to the end of the workflow.
121 workflow.insert(workflow.end(), parallelProcessors.begin(), parallelProcessors.end());
// Fragment that inspects the TPC sector header of `ref` and compares its sector
// against the configured sectors.
// NOTE(review): presumably the body of the `getIndex` callable captured further
// down — its header, the branch bodies and the closing braces are not visible.
// Extract the TPC sector header from the header stack of `ref`.
136 auto const* tpcSectorHeader = o2::framework::DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(
ref);
// A missing header is reported as a runtime_error.
137 if (!tpcSectorHeader) {
138 throw std::runtime_error(
"TPC sector header missing in header stack");
// Negative sector numbers take a separate branch; its body is not visible here.
140 if (tpcSectorHeader->sector() < 0) {
// Search the configured sectors for the sector named in the header; what happens
// on a match is not visible in this listing.
145 for (
auto const& sector : tpcSectors) {
146 if (sector == tpcSectorHeader->sector()) {
// Raised with the header's sector number in the message; presumably reached only
// when the search above found no match — TODO confirm.
151 throw std::runtime_error(
"sector " +
std::to_string(tpcSectorHeader->sector()) +
" not configured for writing");
// Helper lambdas used to build writer specs.
// NOTE(review): most lambda bodies, some conditions and at least one parameter
// line are missing from this listing; comments cover only the visible lines.
// `getName` holds its own copy of `tpcSectors` and takes a base string and an
// index; its body is not visible.
153 auto getName = [tpcSectors](std::string base,
size_t index) {
// `makeWriterSpec` captures the sector list, the lane configuration and the two
// helper callables by copy.
157 auto makeWriterSpec = [tpcSectors, laneConfiguration, getIndex,
getName](
const char* processName,
158 const char* defaultFileName,
159 const char* defaultTreeName,
// Listing line 160 (another parameter) is not visible here.
161 bool singleBranch =
false) {
// An empty sector list is rejected with invalid_argument.
162 if (tpcSectors.size() == 0) {
163 throw std::invalid_argument(std::string(
"writer process configuration needs list of TPC sectors"));
// `amendInput` takes an InputSpec by mutable reference plus an index; its body is
// not visible.
166 auto amendInput = [tpcSectors, laneConfiguration](
InputSpec& input,
size_t index) {
// `amendBranchDef` adjusts a branch definition `def`; `enableMC` defaults to true.
170 auto amendBranchDef = [laneConfiguration, amendInput, tpcSectors, getIndex,
getName, singleBranch](
auto&& def,
bool enableMC =
true) {
// Replace the keys with the result of mergeInputs, which is given the current
// keys, the number of lane-configuration entries and `amendInput`.
172 def.keys =
mergeInputs(def.keys, laneConfiguration.size(), amendInput);
// One branch per configured sector when enableMC is set, otherwise zero.
174 def.nofBranches = enableMC ? tpcSectors.size() : 0;
175 def.getIndex = getIndex;
// Alternative assignment with a single branch; presumably the `singleBranch`
// case — the selecting condition is not visible here.
179 def.nofBranches = enableMC ? 1 : 0;
// `def` is a reference parameter, so the explicit std::move is what lets the
// result be move-constructed rather than copied.
181 return std::move(def);
// Tail of a call expression that receives the amended `databranch` definition;
// the start of the expression is not visible.
185 std::move(amendBranchDef(databranch)))());
// Registration of the Krypton cluster writer(s) in the workflow.
// NOTE(review): the middle arguments of the makeWriterSpec call and any condition
// selecting between the two push_back calls are missing from this listing.
// Alias for the vector of Krypton clusters.
193 using KrClusterOutputType = std::vector<o2::tpc::KrCluster>;
// Append the spec produced by makeWriterSpec for the "tpc-krcluster-writer" process.
194 workflow.push_back(makeWriterSpec(
"tpc-krcluster-writer",
199 "boxcluster-branch-name"}));
// Append a file writer spec for KrCluster with input "data:TPC/KRCLUSTERS" and
// branch type Krypton.
201 workflow.push_back(getFileWriterSpec<KrCluster>(
"data:TPC/KRCLUSTERS", BranchType::Krypton));
ConfigParamRegistry & options() const