12#include <boost/program_options.hpp>
13#include <boost/lexical_cast.hpp>
114#include <type_traits>
127void customize(std::vector<o2::framework::CompletionPolicy>& policies)
140void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
143 int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
144 std::string laneshelp(
"Number of tpc processing lanes. A lane is a pipeline of algorithms.");
145 workflowOptions.push_back(
146 ConfigParamSpec{
"tpc-lanes", VariantType::Int, defaultlanes, {laneshelp}});
148 std::string sectorshelp(
"List of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15");
150 workflowOptions.push_back(
151 ConfigParamSpec{
"tpc-sectors", VariantType::String, sectorDefault.c_str(), {sectorshelp}});
153 std::string onlyhelp(
"Comma separated list of detectors to accept. Takes precedence over the skipDet option. (Default is none)");
154 workflowOptions.push_back(
157 std::string skiphelp(
"Comma separate list of detectors to skip/ignore. (Default is none)");
158 workflowOptions.push_back(
164 std::string forceaccepthelp(
"Whether or not to always rely on accept/skip filters for detectors, independent of GRP content");
165 workflowOptions.push_back(
166 ConfigParamSpec{
"forceSelectedDets", VariantType::Bool,
false, {forceaccepthelp}});
168 std::string onlyctxhelp(
"Produce only the digitization context; Don't actually digitize");
172 std::string tpcrthelp(
"deprecated option, please connect workflows on the command line by pipe");
173 workflowOptions.push_back(
174 ConfigParamSpec{
"tpc-reco-type", VariantType::String,
"", {tpcrthelp}});
181 std::string simhelp(
"Comma separated list of simulation prefixes (for background, signal productions)");
182 workflowOptions.push_back(
186 std::string keyvaluehelp(
"Semicolon separated key=value strings (e.g.: 'TPC.gasDensity=1;...')");
187 workflowOptions.push_back(
188 ConfigParamSpec{
"configKeyValues", VariantType::String,
"", {keyvaluehelp}});
189 workflowOptions.push_back(
190 ConfigParamSpec{
"configFile", VariantType::String,
"", {
"configuration file for configurable parameters"}});
212 workflowOptions.push_back(
ConfigParamSpec{
"disable-trd-trapsim", VariantType::Bool,
false, {
"disable the trap simulation of the TRD"}});
213 workflowOptions.push_back(
ConfigParamSpec{
"trd-digit-downscaling", VariantType::Int, 1, {
"only keep TRD digits for every n-th trigger"}});
215 workflowOptions.push_back(
ConfigParamSpec{
"combine-devices", VariantType::Bool,
false, {
"combined multiple DPL worker/writer devices"}});
218 workflowOptions.push_back(
ConfigParamSpec{
"with-trigger", VariantType::Bool,
false, {
"enable distribution of CTP trigger digits"}});
221void customize(std::vector<o2::framework::DispatchPolicy>& policies)
225 auto matcher = [](
auto const& spec) {
226 return spec.name ==
"SimReader";
228 policies.push_back({
"prompt-for-simreader", matcher, DispatchOp::WhenReady});
234 const auto offset = int64_t(hbfu.getFirstIRofTF({0, hbfu.orbitFirstSampled}).orbit);
235 const auto increment = int64_t(hbfu.nHBFPerTF);
236 const auto startTime = hbfu.startTime;
237 const auto orbitFirst = hbfu.orbitFirst;
243void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
255 LOG(info) <<
"Setting DPL-header firstTForbit to " << dh.firstTForbit;
256 LOG(info) <<
"Setting DPL-header runNumber to " << dh.runNumber;
257 LOG(info) <<
"Setting DPL-header timeframe creation time to " << dph.creation;
272 auto lanes = configcontext.
options().
get<
int>(
"tpc-lanes");
275 LOG(fatal) <<
"tpc-lanes needs to be positive\n";
280 return std::min(lanes, (
int)sectors.size());
292 std::stringstream streamthis;
293 std::stringstream streamparent;
295 streamthis <<
"TPCGEMINIT_PID" << getpid();
296 streamparent <<
"TPCGEMINIT_PID" << getppid();
297 if (getenv(streamparent.str().c_str())) {
298 LOG(
debug) <<
"GEM ALREADY INITIALIZED ... SKIPPING HERE";
302 LOG(
debug) <<
"INITIALIZING TPC GEMAmplification";
303 setenv(streamthis.str().c_str(),
"ON", 1);
306 cdb.setUseDefaults();
316 std::stringstream
str;
317 str <<
"O2SIMDIGIINTERNAL_" << getpid() <<
"_" <<
key;
318 LOG(info) <<
"Publishing master key " <<
str.str();
319 setenv(
str.str().c_str(),
value, 1);
325 std::stringstream
str;
326 str <<
"O2SIMDIGIINTERNAL_" << getppid() <<
"_" <<
key;
328 return getenv(
str.str().c_str());
331std::shared_ptr<o2::parameters::GRPObject>
readGRP(std::string
const& inputGRP)
335 LOG(error) <<
"This workflow needs a valid GRP file to start";
341 return std::shared_ptr<o2::parameters::GRPObject>(grp);
349 std::vector<std::string> fields;
351 std::istringstream ss(
src);
353 while (std::getline(ss, token, sep)) {
354 if (!token.empty()) {
355 fields.push_back(token);
371 DetFilterer(std::string
const& detlist, std::string
const& unsetVal,
char separator,
bool doWhiteListing)
374 if (detlist.compare(unsetVal) == 0) {
378 std::vector<std::string> tokens =
splitString(detlist, separator);
381 for (
auto& token : tokens) {
382 ids.emplace_back(token.c_str());
385 isWhiteLister = doWhiteListing;
392 return ids.size() > 0;
398 bool found = std::find(
ids.begin(),
ids.end(),
id) !=
ids.end();
399 return found == isWhiteLister;
403 std::vector<o2::detectors::DetID>
ids;
410 return DetFilterer(optionVal, unsetValue, separator,
true);
416 return DetFilterer(optionVal, unsetValue, separator,
false);
431 std::string dplProcessName =
whoAmI(configcontext);
434 bool initServices = !isDPLinternal && !isDumpWorkflow && !ismaster;
443 ConfigurableParam::updateFromFile(configcontext.
options().
get<std::string>(
"configFile"));
448 ConfigurableParam::updateFromString(configcontext.
options().
get<std::string>(
"configKeyValues"));
454 std::shared_ptr<o2::parameters::GRPObject const> grp(
nullptr);
458 const auto GRPTIMEKEY =
"GRPTIMESTART";
463 auto t = grp->getTimeStart();
469 LOG(fatal) <<
"Expected env value not found";
472 return boost::lexical_cast<uint64_t>(tstr);
482 getGRPStartTime(grp.get());
484 if (!hbfu.startTime) {
485 hbfu.setValue(
"HBFUtils.startTime",
std::to_string(getGRPStartTime(grp.get())));
499 LOG(info) <<
"Setting timestamp of BasicCCDBManager to " << dph.
creation;
509 ConfigurableParam::setValue(
"DigiParams.digitizationgeometry_prefix", simPrefixes[0]);
510 ConfigurableParam::setValue(
"DigiParams.grpfile", grpfile);
512 LOG(info) <<
"MC-TRUTH " << !configcontext.
options().
get<
bool>(
"disable-mc");
513 bool mctruth = !configcontext.
options().
get<
bool>(
"disable-mc");
514 ConfigurableParam::setValue(
"DigiParams",
"mctruth", mctruth);
520 if (!configcontext.
options().
get<
bool>(
"disable-write-ini")) {
531 for (
auto&
f : filterers) {
544 std::stringstream
str;
545 str <<
"GRPDETKEY_" <<
id.getName();
546 if (
gIsMaster and grp.get() !=
nullptr) {
547 auto ok = grp->isDetReadOut(
id);
563 if (configcontext.
options().
get<
bool>(
"only-context")) {
567 auto accepted = accept(
id);
570 auto forceAccepted = configcontext.
options().
get<
bool>(
"forceSelectedDets");
571 bool is_ingrp = isInGRPReadout(
id);
573 auto isRun = accepted && (forceAccepted || is_ingrp);
575 LOG(info) <<
id.getName()
576 <<
" is in grp? " << (is_ingrp ?
"yes" :
"no") <<
";"
577 <<
" is taken although not in grp? " << (!is_ingrp && (accepted && forceAccepted) ?
"yes" :
"no") <<
";"
578 <<
" is skipped? " << (!accepted ?
"yes" :
"no") <<
";"
579 <<
" is run? " << (isRun ?
"yes" :
"no");
584 std::vector<o2::detectors::DetID> detList;
587 std::vector<int> tpcsectors;
590 if (!helpasked && ismaster) {
594 tpcsectors = o2::RangeTokenizer::tokenize<int>(configcontext.
options().
get<std::string>(
"tpc-sectors"));
596 auto lanes = helpasked ? 1 :
getNumTPCLanes(tpcsectors, configcontext);
599 auto internalwrite = configcontext.
options().
get<
bool>(
"tpc-chunked-writer");
600 auto distortionType = configcontext.
options().
get<
int>(
"tpc-distortion-type");
602 specs.insert(specs.end(), tpcPipelines.begin(), tpcPipelines.end());
604 if (configcontext.
options().
get<std::string>(
"tpc-reco-type").empty() ==
false) {
605 throw std::runtime_error(
"option 'tpc-reco-type' is deprecated, please connect workflows on the command line by pipe");
607 if (!internalwrite) {
614 const int firstOtherChannel = 36;
615 int fanoutsize = firstOtherChannel;
626#ifdef ENABLE_UPGRADES
628 if (isEnabled(o2::detectors::DetID::IT3)) {
629 detList.emplace_back(o2::detectors::DetID::IT3);
648 auto useCCDB = configcontext.
options().
get<
bool>(
"use-ccdb-tof");
649 useCCDB |= configcontext.
options().
get<
bool>(
"ccdb-tof-sa");
663 auto useCCDB = configcontext.
options().
get<
bool>(
"use-ccdb-ft0");
683 auto useCCDB = !configcontext.
options().
get<
bool>(
"no-use-ccdb-emc");
684 bool requireCTPInputs = !configcontext.
options().
get<
bool>(
"no-require-ctpinputs-emc");
715 auto disableTrapSim = configcontext.
options().
get<
bool>(
"disable-trd-trapsim");
716 auto trdDigitDownscaling = configcontext.
options().
get<
int>(
"trd-digit-downscaling");
717 if (!disableTrapSim) {
783 if (!configcontext.
options().
get<
bool>(
"only-context")) {
787 bool combine = configcontext.
options().
get<
bool>(
"combine-devices");
789 for (
auto& s : digitizerSpecs) {
792 for (
auto& s : writerSpecs) {
796 std::vector<DataProcessorSpec> remaining;
797 specs.push_back(
specCombiner(
"Digitizations", digitizerSpecs, remaining));
798 specs.push_back(
specCombiner(
"Writers", writerSpecs, remaining));
799 for (
auto& s : remaining) {
809 bool haveCCDBInputSpec =
false;
810 for (
auto spec : specs) {
811 for (
auto in : spec.inputs) {
812 if (in.lifetime == Lifetime::Condition) {
813 haveCCDBInputSpec =
true;
818 if (!haveCCDBInputSpec && specs.size() > 0) {
819 LOG(info) <<
"No one uses DPL CCDB .. injecting a dummy CCDB query into " << specs.back().name;
820 specs.back().inputs.emplace_back(
"_dummyOrbitReset",
"CTP",
"ORBITRESET", 0, Lifetime::Condition,
825 bool withTrigger = configcontext.
options().
get<
bool>(
"with-trigger");
826 LOG(info) <<
" TRIGGER " << withTrigger;
Simple interface to the CDB manager.
Definition of the GEM amplification.
Header of the General Run Parameters object.
Definition of the Names Generator class.
Helper function to tokenize sequences and ranges of integral numbers.
std::vector< std::string > splitString(std::string const &src, char sep)
const char * get_master_env(const char *key)
int getNumTPCLanes(std::vector< int > const &sectors, ConfigContext const &configcontext)
DetFilterer whitelister(std::string optionVal, std::string unsetValue, char separator)
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
This function hooks up the workflow specifications into the DPL driver.
void setTimingInfoInHeaders(o2::header::DataHeader &dh, o2::framework::DataProcessingHeader &dph)
DetFilterer blacklister(std::string optionVal, std::string unsetValue, char separator)
void customize(std::vector< o2::framework::CompletionPolicy > &policies)
void publish_master_env(const char *key, const char *value)
std::shared_ptr< o2::parameters::GRPObject > readGRP(std::string const &inputGRP)
static std::string getGRPFileName(const std::string_view prefix=STANDARDSIMPREFIX)
static constexpr std::string_view DIGITIZATIONCONFIGFILE
static std::string getCCDBServer()
static BasicCCDBManager & instance()
void setCaching(bool v)
disable or enable caching
void setTimestamp(long t)
set timestamp cache for all queries
void setLocalObjectValidityChecking(bool v=true)
set the flag to check object validity before CCDB query
static const HBFUtils & Instance()
static void writeINI(std::string const &filename, std::string const &keyOnly="")
Static class with identifiers, bitmasks and names for ALICE detectors.
bool helpOnCommandLine() const
ConfigParamRegistry & options() const
T get(const char *key) const
static GRPObject * loadFrom(const std::string &grpFileName="")
static CDBInterface & instance()
static GEMAmplification & instance()
Default constructor.
static constexpr int MAXSECTOR
GLsizei const GLfloat * value
constexpr double LHCOrbitMUS
DataProcessorSpec getCPVDigitizerSpec(int channel, bool mctruth)
Create new digitizer spec.
o2::framework::DataProcessorSpec getCPVDigitWriterSpec(bool mctruth=true)
framework::DataProcessorSpec getDigitWriterSpec(bool raw=true)
o2::framework::DataProcessorSpec getCTPDigitizerSpec(int channel, std::vector< o2::detectors::DetID > &detList, bool mctruth)
o2::framework::DataProcessorSpec getEMCALDigitWriterSpec(bool mctruth=true)
Create new digits writer spec.
o2::framework::DataProcessorSpec getEMCALDigitizerSpec(int channel, bool requireCTPInput, bool mctruth=true, bool useccdb=true)
Create new digitizer spec.
o2::framework::DataProcessorSpec getFDDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getFDDDigitWriterSpec(bool mctruth=true, bool trigInp=true)
Defining PrimaryVertex explicitly as messageable.
o2::framework::DataProcessorSpec specCombiner(std::string const &name, std::vector< DataProcessorSpec > const &speccollection, std::vector< DataProcessorSpec > &remaining)
bool isInternalDPL(std::string const &name)
bool isMasterWorkflowDefinition(ConfigContext const &configcontext)
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
bool isDumpWorkflowInvocation(ConfigContext const &configcontext)
std::string whoAmI(ConfigContext const &configcontext)
std::vector< DataProcessorSpec > WorkflowSpec
o2::framework::DataProcessorSpec getFT0DigitizerSpec(int channel, bool mctruth, bool useCCDB)
framework::DataProcessorSpec getFT0DigitWriterSpec(bool mctruth=true, bool trigInp=true)
create a processor spec
o2::framework::DataProcessorSpec getFV0DigitizerSpec(int channel, bool mctruth)
framework::DataProcessorSpec getFV0DigitWriterSpec(bool mctruth=true, bool trigInp=true)
create a processor spec
o2::framework::DataProcessorSpec getHMPIDDigitWriterSpec(bool mctruth=true)
o2::framework::DataProcessorSpec getHMPIDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getITS3DigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getITS3DigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getITSDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
o2::framework::DataProcessorSpec getMFTDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getMFTDigitizerSpec(int channel, bool mctruth)
DataProcessorSpec getITSDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getMCHDigitWriterSpec(bool mctruth)
o2::framework::DataProcessorSpec getMCHDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getMIDDigitWriterSpec(bool mctruth)
o2::framework::DataProcessorSpec getMIDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getGRPUpdaterSpec(const std::string &prefix, const std::vector< o2::detectors::DetID > &detList)
create the processor spec
o2::framework::DataProcessorSpec getPHOSDigitWriterSpec(bool mctruth)
DataProcessorSpec getPHOSDigitizerSpec(int channel, bool mctruth)
Create new digitizer spec.
DataProcessorSpec getSimReaderSpec(SubspecRange range, const std::vector< std::string > &simprefixes, const std::vector< int > &tpcsectors, bool withTrigger)
o2::framework::DataProcessorSpec getTOFDigitWriterSpec(bool useMC=1, bool writeErr=0)
DataProcessorSpec getTOFDigitizerSpec(int channel, bool useCCDB, bool mctruth, std::string ccdb_url, int timestamp)
DataProcessorSpec getTPCDigitRootWriterSpec(std::vector< int > const &laneConfiguration, bool mctruth)
o2::framework::DataProcessorSpec getTPCDigitizerSpec(int channel, bool writeGRP, bool mctruth, bool internalwriter, int distortionType)
o2::framework::DataProcessorSpec getTRDDigitWriterSpec(bool mctruth=true, bool inpFromDigitizer=true)
o2::framework::DataProcessorSpec getTRDDigitizerSpec(int channel, bool mctruth=true)
o2::framework::DataProcessorSpec getTRDTrapSimulatorSpec(bool useMC, int digitDownscaling)
o2::framework::DataProcessorSpec getTRDTrackletWriterSpec(bool useMC)
o2::framework::DataProcessorSpec getZDCDigitizerSpec(int channel, bool mctruth)
framework::DataProcessorSpec getZDCDigitWriterDPLSpec(bool mctruth, bool simVersion)
create a processor spec
std::string to_string(gsl::span< T, Size > span)
DetFilterer(std::string const &detlist, std::string const &unsetVal, char separator, bool doWhiteListing)
bool accept(o2::detectors::DetID id)
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeWhenAnyWithAllConditions(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
uint64_t startTime
absolute time in ms corresponding to the start of the MC run
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"