Project
Loading...
Searching...
No Matches
tpc-reco-workflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
22#include "Framework/PartRef.h"
34
35#include <string>
36#include <stdexcept>
37#include <unordered_map>
38#include <regex>
39#include <cstdint>
40
41// we need a global variable to propagate the type the message dispatching of the
42// publisher will trigger on. This is dependent on the input type
43static o2::framework::ConcreteDataTypeMatcher gDispatchTrigger{"", ""};
44
45// Global variable used to transport data to the completion policy
47static uint64_t gTpcSectorMask = 0xFFFFFFFFF;
48
49void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
50{
52}
53
54// add workflow options, note that customization needs to be declared before
55// including Framework/runDataProcessing
56void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
57{
58 using namespace o2::framework;
59
60 std::vector<ConfigParamSpec> options{
61 {"input-type", VariantType::String, "digits", {"digitizer, digits, zsraw, clustershw, clusters, compressed-clusters-root, compressed-clusters-flat, compressed-clusters-flat-for-encode, pass-through"}},
62 {"output-type", VariantType::String, "tracks", {"digits, zsraw, clustershw, clusters, tracks, compressed-clusters-root, compressed-clusters-flat, encoded-clusters, disable-writer, send-clusters-per-sector, qa, no-shared-cluster-map, tpc-triggers"}},
63 {"disable-root-input", o2::framework::VariantType::Bool, false, {"disable root-files input reader"}},
64 {"no-ca-clusterer", VariantType::Bool, false, {"Use HardwareClusterer instead of clusterer of GPUCATracking"}},
65 {"disable-mc", VariantType::Bool, false, {"disable sending of MC information"}},
66 {"tpc-sectors", VariantType::String, "0-35", {"TPC sector range, e.g. 5-7,8,9"}},
67 {"tpc-lanes", VariantType::Int, 1, {"number of parallel lanes up to the tracker"}},
68 {"dispatching-mode", VariantType::String, "prompt", {"determines when to dispatch: prompt, complete"}},
69 {"no-tpc-zs-on-the-fly", VariantType::Bool, false, {"Do not use TPC zero suppression on the fly"}},
70 {"ignore-dist-stf", VariantType::Bool, false, {"do not subscribe to FLP/DISTSUBTIMEFRAME/0 message (no lost TF recovery)"}},
71 {"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings (e.g.: 'TPCHwClusterer.peakChargeThreshold=4;...')"}},
72 {"configFile", VariantType::String, "", {"configuration file for configurable parameters"}},
73 {"filtered-input", VariantType::Bool, false, {"Filtered tracks, clusters input, prefix dataDescriptors with F"}},
74 {"select-ir-frames", VariantType::Bool, false, {"Subscribe and filter according to external IR Frames"}},
75 {"ctf-dict", VariantType::String, "none", {"CTF dictionary: empty or ccdb=CCDB, none=no external dictionary otherwise: local filename"}},
76 {"tpc-deadMap-sources", VariantType::Int, -1, {"Sources to consider for TPC dead channel map creation; -1=all, 0=deactivated"}},
77 {"tpc-mc-time-gain", VariantType::Bool, false, {"use time gain calibration for MC (true) or for data (false)"}},
78 };
81 std::swap(workflowOptions, options);
82}
83
84// customize dispatch policy, dispatch immediately what is ready
85void customize(std::vector<o2::framework::DispatchPolicy>& policies)
86{
88 // we customize all devices to dispatch data immediately
89 auto readerMatcher = [](auto const& spec) {
90 return std::regex_match(spec.name.begin(), spec.name.end(), std::regex(".*-reader"));
91 };
92 auto triggerMatcher = [](auto const& query) {
93 // a bit of a hack but we want this to be configurable from the command line,
94 // however DispatchPolicy is inserted before all other setup. Triggering depending
95 // on the global variable set from the command line option. If scheduled messages
96 // are not triggered they are sent out at the end of the computation
97 return gDispatchTrigger.origin == query.origin && gDispatchTrigger.description == query.description;
98 };
99 policies.push_back({"prompt-for-reader", readerMatcher, DispatchOp::WhenReady, triggerMatcher});
100}
101
102// customize clusterers and cluster decoders to process immediately what comes in
103void customize(std::vector<o2::framework::CompletionPolicy>& policies)
104{
105 // we customize the pipeline processors to consume data as it comes
108 policies.push_back(CompletionPolicyHelpers::defineByName("tpc-cluster-decoder.*", CompletionPolicy::CompletionOp::Consume));
109 policies.push_back(CompletionPolicyHelpers::defineByName("tpc-clusterer.*", CompletionPolicy::CompletionOp::Consume));
110 // ordered policies for the writers
111 policies.push_back(CompletionPolicyHelpers::consumeWhenAllOrdered(".*(?:TPC|tpc).*[w,W]riter.*"));
112 // the custom completion policy for the tracker
114}
115
117{
118 hook = [](const char* idstring) {
120 };
121}
122
123#include "Framework/runDataProcessing.h" // the main driver
124
125using namespace o2::framework;
126
142{
143 std::vector<int> tpcSectors = o2::RangeTokenizer::tokenize<int>(cfgc.options().get<std::string>("tpc-sectors"));
144 // the lane configuration defines the subspecification ids to be distributed among the lanes.
145 std::vector<int> laneConfiguration = tpcSectors; // Currently just a copy of the tpcSectors, why?
146 auto nLanes = cfgc.options().get<int>("tpc-lanes");
147 auto inputType = cfgc.options().get<std::string>("input-type");
148
149 // depending on whether to dispatch early (prompt) and on the input type, we
150 // set the matcher. Note that this has to be in accordance with the OutputSpecs
151 // configured for the PublisherSpec
152 auto dispmode = cfgc.options().get<std::string>("dispatching-mode");
153 if (dispmode == "complete") {
154 // nothing to do we leave the matcher empty which will suppress the dispatch
155 // trigger and all messages will be sent out together at end of computation
156 } else if (inputType == "digits") {
157 gDispatchTrigger = o2::framework::ConcreteDataTypeMatcher{"TPC", "DIGITS"};
158 } else if (inputType == "clustershw") {
159 gDispatchTrigger = o2::framework::ConcreteDataTypeMatcher{"TPC", "CLUSTERHW"};
160 } else if (inputType == "zsraw") {
161 gDispatchTrigger = o2::framework::ConcreteDataTypeMatcher{"TPC", "RAWDATA"};
162 }
163 // set up configuration
164 o2::conf::ConfigurableParam::updateFromFile(cfgc.options().get<std::string>("configFile"));
165 o2::conf::ConfigurableParam::updateFromString(cfgc.options().get<std::string>("configKeyValues"));
166 o2::conf::ConfigurableParam::writeINI("o2tpcrecoworkflow_configuration.ini");
167
168 gTpcSectorMask = 0;
169 for (auto s : tpcSectors) {
170 gTpcSectorMask |= (1ul << s);
171 }
172 bool doMC = not cfgc.options().get<bool>("disable-mc");
175 tpcSectors, // sector configuration
176 gTpcSectorMask, // same as bitmask
177 laneConfiguration, // lane configuration
178 sclOpt, // scaling options
179 doMC, //
180 nLanes, //
181 inputType, //
182 cfgc.options().get<std::string>("output-type"), //
183 cfgc.options().get<bool>("disable-root-input"), //
184 !cfgc.options().get<bool>("no-ca-clusterer"), //
185 !cfgc.options().get<bool>("no-tpc-zs-on-the-fly"), //
186 !cfgc.options().get<bool>("ignore-dist-stf"), //
187 cfgc.options().get<std::string>("ctf-dict"),
188 cfgc.options().get<bool>("select-ir-frames"),
189 cfgc.options().get<bool>("filtered-input"),
190 cfgc.options().get<int>("tpc-deadMap-sources"),
191 cfgc.options().get<bool>("tpc-mc-time-gain"));
192
193 // configure dpl timer to inject correct firstTForbit: start from the 1st orbit of TF containing 1st sampled orbit
194 o2::raw::HBFUtilsInitializer hbfIni(cfgc, wf);
195
196 return std::move(wf);
197}
Helper class to access load maps from CCDB.
Helper class to parse options for correction maps.
Helper function to tokenize sequences and ranges of integral numbers.
DPL completion policy helper for TPC scetor data.
Workflow definition for the TPC reconstruction.
static void writeINI(std::string const &filename, std::string const &keyOnly="")
static void updateFromFile(std::string const &, std::string const &paramsList="", bool unchangedOnly=false)
static void updateFromString(std::string const &)
ConfigParamRegistry & options() const
static void addGlobalOptions(std::vector< o2::framework::ConfigParamSpec > &options)
static CorrectionMapsGloOpts parseGlobalOptions(const o2::framework::ConfigParamRegistry &opts)
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
std::function< void(const char *)> OnWorkflowTerminationHook
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< framework::InputSpec > CompletionPolicyData
framework::WorkflowSpec getWorkflow(CompletionPolicyData *policyData, std::vector< int > const &tpcSectors, unsigned long tpcSectorMask, std::vector< int > const &laneConfiguration, const o2::tpc::CorrectionMapsGloOpts &sclOpts, bool propagateMC=true, unsigned nLanes=1, std::string const &cfgInput="digitizer", std::string const &cfgOutput="tracks", bool disableRootInput=false, int caClusterer=0, int zsOnTheFly=0, bool askDISTSTF=true, const std::string &ctfdictOpt="none", bool selIR=false, bool filteredInp=false, int deadMapSources=-1, bool useMCTimeGain=false)
create the workflow for TPC reconstruction
Helper class which holds commonly used policies.
static CompletionPolicy consumeWhenAllOrdered(const char *name, CompletionPolicy::Matcher matcher)
as consumeWhenAll, but ensures that records are processed with incremental timeSlice (DataHeader::sta...
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static void addNewTimeSliceCallback(std::vector< o2::framework::CallbacksPolicy > &policies)
static void addConfigOption(std::vector< o2::framework::ConfigParamSpec > &opts, const std::string &defOpt=std::string(o2::base::NameConf::DIGITIZATIONCONFIGFILE))
std::vector< InputSpec > gPolicyData
unsigned long gTpcSectorMask
WorkflowSpec defineDataProcessing(ConfigContext const &cfgc)
This function hooks up the the workflow specifications into the DPL driver.
void customize(std::vector< o2::framework::CallbacksPolicy > &policies)