29#include <fairlogger/Logger.h>
39#include <unordered_map>
41void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
46void customize(std::vector<o2::framework::CompletionPolicy>& policies)
54void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
56 std::vector<o2::framework::ConfigParamSpec> options{
71 {
"ignore-dist-stf", VariantType::Bool,
false, {
"do not subscribe to FLP/DISTSUBTIMEFRAME/0 message (no lost TF recovery)"}},
72 {
"orbits-per-tf", VariantType::Int, -1, {
"default(-1) from GRP/CCDB, a valid value is required to run compressor for epn"}},
73 {
"calib-cluster", VariantType::Bool,
false, {
"to enable calib info production from clusters"}},
74 {
"for-calib", VariantType::Bool,
false, {
"to disable check on problematic, otherwise masked for new calibrations"}},
75 {
"calib-dia", VariantType::Bool,
false, {
"enabling writing calib for diagnostics"}},
76 {
"cosmics", VariantType::Bool,
false, {
"to enable cosmics utils"}}};
78 std::swap(workflowOptions, options);
104 auto nLanes = cfgc.
options().
get<
int>(
"tof-lanes");
105 auto inputType = cfgc.
options().
get<std::string>(
"input-type");
106 auto outputType = cfgc.
options().
get<std::string>(
"output-type");
108 bool writecluster = 0;
113 if (outputType.rfind(
"clusters") < outputType.size()) {
116 if (outputType.rfind(
"digits") < outputType.size()) {
119 if (outputType.rfind(
"raw") < outputType.size()) {
124 bool clusterinput = 0;
126 if (inputType ==
"digits") {
128 }
else if (inputType ==
"clusters") {
130 }
else if (inputType ==
"raw") {
132 writeerr = cfgc.
options().
get<
bool>(
"write-decoding-errors");
139 auto useMC = !cfgc.
options().
get<
bool>(
"disable-mc");
140 auto useCCDB = cfgc.
options().
get<
bool>(
"use-ccdb");
141 bool disableRootInput = cfgc.
options().
get<
bool>(
"disable-root-input") || rawinput;
142 bool disableRootOutput = cfgc.
options().
get<
bool>(
"disable-root-output");
143 bool conetmode = cfgc.
options().
get<
bool>(
"conet-mode");
144 bool disableROWwriting = cfgc.
options().
get<
bool>(
"disable-row-writing");
145 auto isCalibFromCluster = cfgc.
options().
get<
bool>(
"calib-cluster");
146 auto isCosmics = cfgc.
options().
get<
bool>(
"cosmics");
147 auto ignoreDistStf = cfgc.
options().
get<
bool>(
"ignore-dist-stf");
148 auto localCmp = cfgc.
options().
get<
bool>(
"local-cmp");
149 auto orbitPerTF = cfgc.
options().
get<
int>(
"orbits-per-tf");
151 auto isForCalib = cfgc.
options().
get<
bool>(
"for-calib");
152 auto writingDia = cfgc.
options().
get<
bool>(
"calib-dia");
154 LOG(
debug) <<
"TOF RECO WORKFLOW configuration";
162 LOG(
debug) <<
"CCDB url = " << ccdb_url;
164 LOG(
debug) <<
"TOF disable-root-input = " << disableRootInput;
165 LOG(
debug) <<
"TOF disable-root-output = " << disableRootOutput;
166 LOG(
debug) <<
"TOF conet-mode = " << conetmode;
167 LOG(
debug) <<
"TOF ignore Dist Stf = " << ignoreDistStf;
168 LOG(
debug) <<
"Is tof compressor in the chain = " << localCmp;
169 if (orbitPerTF == -1) {
170 LOG(
debug) <<
"Orbit per TF read from CCDB";
172 LOG(
debug) <<
"Orbit per TF = " << orbitPerTF;
174 LOG(
debug) <<
"TOF disable-row-writing = " << disableROWwriting;
175 LOG(
debug) <<
"TOF write-decoding-errors = " << writeerr;
177 if (clusterinput && !disableRootInput) {
178 LOG(
debug) <<
"Insert TOF Cluster Reader";
180 }
else if (dgtinput) {
182 if (!disableRootInput) {
183 LOG(
debug) <<
"Insert TOF Digit reader from file";
187 LOG(
debug) <<
"Insert TOF Raw writer";
190 }
else if (rawinput) {
191 LOG(
debug) <<
"Insert TOF Compressed Raw Decoder";
192 auto inputDesc = cfgc.
options().
get<std::string>(
"input-desc");
196 if ((!dgtinput || disableRootInput) && writedigit && !disableRootOutput) {
198 LOG(
debug) <<
"Insert TOF Digit Writer";
202 if (!clusterinput && writecluster) {
203 LOG(
debug) <<
"Insert TOF Clusterizer";
205 if (writecluster && !disableRootOutput) {
206 LOG(
debug) <<
"Insert TOF Cluster Writer";
215 LOG(
debug) <<
"Number of active devices = " << specs.size();
220 return std::move(specs);
TOF compressed data decoding task.
Definition of the Names Generator class.
Convert CTF (EncodedBlocks) to FT0 digit/channels strean.
Convert clusters streams to CTF (EncodedBlocks)
static std::string getCCDBServer()
static void updateFromString(std::string const &)
bool helpOnCommandLine() const
ConfigParamRegistry & options() const
T get(const char *key) const
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
framework::DataProcessorSpec getDigitReaderSpec(bool useMC)
o2::framework::DataProcessorSpec getTOFClusterizerSpec(bool useMC, bool useCCDB=0, bool doCalib=0, bool isCosmic=0, std::string ccdb_url="", bool isForCalib=false)
o2::framework::DataProcessorSpec getTOFDigitWriterSpec(bool useMC=1, bool writeErr=0)
o2::framework::DataProcessorSpec getTOFRawWriterSpec()
o2::framework::DataProcessorSpec getTOFCalibWriterSpec(const char *outdef="o2calib_tof.root", bool toftpc=false, bool addDia=false, bool onlyDia=false)
framework::DataProcessorSpec getClusterReaderSpec(bool useMC)
o2::framework::DataProcessorSpec getTOFClusterWriterSpec(bool useMC)
framework::DataProcessorSpec getCompressedDecodingSpec(const std::string &inputDesc, bool conet=false, bool askDISTSTF=true, int norbitPerTF=-1, bool localCmp=false)
static CompletionPolicy consumeWhenAllOrdered(const char *name, CompletionPolicy::Matcher matcher)
as consumeWhenAll, but ensures that records are processed with incremental timeSlice (DataHeader::sta...
static void addNewTimeSliceCallback(std::vector< o2::framework::CallbacksPolicy > &policies)
static void addConfigOption(std::vector< o2::framework::ConfigParamSpec > &opts, const std::string &defOpt=std::string(o2::base::NameConf::DIGITIZATIONCONFIGFILE))
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
WorkflowSpec defineDataProcessing(ConfigContext const &cfgc)
void customize(std::vector< o2::framework::CallbacksPolicy > &policies)