Project
Loading...
Searching...
No Matches
tpc-reco-workflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
22#include "Framework/PartRef.h"
33
34#include <string>
35#include <stdexcept>
36#include <unordered_map>
37#include <regex>
38#include <cstdint>
39
40// A global variable is needed to propagate the output type on which the publisher's
41// message dispatching is triggered. That type depends on the configured input type.
42static o2::framework::Output gDispatchTrigger{"", ""};
43
// Global state used to transport data to the completion policy.
// The sector mask starts out with the 36 lowest bits set.
static uint64_t gTpcSectorMask = (uint64_t{1} << 36) - 1;
47
48void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
49{
51}
52
53// add workflow options, note that customization needs to be declared before
54// including Framework/runDataProcessing
55void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
56{
57 using namespace o2::framework;
58
59 std::vector<ConfigParamSpec> options{
60 {"input-type", VariantType::String, "digits", {"digitizer, digits, zsraw, clustershw, clusters, compressed-clusters, compressed-clusters-ctf, pass-through"}},
61 {"output-type", VariantType::String, "tracks", {"digits, zsraw, clustershw, clusters, tracks, compressed-clusters, encoded-clusters, disable-writer, send-clusters-per-sector, qa, no-shared-cluster-map, tpc-triggers"}},
62 {"disable-root-input", o2::framework::VariantType::Bool, false, {"disable root-files input reader"}},
63 {"no-ca-clusterer", VariantType::Bool, false, {"Use HardwareClusterer instead of clusterer of GPUCATracking"}},
64 {"disable-mc", VariantType::Bool, false, {"disable sending of MC information"}},
65 {"tpc-sectors", VariantType::String, "0-35", {"TPC sector range, e.g. 5-7,8,9"}},
66 {"tpc-lanes", VariantType::Int, 1, {"number of parallel lanes up to the tracker"}},
67 {"dispatching-mode", VariantType::String, "prompt", {"determines when to dispatch: prompt, complete"}},
68 {"no-tpc-zs-on-the-fly", VariantType::Bool, false, {"Do not use TPC zero suppression on the fly"}},
69 {"ignore-dist-stf", VariantType::Bool, false, {"do not subscribe to FLP/DISTSUBTIMEFRAME/0 message (no lost TF recovery)"}},
70 {"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings (e.g.: 'TPCHwClusterer.peakChargeThreshold=4;...')"}},
71 {"configFile", VariantType::String, "", {"configuration file for configurable parameters"}},
72 {"filtered-input", VariantType::Bool, false, {"Filtered tracks, clusters input, prefix dataDescriptors with F"}},
73 {"select-ir-frames", VariantType::Bool, false, {"Subscribe and filter according to external IR Frames"}},
74 {"tpc-deadMap-sources", VariantType::Int, -1, {"Sources to consider for TPC dead channel map creation; -1=all, 0=deactivated"}},
75 {"tpc-mc-time-gain", VariantType::Bool, false, {"use time gain calibration for MC (true) or for data (false)"}},
76 };
79 std::swap(workflowOptions, options);
80}
81
82// customize dispatch policy, dispatch immediately what is ready
83void customize(std::vector<o2::framework::DispatchPolicy>& policies)
84{
86 // we customize all devices to dispatch data immediately
87 auto readerMatcher = [](auto const& spec) {
88 return std::regex_match(spec.name.begin(), spec.name.end(), std::regex(".*-reader"));
89 };
90 auto triggerMatcher = [](auto const& query) {
91 // a bit of a hack but we want this to be configurable from the command line,
92 // however DispatchPolicy is inserted before all other setup. Triggering depending
93 // on the global variable set from the command line option. If scheduled messages
94 // are not triggered they are sent out at the end of the computation
95 return gDispatchTrigger.origin == query.origin && gDispatchTrigger.description == query.description;
96 };
97 policies.push_back({"prompt-for-reader", readerMatcher, DispatchOp::WhenReady, triggerMatcher});
98}
99
100// customize clusterers and cluster decoders to process immediately what comes in
101void customize(std::vector<o2::framework::CompletionPolicy>& policies)
102{
103 // we customize the pipeline processors to consume data as it comes
106 policies.push_back(CompletionPolicyHelpers::defineByName("tpc-cluster-decoder.*", CompletionPolicy::CompletionOp::Consume));
107 policies.push_back(CompletionPolicyHelpers::defineByName("tpc-clusterer.*", CompletionPolicy::CompletionOp::Consume));
108 // ordered policies for the writers
109 policies.push_back(CompletionPolicyHelpers::consumeWhenAllOrdered(".*(?:TPC|tpc).*[w,W]riter.*"));
110 // the custom completion policy for the tracker
112}
113
115{
116 hook = [](const char* idstring) {
118 };
119}
120
121#include "Framework/runDataProcessing.h" // the main driver
122
123using namespace o2::framework;
124
140{
141 std::vector<int> tpcSectors = o2::RangeTokenizer::tokenize<int>(cfgc.options().get<std::string>("tpc-sectors"));
142 // the lane configuration defines the subspecification ids to be distributed among the lanes.
143 std::vector<int> laneConfiguration = tpcSectors; // Currently just a copy of the tpcSectors, why?
144 auto nLanes = cfgc.options().get<int>("tpc-lanes");
145 auto inputType = cfgc.options().get<std::string>("input-type");
146
147 // depending on whether to dispatch early (prompt) and on the input type, we
148 // set the matcher. Note that this has to be in accordance with the OutputSpecs
149 // configured for the PublisherSpec
150 auto dispmode = cfgc.options().get<std::string>("dispatching-mode");
151 if (dispmode == "complete") {
152 // nothing to do we leave the matcher empty which will suppress the dispatch
153 // trigger and all messages will be sent out together at end of computation
154 } else if (inputType == "digits") {
155 gDispatchTrigger = o2::framework::Output{"TPC", "DIGITS"};
156 } else if (inputType == "clustershw") {
157 gDispatchTrigger = o2::framework::Output{"TPC", "CLUSTERHW"};
158 } else if (inputType == "clustersnative") {
159 gDispatchTrigger = o2::framework::Output{"TPC", "CLUSTERNATIVE"};
160 } else if (inputType == "zsraw") {
161 gDispatchTrigger = o2::framework::Output{"TPC", "RAWDATA"};
162 }
163 // set up configuration
164 o2::conf::ConfigurableParam::updateFromFile(cfgc.options().get<std::string>("configFile"));
165 o2::conf::ConfigurableParam::updateFromString(cfgc.options().get<std::string>("configKeyValues"));
166 o2::conf::ConfigurableParam::writeINI("o2tpcrecoworkflow_configuration.ini");
167
168 gTpcSectorMask = 0;
169 for (auto s : tpcSectors) {
170 gTpcSectorMask |= (1ul << s);
171 }
172 bool doMC = not cfgc.options().get<bool>("disable-mc");
175 tpcSectors, // sector configuration
176 gTpcSectorMask, // same as bitmask
177 laneConfiguration, // lane configuration
178 sclOpt, // scaling options
179 doMC, //
180 nLanes, //
181 inputType, //
182 cfgc.options().get<std::string>("output-type"), //
183 cfgc.options().get<bool>("disable-root-input"), //
184 !cfgc.options().get<bool>("no-ca-clusterer"), //
185 !cfgc.options().get<bool>("no-tpc-zs-on-the-fly"), //
186 !cfgc.options().get<bool>("ignore-dist-stf"), //
187 cfgc.options().get<bool>("select-ir-frames"),
188 cfgc.options().get<bool>("filtered-input"),
189 cfgc.options().get<int>("tpc-deadMap-sources"),
190 cfgc.options().get<bool>("tpc-mc-time-gain"));
191
192 // configure dpl timer to inject correct firstTForbit: start from the 1st orbit of TF containing 1st sampled orbit
193 o2::raw::HBFUtilsInitializer hbfIni(cfgc, wf);
194
195 return std::move(wf);
196}
Helper class to access load maps from CCDB.
Helper function to tokenize sequences and ranges of integral numbers.
DPL completion policy helper for TPC sector data.
Workflow definition for the TPC reconstruction.
static void writeINI(std::string const &filename, std::string const &keyOnly="")
static void updateFromFile(std::string const &, std::string const &paramsList="", bool unchangedOnly=false)
static void updateFromString(std::string const &)
ConfigParamRegistry & options() const
static void addGlobalOptions(std::vector< o2::framework::ConfigParamSpec > &options)
static CorrectionMapsLoaderGloOpts parseGlobalOptions(const o2::framework::ConfigParamRegistry &opts)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::function< void(const char *)> OnWorkflowTerminationHook
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< framework::InputSpec > CompletionPolicyData
framework::WorkflowSpec getWorkflow(CompletionPolicyData *policyData, std::vector< int > const &tpcSectors, unsigned long tpcSectorMask, std::vector< int > const &laneConfiguration, const o2::tpc::CorrectionMapsLoaderGloOpts &sclOpts, bool propagateMC=true, unsigned nLanes=1, std::string const &cfgInput="digitizer", std::string const &cfgOutput="tracks", bool disableRootInput=false, int caClusterer=0, int zsOnTheFly=0, bool askDISTSTF=true, bool selIR=false, bool filteredInp=false, int deadMapSources=-1, bool useMCTimeGain=false)
create the workflow for TPC reconstruction
Helper class which holds commonly used policies.
static CompletionPolicy consumeWhenAllOrdered(const char *name, CompletionPolicy::Matcher matcher)
as consumeWhenAll, but ensures that records are processed with incremental timeSlice (DataHeader::sta...
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
header::DataDescription description
Definition Output.h:29
header::DataOrigin origin
Definition Output.h:28
static void addNewTimeSliceCallback(std::vector< o2::framework::CallbacksPolicy > &policies)
static void addConfigOption(std::vector< o2::framework::ConfigParamSpec > &opts, const std::string &defOpt=std::string(o2::base::NameConf::DIGITIZATIONCONFIGFILE))
std::vector< InputSpec > gPolicyData
unsigned long gTpcSectorMask
WorkflowSpec defineDataProcessing(ConfigContext const &cfgc)
This function hooks up the workflow specifications into the DPL driver.
void customize(std::vector< o2::framework::CallbacksPolicy > &policies)