Project
Loading...
Searching...
No Matches
tof-reco-workflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
29#include <fairlogger/Logger.h>
36
37#include <string>
38#include <stdexcept>
39#include <unordered_map>
40
41void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
42{
44}
45
46void customize(std::vector<o2::framework::CompletionPolicy>& policies)
47{
48 // ordered policies for the writers
49 policies.push_back(CompletionPolicyHelpers::consumeWhenAllOrdered(".*(?:TOF|tof).*[W,w]riter.*"));
50}
51
52// add workflow options, note that customization needs to be declared before
53// including Framework/runDataProcessing
54void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
55{
56 std::vector<o2::framework::ConfigParamSpec> options{
57 {"input-type", o2::framework::VariantType::String, "digits", {"digits, raw, clusters"}},
58 {"output-type", o2::framework::VariantType::String, "clusters", {"digits, clusters, raw"}},
59 {"disable-mc", o2::framework::VariantType::Bool, false, {"disable sending of MC information, TBI"}},
60 {"tof-sectors", o2::framework::VariantType::String, "0-17", {"TOF sector range, e.g. 5-7,8,9 ,TBI"}},
61 {"tof-lanes", o2::framework::VariantType::Int, 1, {"number of parallel lanes up to the matcher, TBI"}},
62 {"use-ccdb", o2::framework::VariantType::Bool, false, {"enable access to ccdb tof calibration objects"}},
63 {"local-cmp", o2::framework::VariantType::Bool, false, {"if compressor is running in the same chain (e.g. epn)"}},
64 {"input-desc", o2::framework::VariantType::String, "CRAWDATA", {"Input specs description string"}},
65 {"disable-root-input", o2::framework::VariantType::Bool, false, {"disable root-files input readers"}},
66 {"disable-root-output", o2::framework::VariantType::Bool, false, {"disable root-files output writers"}},
67 {"conet-mode", o2::framework::VariantType::Bool, false, {"enable conet mode"}},
68 {"configKeyValues", o2::framework::VariantType::String, "", {"Semicolon separated key=value strings ..."}},
69 {"disable-row-writing", o2::framework::VariantType::Bool, false, {"disable ROW in Digit writing"}},
70 {"write-decoding-errors", o2::framework::VariantType::Bool, false, {"trace errors in digits output when decoding"}},
71 {"ignore-dist-stf", VariantType::Bool, false, {"do not subscribe to FLP/DISTSUBTIMEFRAME/0 message (no lost TF recovery)"}},
72 {"orbits-per-tf", VariantType::Int, -1, {"default(-1) from GRP/CCDB, a valid value is required to run compressor for epn"}},
73 {"calib-cluster", VariantType::Bool, false, {"to enable calib info production from clusters"}},
74 {"for-calib", VariantType::Bool, false, {"to disable check on problematic, otherwise masked for new calibrations"}},
75 {"calib-dia", VariantType::Bool, false, {"enabling writing calib for diagnostics"}},
76 {"cosmics", VariantType::Bool, false, {"to enable cosmics utils"}}};
78 std::swap(workflowOptions, options);
79}
80
81#include "Framework/runDataProcessing.h" // the main driver
82
83using namespace o2::framework;
84
100{
101 WorkflowSpec specs;
102
103 // the lane configuration defines the subspecification ids to be distributed among the lanes.
104 auto nLanes = cfgc.options().get<int>("tof-lanes");
105 auto inputType = cfgc.options().get<std::string>("input-type");
106 auto outputType = cfgc.options().get<std::string>("output-type");
107
108 bool writecluster = 0;
109 bool writedigit = 0;
110 bool writeraw = 0;
111 bool writeerr = 0;
112
113 if (outputType.rfind("clusters") < outputType.size()) {
114 writecluster = 1;
115 }
116 if (outputType.rfind("digits") < outputType.size()) {
117 writedigit = 1;
118 }
119 if (outputType.rfind("raw") < outputType.size()) {
120 writeraw = 1;
121 }
122
123 bool dgtinput = 0;
124 bool clusterinput = 0;
125 bool rawinput = 0;
126 if (inputType == "digits") {
127 dgtinput = 1;
128 } else if (inputType == "clusters") {
129 clusterinput = 1;
130 } else if (inputType == "raw") {
131 rawinput = 1;
132 writeerr = cfgc.options().get<bool>("write-decoding-errors");
133 }
134
135 if (!cfgc.helpOnCommandLine()) {
136 o2::conf::ConfigurableParam::updateFromString(cfgc.options().get<std::string>("configKeyValues"));
137 }
138
139 auto useMC = !cfgc.options().get<bool>("disable-mc");
140 auto useCCDB = cfgc.options().get<bool>("use-ccdb");
141 bool disableRootInput = cfgc.options().get<bool>("disable-root-input") || rawinput;
142 bool disableRootOutput = cfgc.options().get<bool>("disable-root-output");
143 bool conetmode = cfgc.options().get<bool>("conet-mode");
144 bool disableROWwriting = cfgc.options().get<bool>("disable-row-writing");
145 auto isCalibFromCluster = cfgc.options().get<bool>("calib-cluster");
146 auto isCosmics = cfgc.options().get<bool>("cosmics");
147 auto ignoreDistStf = cfgc.options().get<bool>("ignore-dist-stf");
148 auto localCmp = cfgc.options().get<bool>("local-cmp");
149 auto orbitPerTF = cfgc.options().get<int>("orbits-per-tf");
150 auto ccdb_url = o2::base::NameConf::getCCDBServer();
151 auto isForCalib = cfgc.options().get<bool>("for-calib");
152 auto writingDia = cfgc.options().get<bool>("calib-dia");
153
154 LOG(debug) << "TOF RECO WORKFLOW configuration";
155 LOG(debug) << "TOF input = " << cfgc.options().get<std::string>("input-type");
156 LOG(debug) << "TOF output = " << cfgc.options().get<std::string>("output-type");
157 LOG(debug) << "TOF sectors = " << cfgc.options().get<std::string>("tof-sectors");
158 LOG(debug) << "TOF disable-mc = " << cfgc.options().get<std::string>("disable-mc");
159 LOG(debug) << "TOF lanes = " << cfgc.options().get<std::string>("tof-lanes");
160 LOG(debug) << "TOF use-ccdb = " << cfgc.options().get<std::string>("use-ccdb");
161 if (useCCDB) {
162 LOG(debug) << "CCDB url = " << ccdb_url;
163 }
164 LOG(debug) << "TOF disable-root-input = " << disableRootInput;
165 LOG(debug) << "TOF disable-root-output = " << disableRootOutput;
166 LOG(debug) << "TOF conet-mode = " << conetmode;
167 LOG(debug) << "TOF ignore Dist Stf = " << ignoreDistStf;
168 LOG(debug) << "Is tof compressor in the chain = " << localCmp;
169 if (orbitPerTF == -1) {
170 LOG(debug) << "Orbit per TF read from CCDB";
171 } else {
172 LOG(debug) << "Orbit per TF = " << orbitPerTF;
173 }
174 LOG(debug) << "TOF disable-row-writing = " << disableROWwriting;
175 LOG(debug) << "TOF write-decoding-errors = " << writeerr;
176
177 if (clusterinput && !disableRootInput) {
178 LOG(debug) << "Insert TOF Cluster Reader";
179 specs.emplace_back(o2::tof::getClusterReaderSpec(useMC));
180 } else if (dgtinput) {
181 // TOF clusterizer
182 if (!disableRootInput) {
183 LOG(debug) << "Insert TOF Digit reader from file";
184 specs.emplace_back(o2::tof::getDigitReaderSpec(useMC));
185 }
186 if (writeraw) {
187 LOG(debug) << "Insert TOF Raw writer";
188 specs.emplace_back(o2::tof::getTOFRawWriterSpec());
189 }
190 } else if (rawinput) {
191 LOG(debug) << "Insert TOF Compressed Raw Decoder";
192 auto inputDesc = cfgc.options().get<std::string>("input-desc");
193 specs.emplace_back(o2::tof::getCompressedDecodingSpec(inputDesc, conetmode, !ignoreDistStf, orbitPerTF, localCmp));
194 useMC = 0;
195 }
196 if ((!dgtinput || disableRootInput) && writedigit && !disableRootOutput) {
197 // add TOF digit writer without mc labels
198 LOG(debug) << "Insert TOF Digit Writer";
199 specs.emplace_back(o2::tof::getTOFDigitWriterSpec(0, writeerr));
200 }
201
202 if (!clusterinput && writecluster) {
203 LOG(debug) << "Insert TOF Clusterizer";
204 specs.emplace_back(o2::tof::getTOFClusterizerSpec(useMC, useCCDB, isCalibFromCluster, isCosmics, ccdb_url.c_str(), isForCalib));
205 if (writecluster && !disableRootOutput) {
206 LOG(debug) << "Insert TOF Cluster Writer";
207 specs.emplace_back(o2::tof::getTOFClusterWriterSpec(useMC));
208 }
209 }
210
211 if (writingDia) {
212 specs.emplace_back(o2::tof::getTOFCalibWriterSpec("o2calib_tof.root", false, true, true));
213 }
214
215 LOG(debug) << "Number of active devices = " << specs.size();
216
217 // configure dpl timer to inject correct firstTForbit: start from the 1st orbit of TF containing 1st sampled orbit
218 o2::raw::HBFUtilsInitializer hbfIni(cfgc, specs);
219
220 return std::move(specs);
221}
TOF compressed data decoding task.
Definition of the Names Generator class.
Convert CTF (EncodedBlocks) to FT0 digit/channels stream.
Convert clusters streams to CTF (EncodedBlocks)
std::ostringstream debug
static std::string getCCDBServer()
Definition NameConf.cxx:110
static void updateFromString(std::string const &)
ConfigParamRegistry & options() const
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
framework::DataProcessorSpec getDigitReaderSpec(bool useMC)
o2::framework::DataProcessorSpec getTOFClusterizerSpec(bool useMC, bool useCCDB=0, bool doCalib=0, bool isCosmic=0, std::string ccdb_url="", bool isForCalib=false)
o2::framework::DataProcessorSpec getTOFDigitWriterSpec(bool useMC=1, bool writeErr=0)
o2::framework::DataProcessorSpec getTOFRawWriterSpec()
o2::framework::DataProcessorSpec getTOFCalibWriterSpec(const char *outdef="o2calib_tof.root", bool toftpc=false, bool addDia=false, bool onlyDia=false)
framework::DataProcessorSpec getClusterReaderSpec(bool useMC)
o2::framework::DataProcessorSpec getTOFClusterWriterSpec(bool useMC)
framework::DataProcessorSpec getCompressedDecodingSpec(const std::string &inputDesc, bool conet=false, bool askDISTSTF=true, int norbitPerTF=-1, bool localCmp=false)
static CompletionPolicy consumeWhenAllOrdered(const char *name, CompletionPolicy::Matcher matcher)
as consumeWhenAll, but ensures that records are processed with incremental timeSlice (DataHeader::sta...
static void addNewTimeSliceCallback(std::vector< o2::framework::CallbacksPolicy > &policies)
static void addConfigOption(std::vector< o2::framework::ConfigParamSpec > &opts, const std::string &defOpt=std::string(o2::base::NameConf::DIGITIZATIONCONFIGFILE))
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
WorkflowSpec defineDataProcessing(ConfigContext const &cfgc)
void customize(std::vector< o2::framework::CallbacksPolicy > &policies)