12#include <boost/program_options.hpp>
13#include <boost/lexical_cast.hpp>
119#include <type_traits>
132void customize(std::vector<o2::framework::CompletionPolicy>& policies)
145void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
148 int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
149 std::string laneshelp(
"Number of tpc processing lanes. A lane is a pipeline of algorithms.");
150 workflowOptions.push_back(
151 ConfigParamSpec{
"tpc-lanes", VariantType::Int, defaultlanes, {laneshelp}});
153 std::string sectorshelp(
"List of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15");
155 workflowOptions.push_back(
156 ConfigParamSpec{
"tpc-sectors", VariantType::String, sectorDefault.c_str(), {sectorshelp}});
158 std::string onlyhelp(
"Comma separated list of detectors to accept. Takes precedence over the skipDet option. (Default is none)");
159 workflowOptions.push_back(
162 std::string skiphelp(
"Comma separate list of detectors to skip/ignore. (Default is none)");
163 workflowOptions.push_back(
169 std::string forceaccepthelp(
"Whether or not to always rely on accept/skip filters for detectors, independent of GRP content");
170 workflowOptions.push_back(
171 ConfigParamSpec{
"forceSelectedDets", VariantType::Bool,
false, {forceaccepthelp}});
173 std::string onlyctxhelp(
"Produce only the digitization context; Don't actually digitize");
177 std::string tpcrthelp(
"deprecated option, please connect workflows on the command line by pipe");
178 workflowOptions.push_back(
179 ConfigParamSpec{
"tpc-reco-type", VariantType::String,
"", {tpcrthelp}});
186 std::string simhelp(
"Comma separated list of simulation prefixes (for background, signal productions)");
187 workflowOptions.push_back(
191 std::string keyvaluehelp(
"Semicolon separated key=value strings (e.g.: 'TPC.gasDensity=1;...')");
192 workflowOptions.push_back(
193 ConfigParamSpec{
"configKeyValues", VariantType::String,
"", {keyvaluehelp}});
194 workflowOptions.push_back(
195 ConfigParamSpec{
"configFile", VariantType::String,
"", {
"configuration file for configurable parameters"}});
218 workflowOptions.push_back(
ConfigParamSpec{
"disable-trd-trapsim", VariantType::Bool,
false, {
"disable the trap simulation of the TRD"}});
219 workflowOptions.push_back(
ConfigParamSpec{
"trd-digit-downscaling", VariantType::Int, 1, {
"only keep TRD digits for every n-th trigger"}});
221 workflowOptions.push_back(
ConfigParamSpec{
"combine-devices", VariantType::Bool,
false, {
"combined multiple DPL worker/writer devices"}});
224 workflowOptions.push_back(
ConfigParamSpec{
"with-trigger", VariantType::Bool,
false, {
"enable distribution of CTP trigger digits"}});
227 workflowOptions.push_back(
ConfigParamSpec{
"store-ctp-lumi", VariantType::Float, -1.f, {
"store CTP lumi scaler in CTP digits (if >= 0)"}});
230void customize(std::vector<o2::framework::DispatchPolicy>& policies)
234 auto matcher = [](
auto const& spec) {
235 return spec.name ==
"SimReader";
237 policies.push_back({
"prompt-for-simreader", matcher, DispatchOp::WhenReady});
243 const auto offset =
int64_t(hbfu.getFirstIRofTF({0, hbfu.orbitFirstSampled}).orbit);
244 const auto increment =
int64_t(hbfu.nHBFPerTF);
245 const auto startTime = hbfu.startTime;
246 const auto orbitFirst = hbfu.orbitFirst;
252void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
264 LOG(info) <<
"Setting DPL-header firstTForbit to " << dh.firstTForbit;
265 LOG(info) <<
"Setting DPL-header runNumber to " << dh.runNumber;
266 LOG(info) <<
"Setting DPL-header timeframe creation time to " << dph.creation;
281 auto lanes = configcontext.
options().
get<
int>(
"tpc-lanes");
284 LOG(fatal) <<
"tpc-lanes needs to be positive\n";
289 return std::min(lanes, (
int)sectors.size());
301 std::stringstream streamthis;
302 std::stringstream streamparent;
304 streamthis <<
"TPCGEMINIT_PID" << getpid();
305 streamparent <<
"TPCGEMINIT_PID" << getppid();
306 if (getenv(streamparent.str().c_str())) {
307 LOG(
debug) <<
"GEM ALREADY INITIALIZED ... SKIPPING HERE";
311 LOG(
debug) <<
"INITIALIZING TPC GEMAmplification";
312 setenv(streamthis.str().c_str(),
"ON", 1);
315 cdb.setUseDefaults();
320 LOGP(info,
"initTPC: TPC GEM param, Gas param + CalPadGainFull updated for time {}", timestamp);
336 std::stringstream
str;
337 str <<
"O2SIMDIGIINTERNAL_" << getpid() <<
"_" <<
key;
338 LOG(info) <<
"Publishing master key " <<
str.str();
339 setenv(
str.str().c_str(),
value, 1);
345 std::stringstream
str;
346 str <<
"O2SIMDIGIINTERNAL_" << getppid() <<
"_" <<
key;
348 return getenv(
str.str().c_str());
351std::shared_ptr<o2::parameters::GRPObject>
readGRP(std::string
const& inputGRP)
355 LOG(error) <<
"This workflow needs a valid GRP file to start";
361 return std::shared_ptr<o2::parameters::GRPObject>(grp);
369 std::vector<std::string>
fields;
371 std::istringstream ss(
src);
373 while (std::getline(ss, token, sep)) {
374 if (!token.empty()) {
391 DetFilterer(std::string
const& detlist, std::string
const& unsetVal,
char separator,
bool doWhiteListing)
394 if (detlist.compare(unsetVal) == 0) {
398 std::vector<std::string> tokens =
splitString(detlist, separator);
401 for (
auto& token : tokens) {
402 ids.emplace_back(token.c_str());
405 isWhiteLister = doWhiteListing;
412 return ids.size() > 0;
418 bool found = std::find(
ids.begin(),
ids.end(),
id) !=
ids.end();
419 return found == isWhiteLister;
423 std::vector<o2::detectors::DetID>
ids;
430 return DetFilterer(optionVal, unsetValue, separator,
true);
436 return DetFilterer(optionVal, unsetValue, separator,
false);
451 std::string dplProcessName =
whoAmI(configcontext);
454 bool initServices = !isDPLinternal && !isDumpWorkflow && !ismaster;
463 ConfigurableParam::updateFromFile(configcontext.
options().
get<std::string>(
"configFile"));
468 ConfigurableParam::updateFromString(configcontext.
options().
get<std::string>(
"configKeyValues"));
474 std::shared_ptr<o2::parameters::GRPObject const> grp(
nullptr);
478 const auto GRPTIMEKEY =
"GRPTIMESTART";
483 auto t = grp->getTimeStart();
489 LOG(fatal) <<
"Expected env value not found";
492 return boost::lexical_cast<uint64_t>(tstr);
502 getGRPStartTime(grp.get());
504 if (!hbfu.startTime) {
505 hbfu.setValue(
"HBFUtils.startTime",
std::to_string(getGRPStartTime(grp.get())));
519 LOG(info) <<
"Setting timestamp of BasicCCDBManager to " << dph.
creation;
529 ConfigurableParam::setValue(
"DigiParams.digitizationgeometry_prefix", simPrefixes[0]);
530 ConfigurableParam::setValue(
"DigiParams.grpfile", grpfile);
532 LOG(info) <<
"MC-TRUTH " << !configcontext.
options().
get<
bool>(
"disable-mc");
533 bool mctruth = !configcontext.
options().
get<
bool>(
"disable-mc");
534 ConfigurableParam::setValue(
"DigiParams",
"mctruth", mctruth);
540 if (!configcontext.
options().
get<
bool>(
"disable-write-ini")) {
551 for (
auto&
f : filterers) {
564 std::stringstream
str;
565 str <<
"GRPDETKEY_" <<
id.getName();
566 if (
gIsMaster and grp.get() !=
nullptr) {
567 auto ok = grp->isDetReadOut(
id);
583 if (configcontext.
options().
get<
bool>(
"only-context")) {
587 auto accepted = accept(
id);
590 auto forceAccepted = configcontext.
options().
get<
bool>(
"forceSelectedDets");
591 bool is_ingrp = isInGRPReadout(
id);
593 auto isRun = accepted && (forceAccepted || is_ingrp);
595 LOG(info) <<
id.getName()
596 <<
" is in grp? " << (is_ingrp ?
"yes" :
"no") <<
";"
597 <<
" is taken although not in grp? " << (!is_ingrp && (accepted && forceAccepted) ?
"yes" :
"no") <<
";"
598 <<
" is skipped? " << (!accepted ?
"yes" :
"no") <<
";"
599 <<
" is run? " << (isRun ?
"yes" :
"no");
604 std::vector<o2::detectors::DetID> detList;
607 std::vector<int> tpcsectors;
610 if (!helpasked && ismaster) {
614 tpcsectors = o2::RangeTokenizer::tokenize<int>(configcontext.
options().
get<std::string>(
"tpc-sectors"));
616 auto lanes = helpasked ? 1 :
getNumTPCLanes(tpcsectors, configcontext);
619 auto internalwrite = configcontext.
options().
get<
bool>(
"tpc-chunked-writer");
620 auto distortionType = configcontext.
options().
get<
int>(
"tpc-distortion-type");
622 specs.insert(specs.end(), tpcPipelines.begin(), tpcPipelines.end());
624 if (configcontext.
options().
get<std::string>(
"tpc-reco-type").empty() ==
false) {
625 throw std::runtime_error(
"option 'tpc-reco-type' is deprecated, please connect workflows on the command line by pipe");
627 if (!internalwrite) {
634 const int firstOtherChannel = 36;
635 int fanoutsize = firstOtherChannel;
646#ifdef ENABLE_UPGRADES
648 if (isEnabled(o2::detectors::DetID::IT3)) {
649 detList.emplace_back(o2::detectors::DetID::IT3);
657 if (isEnabled(o2::detectors::DetID::TRK)) {
658 detList.emplace_back(o2::detectors::DetID::TRK);
677 auto useCCDB = configcontext.
options().
get<
bool>(
"use-ccdb-tof");
678 useCCDB |= configcontext.
options().
get<
bool>(
"ccdb-tof-sa");
682 auto maskDRM = (uint32_t)configcontext.
options().
get<
int>(
"tof-drm-bitmask");
693 auto useCCDB = configcontext.
options().
get<
bool>(
"use-ccdb-ft0");
713 auto useCCDB = !configcontext.
options().
get<
bool>(
"no-use-ccdb-emc");
714 bool requireCTPInputs = !configcontext.
options().
get<
bool>(
"no-require-ctpinputs-emc");
745 auto disableTrapSim = configcontext.
options().
get<
bool>(
"disable-trd-trapsim");
746 auto trdDigitDownscaling = configcontext.
options().
get<
int>(
"trd-digit-downscaling");
747 if (!disableTrapSim) {
807 float lumiScaler = configcontext.
options().
get<
float>(
"store-ctp-lumi");
814 if (!configcontext.
options().
get<
bool>(
"only-context")) {
818 bool combine = configcontext.
options().
get<
bool>(
"combine-devices");
820 for (
auto& s : digitizerSpecs) {
823 for (
auto& s : writerSpecs) {
827 std::vector<DataProcessorSpec> remaining;
828 specs.push_back(
specCombiner(
"Digitizations", digitizerSpecs, remaining));
829 specs.push_back(
specCombiner(
"Writers", writerSpecs, remaining));
830 for (
auto& s : remaining) {
840 bool haveCCDBInputSpec =
false;
841 for (
auto spec : specs) {
842 for (
auto in : spec.inputs) {
843 if (in.lifetime == Lifetime::Condition) {
844 haveCCDBInputSpec =
true;
849 if (!haveCCDBInputSpec && specs.size() > 0) {
850 LOG(info) <<
"No one uses DPL CCDB .. injecting a dummy CCDB query into " << specs.back().name;
851 specs.back().inputs.emplace_back(
"_dummyOrbitReset",
"CTP",
"ORBITRESET", 0, Lifetime::Condition,
856 bool withTrigger = configcontext.
options().
get<
bool>(
"with-trigger");
857 LOG(info) <<
" TRIGGER " << withTrigger;
Simple interface to the CDB manager.
Class to use TOF calibration (decalibration, calibration)
std::vector< std::shared_ptr< arrow::Field > > fields
Definition of the GEM amplification.
Header of the General Run Parameters object.
Definition of the Names Generator class.
Helper function to tokenize sequences and ranges of integral numbers.
std::vector< std::string > splitString(std::string const &src, char sep)
const char * get_master_env(const char *key)
int getNumTPCLanes(std::vector< int > const §ors, ConfigContext const &configcontext)
DetFilterer whitelister(std::string optionVal, std::string unsetValue, char separator)
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
This function hooks up the the workflow specifications into the DPL driver.
void setTimingInfoInHeaders(o2::header::DataHeader &dh, o2::framework::DataProcessingHeader &dph)
DetFilterer blacklister(std::string optionVal, std::string unsetValue, char separator)
void customize(std::vector< o2::framework::CompletionPolicy > &policies)
void initTPC(long timestamp)
void publish_master_env(const char *key, const char *value)
std::shared_ptr< o2::parameters::GRPObject > readGRP(std::string const &inputGRP)
static std::string getGRPFileName(const std::string_view prefix=STANDARDSIMPREFIX)
static constexpr std::string_view DIGITIZATIONCONFIGFILE
static std::string getCCDBServer()
static BasicCCDBManager & instance()
void setCaching(bool v)
disable or enable caching
void setTimestamp(long t)
set timestamp cache for all queries
void setLocalObjectValidityChecking(bool v=true)
set the flag to check object validity before CCDB query
void printKeyValues(bool showProv=true, bool useLogger=false, bool withPadding=true, bool showHash=true) const final
static const HBFUtils & Instance()
static void writeINI(std::string const &filename, std::string const &keyOnly="")
Static class with identifiers, bitmasks and names for ALICE detectors.
bool helpOnCommandLine() const
ConfigParamRegistry & options() const
T get(const char *key) const
static GRPObject * loadFrom(const std::string &grpFileName="")
static CDBInterface & instance()
static GEMAmplification & instance()
Default constructor.
static constexpr int MAXSECTOR
GLsizei const GLfloat * value
constexpr double LHCOrbitMUS
DataProcessorSpec getCPVDigitizerSpec(int channel, bool mctruth)
Create new digitizer spec.
o2::framework::DataProcessorSpec getCPVDigitWriterSpec(bool mctruth=true)
framework::DataProcessorSpec getDigitWriterSpec(bool raw=true)
o2::framework::DataProcessorSpec getCTPDigitizerSpec(int channel, std::vector< o2::detectors::DetID > &detList, float ctpLumiScaler, bool mctruth)
o2::framework::DataProcessorSpec getEMCALDigitWriterSpec(bool mctruth=true)
Create new digits writer spec.
o2::framework::DataProcessorSpec getEMCALDigitizerSpec(int channel, bool requireCTPInput, bool mctruth=true, bool useccdb=true)
Create new digitizer spec.
o2::framework::DataProcessorSpec getFDDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getFDDDigitWriterSpec(bool mctruth=true, bool trigInp=true)
Defining PrimaryVertex explicitly as messageable.
o2::framework::DataProcessorSpec specCombiner(std::string const &name, std::vector< DataProcessorSpec > const &speccollection, std::vector< DataProcessorSpec > &remaining)
bool isInternalDPL(std::string const &name)
bool isMasterWorkflowDefinition(ConfigContext const &configcontext)
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
bool isDumpWorkflowInvocation(ConfigContext const &configcontext)
std::string whoAmI(ConfigContext const &configcontext)
std::vector< DataProcessorSpec > WorkflowSpec
o2::framework::DataProcessorSpec getFT0DigitizerSpec(int channel, bool mctruth, bool useCCDB)
framework::DataProcessorSpec getFT0DigitWriterSpec(bool mctruth=true, bool trigInp=true)
create a processor spec
o2::framework::DataProcessorSpec getFV0DigitizerSpec(int channel, bool mctruth)
framework::DataProcessorSpec getFV0DigitWriterSpec(bool mctruth=true, bool trigInp=true)
create a processor spec
o2::framework::DataProcessorSpec getHMPIDDigitWriterSpec(bool mctruth=true)
o2::framework::DataProcessorSpec getHMPIDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getITS3DigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getITS3DigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getITSDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
o2::framework::DataProcessorSpec getMFTDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getMFTDigitizerSpec(int channel, bool mctruth)
DataProcessorSpec getITSDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getMCHDigitWriterSpec(bool mctruth)
o2::framework::DataProcessorSpec getMCHDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getMIDDigitWriterSpec(bool mctruth)
o2::framework::DataProcessorSpec getMIDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getGRPUpdaterSpec(const std::string &prefix, const std::vector< o2::detectors::DetID > &detList)
create the processor spec
o2::framework::DataProcessorSpec getPHOSDigitWriterSpec(bool mctruth)
DataProcessorSpec getPHOSDigitizerSpec(int channel, bool mctruth)
Create new digitizer spec.
DataProcessorSpec getSimReaderSpec(SubspecRange range, const std::vector< std::string > &simprefixes, const std::vector< int > &tpcsectors, bool withTrigger)
o2::framework::DataProcessorSpec getTOFDigitWriterSpec(bool useMC=1, bool writeErr=0)
DataProcessorSpec getTOFDigitizerSpec(int channel, bool useCCDB, bool mctruth, std::string ccdb_url, int timestamp, uint32_t maskDRM)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
o2::framework::DataProcessorSpec getTPCDigitRootWriterSpec(std::vector< int > const &laneConfiguration, bool mctruth)
o2::framework::DataProcessorSpec getTPCDigitizerSpec(int channel, bool writeGRP, bool mctruth, bool internalwriter, int distortionType)
@ CalPadGainFull
Full pad gain calibration.
@ ParGas
Parameter for Gas.
o2::framework::DataProcessorSpec getTRDDigitWriterSpec(bool mctruth=true, bool inpFromDigitizer=true)
o2::framework::DataProcessorSpec getTRDDigitizerSpec(int channel, bool mctruth=true)
o2::framework::DataProcessorSpec getTRDTrapSimulatorSpec(bool useMC, int digitDownscaling)
o2::framework::DataProcessorSpec getTRDTrackletWriterSpec(bool useMC)
o2::framework::DataProcessorSpec getTRKDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getTRKDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getZDCDigitizerSpec(int channel, bool mctruth)
framework::DataProcessorSpec getZDCDigitWriterDPLSpec(bool mctruth, bool simVersion)
create a processor spec
std::string to_string(gsl::span< T, Size > span)
DetFilterer(std::string const &detlist, std::string const &unsetVal, char separator, bool doWhiteListing)
bool accept(o2::detectors::DetID id)
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeWhenAnyWithAllConditions(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
uint64_t startTime
absolute time in ms corresponding to the start of the MC run
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"