48#include <TStopwatch.h>
55void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
61void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
64 int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
65 std::string laneshelp(
"Number of tpc processing lanes. A lane is a pipeline of algorithms.");
66 workflowOptions.push_back(
67 ConfigParamSpec{
"tpc-lanes", VariantType::Int, defaultlanes, {laneshelp}});
69 std::string sectorshelp(
"List of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15");
71 workflowOptions.push_back(
72 ConfigParamSpec{
"tpc-sectors", VariantType::String, sectorDefault.c_str(), {sectorshelp}});
76 workflowOptions.push_back(
ConfigParamSpec{
"configKeyValues", VariantType::String,
"", {
"Semicolon separated key=value strings ..."}});
92template <
typename T,
typename R>
99 std::copy(origin.begin(), origin.end(), std::back_inserter(
target));
104 target.mergeAtBack(origin);
110 LOG(info) <<
"PUBLISHING SECTOR " << sector;
113 header.activeSectors = activeSectors;
135 header.activeSectors = activeSectors;
137 LabelType* sharedlabels;
147void mergeHelper(
const char* brprefix, std::vector<int>
const& tpcsectors, uint64_t activeSectors,
150 auto keyslist = originfile.GetListOfKeys();
151 for (
int i = 0;
i < keyslist->GetEntries(); ++
i) {
152 auto key = keyslist->At(
i);
153 int sector = atoi(
key->GetName());
154 if (std::find(tpcsectors.begin(), tpcsectors.end(), sector) == tpcsectors.end()) {
159 auto oldtree = (TTree*)originfile.Get(
key->GetName());
161 std::stringstream digitbrname;
162 digitbrname << brprefix <<
key->GetName();
163 auto br = oldtree->GetBranch(digitbrname.str().c_str());
168 br->SetAddress(&chunk);
170 using AccumType = std::decay_t<decltype(makePublishBuffer<T>(pc, sector, activeSectors))>;
173 accum = makePublishBuffer<T>(pc, sector, activeSectors);
175 for (
auto e = 0; e < br->GetEntries(); ++e) {
182 br->DropBaskets(
"all");
192 uint64_t activeSectors = 0;
193 for (
auto s : tpcsectors) {
194 activeSectors |= (uint64_t)0x1 << s;
197 ROOT::EnableThreadSafety();
201 omp_set_num_threads(std::min(lanes.size(), digitfilelist.size()));
202 LOG(info) <<
"Running digit publisher with OpenMP enabled";
203#pragma omp parallel for schedule(dynamic)
205 for (
size_t fi = 0; fi < digitfilelist.size(); ++fi) {
208 auto originfile =
new TFile(
filename.c_str(),
"OPEN");
212 using DigitsType = std::vector<o2::tpc::Digit>;
214 mergeHelper<DigitsType>(
"TPCDigit_", tpcsectors, activeSectors, *originfile, pc);
216 mergeHelper<LabelType>(
"TPCDigitMCTruth_", tpcsectors, activeSectors, *originfile, pc);
226 Task(std::vector<int> laneConfig, std::vector<int> tpcsectors,
bool mctruth) : mLanes(laneConfig), mTPCSectors(tpcsectors), mDoMCTruth(mctruth)
232 LOG(info) <<
"Preparing digits (from digit chunks) for reconstruction";
241 LOG(info) <<
"DIGIT PUBLISHING TOOK " <<
w.RealTime();
250 bool mDoMCTruth =
true;
251 std::vector<int> mLanes;
252 std::vector<int> mTPCSectors;
261 using DigitsOutputType = std::vector<o2::tpc::Digit>;
262 using CommonModeOutputType = std::vector<o2::tpc::CommonMode>;
264 std::vector<OutputSpec> outputs;
267 for (
int s = 0; s < 36; ++s) {
269 outputs.emplace_back(
"TPC",
"DIGITS",
static_cast<SubSpecificationType>(s), Lifetime::Timeframe);
271 outputs.emplace_back(
"TPC",
"DIGITSMCTR",
static_cast<SubSpecificationType>(s), Lifetime::Timeframe);
277 "TPCDigitMerger", {}, outputs,
AlgorithmSpec{o2::framework::adaptFromTask<Task>(laneConfiguration, tpcsectors, mctruth)},
Options{}};
288 auto numlanes = configcontext.
options().
get<
int>(
"tpc-lanes");
289 bool mctruth = !configcontext.
options().
get<
bool>(
"disable-mc");
290 auto tpcsectors = o2::RangeTokenizer::tokenize<int>(configcontext.
options().
get<std::string>(
"tpc-sectors"));
292 std::vector<int> lanes(numlanes);
293 std::iota(lanes.begin(), lanes.end(), 0);
Definition of the Detector class.
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
This function hooks up the the workflow specifications into the DPL driver.
void customize(std::vector< o2::framework::CallbacksPolicy > &policies)
o2::dataformats::MCTruthContainer< o2::MCCompLabel > MCTruthContainer
Definition of the common mode container class.
A const (ready only) version of MCTruthContainer.
o2::framework::DataAllocator::SubSpecificationType SubSpecificationType
Definition of a container to keep Monte Carlo truth external to simulation objects.
Helper function to tokenize sequences and ranges of integral numbers.
static void updateFromString(std::string const &)
ConfigParamRegistry & options() const
T get(const char *key) const
o2::header::DataHeader::SubSpecificationType SubSpecificationType
decltype(auto) make(const Output &spec, Args... args)
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
static constexpr int MAXSECTOR
void init(framework::InitContext &ctx)
Task(std::vector< int > laneConfig, std::vector< int > tpcsectors, bool mctruth)
void run(framework::ProcessingContext &pc)
GLubyte GLubyte GLubyte GLubyte w
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< ConfigParamSpec > Options
header::DataHeader::SubSpecificationType SubSpecificationType
DataProcessorSpec getSpec(std::vector< int > const &laneConfiguration, std::vector< int > const &tpcsectors, bool mctruth, bool publish=true)
void mergeHelper(const char *brprefix, std::vector< int > const &tpcsectors, uint64_t activeSectors, TFile &originfile, framework::ProcessingContext &pc)
auto makePublishBuffer(framework::ProcessingContext &pc, int sector, uint64_t activeSectors)
void publishBuffer(framework::ProcessingContext &pc, int sector, uint64_t activeSectors, T *accum)
void publishBuffer< MCTruthContainer >(framework::ProcessingContext &pc, int sector, uint64_t activeSectors, MCTruthContainer *accum)
void publishMergedTimeframes(std::vector< int > const &lanes, std::vector< int > const &tpcsectors, bool domctruth, framework::ProcessingContext &pc)
auto makePublishBuffer< MCTruthContainer >(framework::ProcessingContext &pc, int sector, uint64_t activeSectors)
void copyHelper< MCTruthContainer >(MCTruthContainer const &origin, MCTruthContainer &target)
void copyHelper(T const &origin, R &target)
std::vector< std::string > listFiles(std::string const &dir, std::string const &searchpattern)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
static void addNewTimeSliceCallback(std::vector< o2::framework::CallbacksPolicy > &policies)
static void addConfigOption(std::vector< o2::framework::ConfigParamSpec > &opts, const std::string &defOpt=std::string(o2::base::NameConf::DIGITIZATIONCONFIGFILE))
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"