Project
Loading...
Searching...
No Matches
tpc-reco-workflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
22#include "Framework/PartRef.h"
33
34#include <string>
35#include <stdexcept>
36#include <unordered_map>
37#include <regex>
38#include <cstdint>
39
40// we need a global variable to propagate the type the message dispatching of the
41// publisher will trigger on. This is dependent on the input type
// Starts out with empty origin and description. defineDataProcessing() assigns a
// TPC matcher for the input types digits, clustershw and zsraw, unless the
// dispatching-mode option is "complete"; the trigger matcher of the dispatch
// policy compares incoming queries against this value.
42static o2::framework::ConcreteDataTypeMatcher gDispatchTrigger{"", ""};
43
44// Global variable used to transport data to the completion policy
// The initial value 0xFFFFFFFFF has the 36 lowest bits set. defineDataProcessing()
// resets the mask to 0 and then sets one bit per sector id parsed from the
// tpc-sectors option.
// NOTE(review): listing line 45 is missing from this extract — confirm against the
// original file whether another global is declared between the comment and this one.
46static uint64_t gTpcSectorMask = 0xFFFFFFFFF;
47
// Customization hook receiving the list of callback policies of the workflow.
// NOTE(review): the statement inside the body (listing line 50) is missing from
// this extract, so what is registered on `policies` cannot be confirmed here —
// check the original file before relying on this function being a no-op.
48void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
49{
51}
52
53// add workflow options, note that customization needs to be declared before
54// including Framework/runDataProcessing
// Declares the command line options of this workflow.
//
// The options are collected in a local vector, one entry per option in the form
// {name, variant type, default value, {help text}}, and the local vector is
// finally exchanged with `workflowOptions` via std::swap. Whatever
// `workflowOptions` held on entry therefore ends up in the local and is
// destroyed when the function returns.
//
// NOTE(review): listing lines 78-79 (between the end of the table and the swap)
// are missing from this extract; presumably further options are appended to
// `options` there — confirm against the original file.
55void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
56{
57 using namespace o2::framework;
58
59 std::vector<ConfigParamSpec> options{
 // input-type and output-type are free-form strings; the help text enumerates the accepted keywords
60 {"input-type", VariantType::String, "digits", {"digitizer, digits, zsraw, clustershw, clusters, compressed-clusters-root, compressed-clusters-flat, compressed-clusters-flat-for-encode, pass-through"}},
61 {"output-type", VariantType::String, "tracks", {"digits, zsraw, clustershw, clusters, tracks, compressed-clusters-root, compressed-clusters-flat, encoded-clusters, disable-writer, send-clusters-per-sector, qa, no-shared-cluster-map, tpc-triggers"}},
62 {"disable-root-input", o2::framework::VariantType::Bool, false, {"disable root-files input reader"}},
63 {"no-ca-clusterer", VariantType::Bool, false, {"Use HardwareClusterer instead of clusterer of GPUCATracking"}},
64 {"disable-mc", VariantType::Bool, false, {"disable sending of MC information"}},
 // the default "0-35" selects 36 sectors, matching the 36 bits of the initial gTpcSectorMask
65 {"tpc-sectors", VariantType::String, "0-35", {"TPC sector range, e.g. 5-7,8,9"}},
66 {"tpc-lanes", VariantType::Int, 1, {"number of parallel lanes up to the tracker"}},
 // evaluated in defineDataProcessing(): "complete" leaves the dispatch trigger empty
67 {"dispatching-mode", VariantType::String, "prompt", {"determines when to dispatch: prompt, complete"}},
68 {"no-tpc-zs-on-the-fly", VariantType::Bool, false, {"Do not use TPC zero suppression on the fly"}},
69 {"ignore-dist-stf", VariantType::Bool, false, {"do not subscribe to FLP/DISTSUBTIMEFRAME/0 message (no lost TF recovery)"}},
 // configFile is applied first and configKeyValues second in defineDataProcessing()
70 {"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings (e.g.: 'TPCHwClusterer.peakChargeThreshold=4;...')"}},
71 {"configFile", VariantType::String, "", {"configuration file for configurable parameters"}},
72 {"filtered-input", VariantType::Bool, false, {"Filtered tracks, clusters input, prefix dataDescriptors with F"}},
73 {"select-ir-frames", VariantType::Bool, false, {"Subscribe and filter according to external IR Frames"}},
74 {"ctf-dict", VariantType::String, "ccdb", {"CTF dictionary: empty or ccdb=CCDB, none=no external dictionary otherwise: local filename"}},
75 {"tpc-deadMap-sources", VariantType::Int, -1, {"Sources to consider for TPC dead channel map creation; -1=all, 0=deactivated"}},
76 {"tpc-mc-time-gain", VariantType::Bool, false, {"use time gain calibration for MC (true) or for data (false)"}},
77 };
 // hand the assembled list to the caller by exchanging the two vectors
80 std::swap(workflowOptions, options);
81}
82
83// customize dispatch policy, dispatch immediately what is ready
// Appends one dispatch policy named "prompt-for-reader" to `policies`.
//
// The policy is built from two predicates:
//  - readerMatcher selects the devices the policy applies to, by name;
//  - triggerMatcher decides, per query, whether the data type matches the
//    global gDispatchTrigger.
//
// NOTE(review): `DispatchOp` is not declared in the visible lines; listing line
// 86 is missing from this extract and presumably holds the corresponding alias.
84void customize(std::vector<o2::framework::DispatchPolicy>& policies)
85{
87 // we customize all devices to dispatch data immediately
 // true when the complete device name matches ".*-reader", i.e. the name ends
 // with "-reader"; a new std::regex object is constructed on every invocation
88 auto readerMatcher = [](auto const& spec) {
89 return std::regex_match(spec.name.begin(), spec.name.end(), std::regex(".*-reader"));
90 };
 // true when both origin and description of the query equal those of
 // gDispatchTrigger; with the initial empty matcher only an empty
 // origin/description pair would compare equal
91 auto triggerMatcher = [](auto const& query) {
92 // a bit of a hack but we want this to be configurable from the command line,
93 // however DispatchPolicy is inserted before all other setup. Triggering depending
94 // on the global variable set from the command line option. If scheduled messages
95 // are not triggered they are sent out at the end of the computation
96 return gDispatchTrigger.origin == query.origin && gDispatchTrigger.description == query.description;
97 };
98 policies.push_back({"prompt-for-reader", readerMatcher, DispatchOp::WhenReady, triggerMatcher});
99}
100
101// customize clusterers and cluster decoders to process immediately what comes in
// Appends the completion policies of this workflow to `policies`:
//  - devices whose name matches "tpc-cluster-decoder.*" or "tpc-clusterer.*"
//    get CompletionOp::Consume;
//  - devices matching the TPC writer pattern get the policy returned by
//    CompletionPolicyHelpers::consumeWhenAllOrdered.
//
// NOTE(review): listing lines 105-106 and 112 are missing from this extract
// (presumably the using-declarations for CompletionPolicy/CompletionPolicyHelpers
// and the registration of the tracker policy announced by the last comment) —
// confirm against the original file.
102void customize(std::vector<o2::framework::CompletionPolicy>& policies)
103{
104 // we customize the pipeline processors to consume data as it comes
107 policies.push_back(CompletionPolicyHelpers::defineByName("tpc-cluster-decoder.*", CompletionPolicy::CompletionOp::Consume));
108 policies.push_back(CompletionPolicyHelpers::defineByName("tpc-clusterer.*", CompletionPolicy::CompletionOp::Consume));
109 // ordered policies for the writers
 // character class is [wW]: inside brackets a comma is a literal character, so
 // the former "[w,W]" also accepted names containing ",riter"
110 policies.push_back(CompletionPolicyHelpers::consumeWhenAllOrdered(".*(?:TPC|tpc).*[wW]riter.*"));
111 // the custom completion policy for the tracker
113}
114
// Body of a customization function that installs a callback into `hook`.
// NOTE(review): the signature (listing line 115) is missing from this extract;
// presumably this is the customize() overload for the workflow termination hook
// — confirm against the original file.
116{
 // capture-less lambda receiving an id string
 // NOTE(review): the lambda body (listing line 118) is missing from this
 // extract, so what the callback does cannot be confirmed here.
117 hook = [](const char* idstring) {
119 };
120}
121
122#include "Framework/runDataProcessing.h" // the main driver
123
124using namespace o2::framework;
125
// Body of the function that assembles the workflow from the command line options.
//
// Steps visible in this extract:
//  1. parse the tpc-sectors, tpc-lanes and input-type options;
//  2. set the global dispatch trigger according to dispatching-mode / input-type;
//  3. apply configFile and configKeyValues and dump the resulting configuration
//     to "o2tpcrecoworkflow_configuration.ini";
//  4. rebuild the global sector bitmask, one bit per configured sector;
//  5. pass everything to the workflow factory and return the result after the
//     HBFUtilsInitializer has been constructed with it.
//
// Throws std::invalid_argument if a configured sector id cannot be represented
// in the 64-bit sector mask.
//
// NOTE(review): the signature (listing lines before 141) and listing lines
// 172-173 (declaration of `sclOpt` and of `wf` with the start of the factory
// call) are missing from this extract and are intentionally not reconstructed.
141{
142 std::vector<int> tpcSectors = o2::RangeTokenizer::tokenize<int>(cfgc.options().get<std::string>("tpc-sectors"));
143 // the lane configuration defines the subspecification ids to be distributed among the lanes.
144 std::vector<int> laneConfiguration = tpcSectors; // Currently just a copy of the tpcSectors, why?
145 auto nLanes = cfgc.options().get<int>("tpc-lanes");
146 auto inputType = cfgc.options().get<std::string>("input-type");
147
148 // depending on whether to dispatch early (prompt) and on the input type, we
149 // set the matcher. Note that this has to be in accordance with the OutputSpecs
150 // configured for the PublisherSpec
151 auto dispmode = cfgc.options().get<std::string>("dispatching-mode");
152 if (dispmode == "complete") {
153 // nothing to do we leave the matcher empty which will suppress the dispatch
154 // trigger and all messages will be sent out together at end of computation
155 } else if (inputType == "digits") {
156 gDispatchTrigger = o2::framework::ConcreteDataTypeMatcher{"TPC", "DIGITS"};
157 } else if (inputType == "clustershw") {
158 gDispatchTrigger = o2::framework::ConcreteDataTypeMatcher{"TPC", "CLUSTERHW"};
159 } else if (inputType == "zsraw") {
160 gDispatchTrigger = o2::framework::ConcreteDataTypeMatcher{"TPC", "RAWDATA"};
161 }
162 // set up configuration
163 o2::conf::ConfigurableParam::updateFromFile(cfgc.options().get<std::string>("configFile"));
164 o2::conf::ConfigurableParam::updateFromString(cfgc.options().get<std::string>("configKeyValues"));
165 o2::conf::ConfigurableParam::writeINI("o2tpcrecoworkflow_configuration.ini");
166
167 gTpcSectorMask = 0;
168 for (auto s : tpcSectors) {
 // a shift count that is negative or not smaller than the width of the mask
 // is undefined behaviour, so reject such sector ids explicitly
 if (s < 0 || s >= 64) {
 throw std::invalid_argument("tpc-sectors: sector id " + std::to_string(s) + " cannot be represented in the 64-bit sector mask");
 }
 // shift a 64-bit one: `1ul` is only 32 bits wide on platforms with a 32-bit
 // unsigned long, which would break for sector ids >= 32
169 gTpcSectorMask |= (uint64_t{1} << s);
170 }
171 bool doMC = not cfgc.options().get<bool>("disable-mc");
174 tpcSectors, // sector configuration
175 gTpcSectorMask, // same as bitmask
176 laneConfiguration, // lane configuration
177 sclOpt, // scaling options
178 doMC, //
179 nLanes, //
180 inputType, //
181 cfgc.options().get<std::string>("output-type"), //
182 cfgc.options().get<bool>("disable-root-input"), //
183 !cfgc.options().get<bool>("no-ca-clusterer"), //
184 !cfgc.options().get<bool>("no-tpc-zs-on-the-fly"), //
185 !cfgc.options().get<bool>("ignore-dist-stf"), //
186 cfgc.options().get<std::string>("ctf-dict"),
187 cfgc.options().get<bool>("select-ir-frames"),
188 cfgc.options().get<bool>("filtered-input"),
189 cfgc.options().get<int>("tpc-deadMap-sources"),
190 cfgc.options().get<bool>("tpc-mc-time-gain"));
191
192 // configure dpl timer to inject correct firstTForbit: start from the 1st orbit of TF containing 1st sampled orbit
193 o2::raw::HBFUtilsInitializer hbfIni(cfgc, wf);
194
 // return the local by name: it is moved (or elided) automatically, whereas an
 // explicit std::move would prevent copy elision
195 return wf;
196}
Helper class to access load maps from CCDB.
Helper function to tokenize sequences and ranges of integral numbers.
DPL completion policy helper for TPC sector data.
Workflow definition for the TPC reconstruction.
static void writeINI(std::string const &filename, std::string const &keyOnly="")
static void updateFromFile(std::string const &, std::string const &paramsList="", bool unchangedOnly=false)
static void updateFromString(std::string const &)
ConfigParamRegistry & options() const
static void addGlobalOptions(std::vector< o2::framework::ConfigParamSpec > &options)
static CorrectionMapsLoaderGloOpts parseGlobalOptions(const o2::framework::ConfigParamRegistry &opts)
Defining PrimaryVertex explicitly as messageable.
std::function< void(const char *)> OnWorkflowTerminationHook
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< framework::InputSpec > CompletionPolicyData
framework::WorkflowSpec getWorkflow(CompletionPolicyData *policyData, std::vector< int > const &tpcSectors, unsigned long tpcSectorMask, std::vector< int > const &laneConfiguration, const o2::tpc::CorrectionMapsLoaderGloOpts &sclOpts, bool propagateMC=true, unsigned nLanes=1, std::string const &cfgInput="digitizer", std::string const &cfgOutput="tracks", bool disableRootInput=false, int caClusterer=0, int zsOnTheFly=0, bool askDISTSTF=true, const std::string &ctfdictOpt="none", bool selIR=false, bool filteredInp=false, int deadMapSources=-1, bool useMCTimeGain=false)
create the workflow for TPC reconstruction
Helper class which holds commonly used policies.
static CompletionPolicy consumeWhenAllOrdered(const char *name, CompletionPolicy::Matcher matcher)
as consumeWhenAll, but ensures that records are processed with incremental timeSlice (DataHeader::sta...
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static void addNewTimeSliceCallback(std::vector< o2::framework::CallbacksPolicy > &policies)
static void addConfigOption(std::vector< o2::framework::ConfigParamSpec > &opts, const std::string &defOpt=std::string(o2::base::NameConf::DIGITIZATIONCONFIGFILE))
std::vector< InputSpec > gPolicyData
unsigned long gTpcSectorMask
WorkflowSpec defineDataProcessing(ConfigContext const &cfgc)
This function hooks up the workflow specifications into the DPL driver.
void customize(std::vector< o2::framework::CallbacksPolicy > &policies)