49#include <TStopwatch.h>
59void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
65void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
68 int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
69 std::string laneshelp(
"Number of tpc processing lanes. A lane is a pipeline of algorithms.");
70 workflowOptions.push_back(
71 ConfigParamSpec{
"tpc-lanes", VariantType::Int, defaultlanes, {laneshelp}});
73 std::string sectorshelp(
"List of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15");
75 workflowOptions.push_back(
76 ConfigParamSpec{
"tpc-sectors", VariantType::String, sectorDefault.c_str(), {sectorshelp}});
83 workflowOptions.push_back(
ConfigParamSpec{
"configKeyValues", VariantType::String,
"", {
"Semicolon separated key=value strings ..."}});
99template <
typename T,
typename R>
106 std::copy(origin.begin(), origin.end(), std::back_inserter(
target));
111 target.mergeAtBack(origin);
119 static constexpr char value[] =
"DIGITS";
123 static constexpr char value[] =
"COMMONMODE";
127 static constexpr char value[] =
"DIGTRIGGERS";
136 header.activeSectors = activeSectors;
158 header.activeSectors = activeSectors;
160 LabelType* sharedlabels;
170void mergeHelper(
const char* brprefix, std::vector<int>
const& tpcsectors, uint64_t activeSectors,
173 auto keyslist = originfile.GetListOfKeys();
174 for (
int i = 0;
i < keyslist->GetEntries(); ++
i) {
175 auto key = keyslist->At(
i);
176 int sector = atoi(
key->GetName());
177 if (std::find(tpcsectors.begin(), tpcsectors.end(), sector) == tpcsectors.end()) {
182 auto oldtree = (TTree*)originfile.Get(
key->GetName());
184 std::stringstream digitbrname;
185 digitbrname << brprefix <<
key->GetName();
186 auto br = oldtree->GetBranch(digitbrname.str().c_str());
191 br->SetAddress(&chunk);
193 using AccumType = std::decay_t<decltype(makePublishBuffer<T>(pc, sector, activeSectors))>;
196 accum = makePublishBuffer<T>(pc, sector, activeSectors);
198 for (
auto e = 0; e < br->GetEntries(); ++e) {
205 br->DropBaskets(
"all");
214void mergeHelper<std::vector<DigiGroupRef>>(
const char* brprefix, std::vector<int>
const& tpcsectors, uint64_t activeSectors,
218 auto keyslist = originfile.GetListOfKeys();
219 for (
int i = 0;
i < keyslist->GetEntries(); ++
i) {
220 auto key = keyslist->At(
i);
221 int sector = atoi(
key->GetName());
222 if (std::find(tpcsectors.begin(), tpcsectors.end(), sector) == tpcsectors.end()) {
227 using AccumType = std::decay_t<
decltype(makePublishBuffer<std::vector<DigiGroupRef>>(pc, sector, activeSectors))>;
230 accum = makePublishBuffer<std::vector<DigiGroupRef>>(pc, sector, activeSectors);
239 uint64_t activeSectors = 0;
240 for (
auto s : tpcsectors) {
241 activeSectors |= (uint64_t)0x1 << s;
244 ROOT::EnableThreadSafety();
248 omp_set_num_threads(std::min(lanes.size(), digitfilelist.size()));
249 LOG(info) <<
"Running digit publisher with OpenMP enabled";
250#pragma omp parallel for schedule(dynamic)
252 for (
size_t fi = 0; fi < digitfilelist.size(); ++fi) {
255 auto originfile =
new TFile(
filename.c_str(),
"OPEN");
259 using DigitsType = std::vector<o2::tpc::Digit>;
261 mergeHelper<DigitsType>(
"TPCDigit_", tpcsectors, activeSectors, *originfile, pc);
263 mergeHelper<LabelType>(
"TPCDigitMCTruth_", tpcsectors, activeSectors, *originfile, pc);
267 using CommonModeType = std::vector<o2::tpc::CommonMode>;
268 mergeHelper<CommonModeType>(
"TPCCommonMode_", tpcsectors, activeSectors, *originfile, pc);
270 using TriggerType = std::vector<DigiGroupRef>;
271 mergeHelper<TriggerType>(
"TPCCommonMode_", tpcsectors, activeSectors, *originfile, pc);
281 Task(std::vector<int> laneConfig, std::vector<int> tpcsectors,
bool mctruth) : mLanes(laneConfig), mTPCSectors(tpcsectors), mDoMCTruth(mctruth)
287 LOG(info) <<
"Preparing digits (from digit chunks) for reconstruction";
296 LOG(info) <<
"DIGIT PUBLISHING TOOK " <<
w.RealTime();
305 bool mDoMCTruth =
true;
306 std::vector<int> mLanes;
307 std::vector<int> mTPCSectors;
316 using DigitsOutputType = std::vector<o2::tpc::Digit>;
317 using CommonModeOutputType = std::vector<o2::tpc::CommonMode>;
319 std::vector<OutputSpec> outputs;
322 for (
int s = 0; s < 36; ++s) {
324 outputs.emplace_back(
"TPC",
"DIGITS",
static_cast<SubSpecificationType>(s), Lifetime::Timeframe);
326 outputs.emplace_back(
"TPC",
"DIGITSMCTR",
static_cast<SubSpecificationType>(s), Lifetime::Timeframe);
329 outputs.emplace_back(
"TPC",
"COMMONMODE",
static_cast<SubSpecificationType>(s), Lifetime::Timeframe);
331 outputs.emplace_back(
"TPC",
"DIGTRIGGERS",
static_cast<SubSpecificationType>(s), Lifetime::Timeframe);
336 "TPCDigitMerger", {}, outputs,
AlgorithmSpec{o2::framework::adaptFromTask<Task>(laneConfiguration, tpcsectors, mctruth)},
Options{}};
347 auto numlanes = configcontext.
options().
get<
int>(
"tpc-lanes");
348 bool mctruth = !configcontext.
options().
get<
bool>(
"disable-mc");
349 bool writeout = configcontext.
options().
get<
bool>(
"writer-mode");
350 auto tpcsectors = o2::RangeTokenizer::tokenize<int>(configcontext.
options().
get<std::string>(
"tpc-sectors"));
352 std::vector<int> lanes(numlanes);
353 std::iota(lanes.begin(), lanes.end(), 0);
359 if (tpcsectors.size() != 36) {
360 LOG(error) <<
"You currently need to include all TPC sectors in the ROOT writer-mode";
362 std::vector<int> writerlanes(tpcsectors.size());
363 std::iota(writerlanes.begin(), writerlanes.end(), 0);
Definition of the Detector class.
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
This function hooks up the workflow specifications into the DPL driver.
void customize(std::vector< o2::framework::CallbacksPolicy > &policies)
o2::dataformats::MCTruthContainer< o2::MCCompLabel > MCTruthContainer
Definition of the common mode container class.
A const (read only) version of MCTruthContainer.
o2::framework::DataAllocator::SubSpecificationType SubSpecificationType
Definition of a container to keep Monte Carlo truth external to simulation objects.
Class to refer to the 1st entry and N elements of some group in the continuous container.
Helper function to tokenize sequences and ranges of integral numbers.
static void updateFromString(std::string const &)
ConfigParamRegistry & options() const
T get(const char *key) const
o2::header::DataHeader::SubSpecificationType SubSpecificationType
decltype(auto) make(const Output &spec, Args... args)
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
static constexpr int MAXSECTOR
void init(framework::InitContext &ctx)
Task(std::vector< int > laneConfig, std::vector< int > tpcsectors, bool mctruth)
void run(framework::ProcessingContext &pc)
GLsizei const GLfloat * value
GLubyte GLubyte GLubyte GLubyte w
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< ConfigParamSpec > Options
header::DataHeader::SubSpecificationType SubSpecificationType
DataProcessorSpec getSpec(std::vector< int > const &laneConfiguration, std::vector< int > const &tpcsectors, bool mctruth, bool publish=true)
void mergeHelper(const char *brprefix, std::vector< int > const &tpcsectors, uint64_t activeSectors, TFile &originfile, framework::ProcessingContext &pc)
auto makePublishBuffer(framework::ProcessingContext &pc, int sector, uint64_t activeSectors)
void publishBuffer(framework::ProcessingContext &pc, int sector, uint64_t activeSectors, T *accum)
o2::framework::DataProcessorSpec getTPCDigitRootWriterSpec(std::vector< int > const &laneConfiguration, bool mctruth)
void publishBuffer< MCTruthContainer >(framework::ProcessingContext &pc, int sector, uint64_t activeSectors, MCTruthContainer *accum)
void publishMergedTimeframes(std::vector< int > const &lanes, std::vector< int > const &tpcsectors, bool domctruth, framework::ProcessingContext &pc)
auto makePublishBuffer< MCTruthContainer >(framework::ProcessingContext &pc, int sector, uint64_t activeSectors)
void copyHelper< MCTruthContainer >(MCTruthContainer const &origin, MCTruthContainer &target)
void copyHelper(T const &origin, R &target)
std::vector< std::string > listFiles(std::string const &dir, std::string const &searchpattern)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
static void addNewTimeSliceCallback(std::vector< o2::framework::CallbacksPolicy > &policies)
static void addConfigOption(std::vector< o2::framework::ConfigParamSpec > &opts, const std::string &defOpt=std::string(o2::base::NameConf::DIGITIZATIONCONFIGFILE))
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"