12#include <boost/program_options.hpp>
13#include <boost/lexical_cast.hpp>
118#include <type_traits>
131void customize(std::vector<o2::framework::CompletionPolicy>& policies)
144void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
147 int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
148 std::string laneshelp(
"Number of tpc processing lanes. A lane is a pipeline of algorithms.");
149 workflowOptions.push_back(
150 ConfigParamSpec{
"tpc-lanes", VariantType::Int, defaultlanes, {laneshelp}});
152 std::string sectorshelp(
"List of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15");
154 workflowOptions.push_back(
155 ConfigParamSpec{
"tpc-sectors", VariantType::String, sectorDefault.c_str(), {sectorshelp}});
157 std::string onlyhelp(
"Comma separated list of detectors to accept. Takes precedence over the skipDet option. (Default is none)");
158 workflowOptions.push_back(
161 std::string skiphelp(
"Comma separate list of detectors to skip/ignore. (Default is none)");
162 workflowOptions.push_back(
168 std::string forceaccepthelp(
"Whether or not to always rely on accept/skip filters for detectors, independent of GRP content");
169 workflowOptions.push_back(
170 ConfigParamSpec{
"forceSelectedDets", VariantType::Bool,
false, {forceaccepthelp}});
172 std::string onlyctxhelp(
"Produce only the digitization context; Don't actually digitize");
176 std::string tpcrthelp(
"deprecated option, please connect workflows on the command line by pipe");
177 workflowOptions.push_back(
178 ConfigParamSpec{
"tpc-reco-type", VariantType::String,
"", {tpcrthelp}});
185 std::string simhelp(
"Comma separated list of simulation prefixes (for background, signal productions)");
186 workflowOptions.push_back(
190 std::string keyvaluehelp(
"Semicolon separated key=value strings (e.g.: 'TPC.gasDensity=1;...')");
191 workflowOptions.push_back(
192 ConfigParamSpec{
"configKeyValues", VariantType::String,
"", {keyvaluehelp}});
193 workflowOptions.push_back(
194 ConfigParamSpec{
"configFile", VariantType::String,
"", {
"configuration file for configurable parameters"}});
216 workflowOptions.push_back(
ConfigParamSpec{
"disable-trd-trapsim", VariantType::Bool,
false, {
"disable the trap simulation of the TRD"}});
217 workflowOptions.push_back(
ConfigParamSpec{
"trd-digit-downscaling", VariantType::Int, 1, {
"only keep TRD digits for every n-th trigger"}});
219 workflowOptions.push_back(
ConfigParamSpec{
"combine-devices", VariantType::Bool,
false, {
"combined multiple DPL worker/writer devices"}});
222 workflowOptions.push_back(
ConfigParamSpec{
"with-trigger", VariantType::Bool,
false, {
"enable distribution of CTP trigger digits"}});
225void customize(std::vector<o2::framework::DispatchPolicy>& policies)
229 auto matcher = [](
auto const& spec) {
230 return spec.name ==
"SimReader";
232 policies.push_back({
"prompt-for-simreader", matcher, DispatchOp::WhenReady});
238 const auto offset = int64_t(hbfu.getFirstIRofTF({0, hbfu.orbitFirstSampled}).orbit);
239 const auto increment = int64_t(hbfu.nHBFPerTF);
240 const auto startTime = hbfu.startTime;
241 const auto orbitFirst = hbfu.orbitFirst;
247void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
259 LOG(info) <<
"Setting DPL-header firstTForbit to " << dh.firstTForbit;
260 LOG(info) <<
"Setting DPL-header runNumber to " << dh.runNumber;
261 LOG(info) <<
"Setting DPL-header timeframe creation time to " << dph.creation;
276 auto lanes = configcontext.
options().
get<
int>(
"tpc-lanes");
279 LOG(fatal) <<
"tpc-lanes needs to be positive\n";
284 return std::min(lanes, (
int)sectors.size());
296 std::stringstream streamthis;
297 std::stringstream streamparent;
299 streamthis <<
"TPCGEMINIT_PID" << getpid();
300 streamparent <<
"TPCGEMINIT_PID" << getppid();
301 if (getenv(streamparent.str().c_str())) {
302 LOG(
debug) <<
"GEM ALREADY INITIALIZED ... SKIPPING HERE";
306 LOG(
debug) <<
"INITIALIZING TPC GEMAmplification";
307 setenv(streamthis.str().c_str(),
"ON", 1);
310 cdb.setUseDefaults();
315 LOGP(info,
"initTPC: TPC GEM param updated for time {}", timestamp);
326 std::stringstream
str;
327 str <<
"O2SIMDIGIINTERNAL_" << getpid() <<
"_" <<
key;
328 LOG(info) <<
"Publishing master key " <<
str.str();
329 setenv(
str.str().c_str(),
value, 1);
335 std::stringstream
str;
336 str <<
"O2SIMDIGIINTERNAL_" << getppid() <<
"_" <<
key;
338 return getenv(
str.str().c_str());
341std::shared_ptr<o2::parameters::GRPObject>
readGRP(std::string
const& inputGRP)
345 LOG(error) <<
"This workflow needs a valid GRP file to start";
351 return std::shared_ptr<o2::parameters::GRPObject>(grp);
359 std::vector<std::string> fields;
361 std::istringstream ss(
src);
363 while (std::getline(ss, token, sep)) {
364 if (!token.empty()) {
365 fields.push_back(token);
381 DetFilterer(std::string
const& detlist, std::string
const& unsetVal,
char separator,
bool doWhiteListing)
384 if (detlist.compare(unsetVal) == 0) {
388 std::vector<std::string> tokens =
splitString(detlist, separator);
391 for (
auto& token : tokens) {
392 ids.emplace_back(token.c_str());
395 isWhiteLister = doWhiteListing;
402 return ids.size() > 0;
408 bool found = std::find(
ids.begin(),
ids.end(),
id) !=
ids.end();
409 return found == isWhiteLister;
413 std::vector<o2::detectors::DetID>
ids;
420 return DetFilterer(optionVal, unsetValue, separator,
true);
426 return DetFilterer(optionVal, unsetValue, separator,
false);
441 std::string dplProcessName =
whoAmI(configcontext);
444 bool initServices = !isDPLinternal && !isDumpWorkflow && !ismaster;
453 ConfigurableParam::updateFromFile(configcontext.
options().
get<std::string>(
"configFile"));
458 ConfigurableParam::updateFromString(configcontext.
options().
get<std::string>(
"configKeyValues"));
464 std::shared_ptr<o2::parameters::GRPObject const> grp(
nullptr);
468 const auto GRPTIMEKEY =
"GRPTIMESTART";
473 auto t = grp->getTimeStart();
479 LOG(fatal) <<
"Expected env value not found";
482 return boost::lexical_cast<uint64_t>(tstr);
492 getGRPStartTime(grp.get());
494 if (!hbfu.startTime) {
495 hbfu.setValue(
"HBFUtils.startTime",
std::to_string(getGRPStartTime(grp.get())));
509 LOG(info) <<
"Setting timestamp of BasicCCDBManager to " << dph.
creation;
519 ConfigurableParam::setValue(
"DigiParams.digitizationgeometry_prefix", simPrefixes[0]);
520 ConfigurableParam::setValue(
"DigiParams.grpfile", grpfile);
522 LOG(info) <<
"MC-TRUTH " << !configcontext.
options().
get<
bool>(
"disable-mc");
523 bool mctruth = !configcontext.
options().
get<
bool>(
"disable-mc");
524 ConfigurableParam::setValue(
"DigiParams",
"mctruth", mctruth);
530 if (!configcontext.
options().
get<
bool>(
"disable-write-ini")) {
541 for (
auto&
f : filterers) {
554 std::stringstream
str;
555 str <<
"GRPDETKEY_" <<
id.getName();
556 if (
gIsMaster and grp.get() !=
nullptr) {
557 auto ok = grp->isDetReadOut(
id);
573 if (configcontext.
options().
get<
bool>(
"only-context")) {
577 auto accepted = accept(
id);
580 auto forceAccepted = configcontext.
options().
get<
bool>(
"forceSelectedDets");
581 bool is_ingrp = isInGRPReadout(
id);
583 auto isRun = accepted && (forceAccepted || is_ingrp);
585 LOG(info) <<
id.getName()
586 <<
" is in grp? " << (is_ingrp ?
"yes" :
"no") <<
";"
587 <<
" is taken although not in grp? " << (!is_ingrp && (accepted && forceAccepted) ?
"yes" :
"no") <<
";"
588 <<
" is skipped? " << (!accepted ?
"yes" :
"no") <<
";"
589 <<
" is run? " << (isRun ?
"yes" :
"no");
594 std::vector<o2::detectors::DetID> detList;
597 std::vector<int> tpcsectors;
600 if (!helpasked && ismaster) {
604 tpcsectors = o2::RangeTokenizer::tokenize<int>(configcontext.
options().
get<std::string>(
"tpc-sectors"));
606 auto lanes = helpasked ? 1 :
getNumTPCLanes(tpcsectors, configcontext);
609 auto internalwrite = configcontext.
options().
get<
bool>(
"tpc-chunked-writer");
610 auto distortionType = configcontext.
options().
get<
int>(
"tpc-distortion-type");
612 specs.insert(specs.end(), tpcPipelines.begin(), tpcPipelines.end());
614 if (configcontext.
options().
get<std::string>(
"tpc-reco-type").empty() ==
false) {
615 throw std::runtime_error(
"option 'tpc-reco-type' is deprecated, please connect workflows on the command line by pipe");
617 if (!internalwrite) {
624 const int firstOtherChannel = 36;
625 int fanoutsize = firstOtherChannel;
636#ifdef ENABLE_UPGRADES
638 if (isEnabled(o2::detectors::DetID::IT3)) {
639 detList.emplace_back(o2::detectors::DetID::IT3);
647 if (isEnabled(o2::detectors::DetID::TRK)) {
648 detList.emplace_back(o2::detectors::DetID::TRK);
667 auto useCCDB = configcontext.
options().
get<
bool>(
"use-ccdb-tof");
668 useCCDB |= configcontext.
options().
get<
bool>(
"ccdb-tof-sa");
682 auto useCCDB = configcontext.
options().
get<
bool>(
"use-ccdb-ft0");
702 auto useCCDB = !configcontext.
options().
get<
bool>(
"no-use-ccdb-emc");
703 bool requireCTPInputs = !configcontext.
options().
get<
bool>(
"no-require-ctpinputs-emc");
734 auto disableTrapSim = configcontext.
options().
get<
bool>(
"disable-trd-trapsim");
735 auto trdDigitDownscaling = configcontext.
options().
get<
int>(
"trd-digit-downscaling");
736 if (!disableTrapSim) {
802 if (!configcontext.
options().
get<
bool>(
"only-context")) {
806 bool combine = configcontext.
options().
get<
bool>(
"combine-devices");
808 for (
auto& s : digitizerSpecs) {
811 for (
auto& s : writerSpecs) {
815 std::vector<DataProcessorSpec> remaining;
816 specs.push_back(
specCombiner(
"Digitizations", digitizerSpecs, remaining));
817 specs.push_back(
specCombiner(
"Writers", writerSpecs, remaining));
818 for (
auto& s : remaining) {
828 bool haveCCDBInputSpec =
false;
829 for (
auto spec : specs) {
830 for (
auto in : spec.inputs) {
831 if (in.lifetime == Lifetime::Condition) {
832 haveCCDBInputSpec =
true;
837 if (!haveCCDBInputSpec && specs.size() > 0) {
838 LOG(info) <<
"No one uses DPL CCDB .. injecting a dummy CCDB query into " << specs.back().name;
839 specs.back().inputs.emplace_back(
"_dummyOrbitReset",
"CTP",
"ORBITRESET", 0, Lifetime::Condition,
844 bool withTrigger = configcontext.
options().
get<
bool>(
"with-trigger");
845 LOG(info) <<
" TRIGGER " << withTrigger;
Simple interface to the CDB manager.
Definition of the GEM amplification.
Header of the General Run Parameters object.
Definition of the Names Generator class.
Helper function to tokenize sequences and ranges of integral numbers.
std::vector< std::string > splitString(std::string const &src, char sep)
const char * get_master_env(const char *key)
int getNumTPCLanes(std::vector< int > const &sectors, ConfigContext const &configcontext)
DetFilterer whitelister(std::string optionVal, std::string unsetValue, char separator)
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
This function hooks up the workflow specifications into the DPL driver.
void setTimingInfoInHeaders(o2::header::DataHeader &dh, o2::framework::DataProcessingHeader &dph)
DetFilterer blacklister(std::string optionVal, std::string unsetValue, char separator)
void customize(std::vector< o2::framework::CompletionPolicy > &policies)
void initTPC(long timestamp)
void publish_master_env(const char *key, const char *value)
std::shared_ptr< o2::parameters::GRPObject > readGRP(std::string const &inputGRP)
static std::string getGRPFileName(const std::string_view prefix=STANDARDSIMPREFIX)
static constexpr std::string_view DIGITIZATIONCONFIGFILE
static std::string getCCDBServer()
static BasicCCDBManager & instance()
void setCaching(bool v)
disable or enable caching
void setTimestamp(long t)
set timestamp cache for all queries
void setLocalObjectValidityChecking(bool v=true)
set the flag to check object validity before CCDB query
static const HBFUtils & Instance()
static void writeINI(std::string const &filename, std::string const &keyOnly="")
Static class with identifiers, bitmasks and names for ALICE detectors.
bool helpOnCommandLine() const
ConfigParamRegistry & options() const
T get(const char *key) const
static GRPObject * loadFrom(const std::string &grpFileName="")
static CDBInterface & instance()
static GEMAmplification & instance()
Default constructor.
static constexpr int MAXSECTOR
GLsizei const GLfloat * value
constexpr double LHCOrbitMUS
DataProcessorSpec getCPVDigitizerSpec(int channel, bool mctruth)
Create new digitizer spec.
o2::framework::DataProcessorSpec getCPVDigitWriterSpec(bool mctruth=true)
framework::DataProcessorSpec getDigitWriterSpec(bool raw=true)
o2::framework::DataProcessorSpec getCTPDigitizerSpec(int channel, std::vector< o2::detectors::DetID > &detList, bool mctruth)
o2::framework::DataProcessorSpec getEMCALDigitWriterSpec(bool mctruth=true)
Create new digits writer spec.
o2::framework::DataProcessorSpec getEMCALDigitizerSpec(int channel, bool requireCTPInput, bool mctruth=true, bool useccdb=true)
Create new digitizer spec.
o2::framework::DataProcessorSpec getFDDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getFDDDigitWriterSpec(bool mctruth=true, bool trigInp=true)
Defining PrimaryVertex explicitly as messageable.
o2::framework::DataProcessorSpec specCombiner(std::string const &name, std::vector< DataProcessorSpec > const &speccollection, std::vector< DataProcessorSpec > &remaining)
bool isInternalDPL(std::string const &name)
bool isMasterWorkflowDefinition(ConfigContext const &configcontext)
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
bool isDumpWorkflowInvocation(ConfigContext const &configcontext)
std::string whoAmI(ConfigContext const &configcontext)
std::vector< DataProcessorSpec > WorkflowSpec
o2::framework::DataProcessorSpec getFT0DigitizerSpec(int channel, bool mctruth, bool useCCDB)
framework::DataProcessorSpec getFT0DigitWriterSpec(bool mctruth=true, bool trigInp=true)
create a processor spec
o2::framework::DataProcessorSpec getFV0DigitizerSpec(int channel, bool mctruth)
framework::DataProcessorSpec getFV0DigitWriterSpec(bool mctruth=true, bool trigInp=true)
create a processor spec
o2::framework::DataProcessorSpec getHMPIDDigitWriterSpec(bool mctruth=true)
o2::framework::DataProcessorSpec getHMPIDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getITS3DigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getITS3DigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getITSDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
o2::framework::DataProcessorSpec getMFTDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getMFTDigitizerSpec(int channel, bool mctruth)
DataProcessorSpec getITSDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getMCHDigitWriterSpec(bool mctruth)
o2::framework::DataProcessorSpec getMCHDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getMIDDigitWriterSpec(bool mctruth)
o2::framework::DataProcessorSpec getMIDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getGRPUpdaterSpec(const std::string &prefix, const std::vector< o2::detectors::DetID > &detList)
create the processor spec
o2::framework::DataProcessorSpec getPHOSDigitWriterSpec(bool mctruth)
DataProcessorSpec getPHOSDigitizerSpec(int channel, bool mctruth)
Create new digitizer spec.
DataProcessorSpec getSimReaderSpec(SubspecRange range, const std::vector< std::string > &simprefixes, const std::vector< int > &tpcsectors, bool withTrigger)
o2::framework::DataProcessorSpec getTOFDigitWriterSpec(bool useMC=1, bool writeErr=0)
DataProcessorSpec getTOFDigitizerSpec(int channel, bool useCCDB, bool mctruth, std::string ccdb_url, int timestamp)
DataProcessorSpec getTPCDigitRootWriterSpec(std::vector< int > const &laneConfiguration, bool mctruth)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
o2::framework::DataProcessorSpec getTPCDigitizerSpec(int channel, bool writeGRP, bool mctruth, bool internalwriter, int distortionType)
o2::framework::DataProcessorSpec getTRDDigitWriterSpec(bool mctruth=true, bool inpFromDigitizer=true)
o2::framework::DataProcessorSpec getTRDDigitizerSpec(int channel, bool mctruth=true)
o2::framework::DataProcessorSpec getTRDTrapSimulatorSpec(bool useMC, int digitDownscaling)
o2::framework::DataProcessorSpec getTRDTrackletWriterSpec(bool useMC)
o2::framework::DataProcessorSpec getTRKDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getTRKDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getZDCDigitizerSpec(int channel, bool mctruth)
framework::DataProcessorSpec getZDCDigitWriterDPLSpec(bool mctruth, bool simVersion)
create a processor spec
std::string to_string(gsl::span< T, Size > span)
DetFilterer(std::string const &detlist, std::string const &unsetVal, char separator, bool doWhiteListing)
bool accept(o2::detectors::DetID id)
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeWhenAnyWithAllConditions(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
uint64_t startTime
absolute time in ms corresponding to the start of the MC run
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"