12#include <boost/program_options.hpp>
13#include <boost/lexical_cast.hpp>
118#include <type_traits>
131void customize(std::vector<o2::framework::CompletionPolicy>& policies)
144void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
147 int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
148 std::string laneshelp(
"Number of tpc processing lanes. A lane is a pipeline of algorithms.");
149 workflowOptions.push_back(
150 ConfigParamSpec{
"tpc-lanes", VariantType::Int, defaultlanes, {laneshelp}});
152 std::string sectorshelp(
"List of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15");
154 workflowOptions.push_back(
155 ConfigParamSpec{
"tpc-sectors", VariantType::String, sectorDefault.c_str(), {sectorshelp}});
157 std::string onlyhelp(
"Comma separated list of detectors to accept. Takes precedence over the skipDet option. (Default is none)");
158 workflowOptions.push_back(
161 std::string skiphelp(
"Comma separate list of detectors to skip/ignore. (Default is none)");
162 workflowOptions.push_back(
168 std::string forceaccepthelp(
"Whether or not to always rely on accept/skip filters for detectors, independent of GRP content");
169 workflowOptions.push_back(
170 ConfigParamSpec{
"forceSelectedDets", VariantType::Bool,
false, {forceaccepthelp}});
172 std::string onlyctxhelp(
"Produce only the digitization context; Don't actually digitize");
176 std::string tpcrthelp(
"deprecated option, please connect workflows on the command line by pipe");
177 workflowOptions.push_back(
178 ConfigParamSpec{
"tpc-reco-type", VariantType::String,
"", {tpcrthelp}});
185 std::string simhelp(
"Comma separated list of simulation prefixes (for background, signal productions)");
186 workflowOptions.push_back(
190 std::string keyvaluehelp(
"Semicolon separated key=value strings (e.g.: 'TPC.gasDensity=1;...')");
191 workflowOptions.push_back(
192 ConfigParamSpec{
"configKeyValues", VariantType::String,
"", {keyvaluehelp}});
193 workflowOptions.push_back(
194 ConfigParamSpec{
"configFile", VariantType::String,
"", {
"configuration file for configurable parameters"}});
216 workflowOptions.push_back(
ConfigParamSpec{
"disable-trd-trapsim", VariantType::Bool,
false, {
"disable the trap simulation of the TRD"}});
217 workflowOptions.push_back(
ConfigParamSpec{
"trd-digit-downscaling", VariantType::Int, 1, {
"only keep TRD digits for every n-th trigger"}});
219 workflowOptions.push_back(
ConfigParamSpec{
"combine-devices", VariantType::Bool,
false, {
"combined multiple DPL worker/writer devices"}});
222 workflowOptions.push_back(
ConfigParamSpec{
"with-trigger", VariantType::Bool,
false, {
"enable distribution of CTP trigger digits"}});
225void customize(std::vector<o2::framework::DispatchPolicy>& policies)
229 auto matcher = [](
auto const& spec) {
230 return spec.name ==
"SimReader";
232 policies.push_back({
"prompt-for-simreader", matcher, DispatchOp::WhenReady});
238 const auto offset = int64_t(hbfu.getFirstIRofTF({0, hbfu.orbitFirstSampled}).orbit);
239 const auto increment = int64_t(hbfu.nHBFPerTF);
240 const auto startTime = hbfu.startTime;
241 const auto orbitFirst = hbfu.orbitFirst;
247void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
259 LOG(info) <<
"Setting DPL-header firstTForbit to " << dh.firstTForbit;
260 LOG(info) <<
"Setting DPL-header runNumber to " << dh.runNumber;
261 LOG(info) <<
"Setting DPL-header timeframe creation time to " << dph.creation;
276 auto lanes = configcontext.
options().
get<
int>(
"tpc-lanes");
279 LOG(fatal) <<
"tpc-lanes needs to be positive\n";
284 return std::min(lanes, (
int)sectors.size());
296 std::stringstream streamthis;
297 std::stringstream streamparent;
299 streamthis <<
"TPCGEMINIT_PID" << getpid();
300 streamparent <<
"TPCGEMINIT_PID" << getppid();
301 if (getenv(streamparent.str().c_str())) {
302 LOG(
debug) <<
"GEM ALREADY INITIALIZED ... SKIPPING HERE";
306 LOG(
debug) <<
"INITIALIZING TPC GEMAmplification";
307 setenv(streamthis.str().c_str(),
"ON", 1);
310 cdb.setUseDefaults();
315 LOGP(info,
"initTPC: TPC GEM param, Gas param + CalPadGainFull updated for time {}", timestamp);
331 std::stringstream
str;
332 str <<
"O2SIMDIGIINTERNAL_" << getpid() <<
"_" <<
key;
333 LOG(info) <<
"Publishing master key " <<
str.str();
334 setenv(
str.str().c_str(),
value, 1);
340 std::stringstream
str;
341 str <<
"O2SIMDIGIINTERNAL_" << getppid() <<
"_" <<
key;
343 return getenv(
str.str().c_str());
346std::shared_ptr<o2::parameters::GRPObject>
readGRP(std::string
const& inputGRP)
350 LOG(error) <<
"This workflow needs a valid GRP file to start";
356 return std::shared_ptr<o2::parameters::GRPObject>(grp);
364 std::vector<std::string> fields;
366 std::istringstream ss(
src);
368 while (std::getline(ss, token, sep)) {
369 if (!token.empty()) {
370 fields.push_back(token);
386 DetFilterer(std::string
const& detlist, std::string
const& unsetVal,
char separator,
bool doWhiteListing)
389 if (detlist.compare(unsetVal) == 0) {
393 std::vector<std::string> tokens =
splitString(detlist, separator);
396 for (
auto& token : tokens) {
397 ids.emplace_back(token.c_str());
400 isWhiteLister = doWhiteListing;
407 return ids.size() > 0;
413 bool found = std::find(
ids.begin(),
ids.end(),
id) !=
ids.end();
414 return found == isWhiteLister;
418 std::vector<o2::detectors::DetID>
ids;
425 return DetFilterer(optionVal, unsetValue, separator,
true);
431 return DetFilterer(optionVal, unsetValue, separator,
false);
446 std::string dplProcessName =
whoAmI(configcontext);
449 bool initServices = !isDPLinternal && !isDumpWorkflow && !ismaster;
458 ConfigurableParam::updateFromFile(configcontext.
options().
get<std::string>(
"configFile"));
463 ConfigurableParam::updateFromString(configcontext.
options().
get<std::string>(
"configKeyValues"));
469 std::shared_ptr<o2::parameters::GRPObject const> grp(
nullptr);
473 const auto GRPTIMEKEY =
"GRPTIMESTART";
478 auto t = grp->getTimeStart();
484 LOG(fatal) <<
"Expected env value not found";
487 return boost::lexical_cast<uint64_t>(tstr);
497 getGRPStartTime(grp.get());
499 if (!hbfu.startTime) {
500 hbfu.setValue(
"HBFUtils.startTime",
std::to_string(getGRPStartTime(grp.get())));
514 LOG(info) <<
"Setting timestamp of BasicCCDBManager to " << dph.
creation;
524 ConfigurableParam::setValue(
"DigiParams.digitizationgeometry_prefix", simPrefixes[0]);
525 ConfigurableParam::setValue(
"DigiParams.grpfile", grpfile);
527 LOG(info) <<
"MC-TRUTH " << !configcontext.
options().
get<
bool>(
"disable-mc");
528 bool mctruth = !configcontext.
options().
get<
bool>(
"disable-mc");
529 ConfigurableParam::setValue(
"DigiParams",
"mctruth", mctruth);
535 if (!configcontext.
options().
get<
bool>(
"disable-write-ini")) {
546 for (
auto&
f : filterers) {
559 std::stringstream
str;
560 str <<
"GRPDETKEY_" <<
id.getName();
561 if (
gIsMaster and grp.get() !=
nullptr) {
562 auto ok = grp->isDetReadOut(
id);
578 if (configcontext.
options().
get<
bool>(
"only-context")) {
582 auto accepted = accept(
id);
585 auto forceAccepted = configcontext.
options().
get<
bool>(
"forceSelectedDets");
586 bool is_ingrp = isInGRPReadout(
id);
588 auto isRun = accepted && (forceAccepted || is_ingrp);
590 LOG(info) <<
id.getName()
591 <<
" is in grp? " << (is_ingrp ?
"yes" :
"no") <<
";"
592 <<
" is taken although not in grp? " << (!is_ingrp && (accepted && forceAccepted) ?
"yes" :
"no") <<
";"
593 <<
" is skipped? " << (!accepted ?
"yes" :
"no") <<
";"
594 <<
" is run? " << (isRun ?
"yes" :
"no");
599 std::vector<o2::detectors::DetID> detList;
602 std::vector<int> tpcsectors;
605 if (!helpasked && ismaster) {
609 tpcsectors = o2::RangeTokenizer::tokenize<int>(configcontext.
options().
get<std::string>(
"tpc-sectors"));
611 auto lanes = helpasked ? 1 :
getNumTPCLanes(tpcsectors, configcontext);
614 auto internalwrite = configcontext.
options().
get<
bool>(
"tpc-chunked-writer");
615 auto distortionType = configcontext.
options().
get<
int>(
"tpc-distortion-type");
617 specs.insert(specs.end(), tpcPipelines.begin(), tpcPipelines.end());
619 if (configcontext.
options().
get<std::string>(
"tpc-reco-type").empty() ==
false) {
620 throw std::runtime_error(
"option 'tpc-reco-type' is deprecated, please connect workflows on the command line by pipe");
622 if (!internalwrite) {
629 const int firstOtherChannel = 36;
630 int fanoutsize = firstOtherChannel;
641#ifdef ENABLE_UPGRADES
643 if (isEnabled(o2::detectors::DetID::IT3)) {
644 detList.emplace_back(o2::detectors::DetID::IT3);
652 if (isEnabled(o2::detectors::DetID::TRK)) {
653 detList.emplace_back(o2::detectors::DetID::TRK);
672 auto useCCDB = configcontext.
options().
get<
bool>(
"use-ccdb-tof");
673 useCCDB |= configcontext.
options().
get<
bool>(
"ccdb-tof-sa");
687 auto useCCDB = configcontext.
options().
get<
bool>(
"use-ccdb-ft0");
707 auto useCCDB = !configcontext.
options().
get<
bool>(
"no-use-ccdb-emc");
708 bool requireCTPInputs = !configcontext.
options().
get<
bool>(
"no-require-ctpinputs-emc");
739 auto disableTrapSim = configcontext.
options().
get<
bool>(
"disable-trd-trapsim");
740 auto trdDigitDownscaling = configcontext.
options().
get<
int>(
"trd-digit-downscaling");
741 if (!disableTrapSim) {
807 if (!configcontext.
options().
get<
bool>(
"only-context")) {
811 bool combine = configcontext.
options().
get<
bool>(
"combine-devices");
813 for (
auto& s : digitizerSpecs) {
816 for (
auto& s : writerSpecs) {
820 std::vector<DataProcessorSpec> remaining;
821 specs.push_back(
specCombiner(
"Digitizations", digitizerSpecs, remaining));
822 specs.push_back(
specCombiner(
"Writers", writerSpecs, remaining));
823 for (
auto& s : remaining) {
833 bool haveCCDBInputSpec =
false;
834 for (
auto spec : specs) {
835 for (
auto in : spec.inputs) {
836 if (in.lifetime == Lifetime::Condition) {
837 haveCCDBInputSpec =
true;
842 if (!haveCCDBInputSpec && specs.size() > 0) {
843 LOG(info) <<
"No one uses DPL CCDB .. injecting a dummy CCDB query into " << specs.back().name;
844 specs.back().inputs.emplace_back(
"_dummyOrbitReset",
"CTP",
"ORBITRESET", 0, Lifetime::Condition,
849 bool withTrigger = configcontext.
options().
get<
bool>(
"with-trigger");
850 LOG(info) <<
" TRIGGER " << withTrigger;
Simple interface to the CDB manager.
Definition of the GEM amplification.
Header of the General Run Parameters object.
Definition of the Names Generator class.
Helper function to tokenize sequences and ranges of integral numbers.
std::vector< std::string > splitString(std::string const &src, char sep)
const char * get_master_env(const char *key)
int getNumTPCLanes(std::vector< int > const &sectors, ConfigContext const &configcontext)
DetFilterer whitelister(std::string optionVal, std::string unsetValue, char separator)
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
This function hooks up the workflow specifications into the DPL driver.
void setTimingInfoInHeaders(o2::header::DataHeader &dh, o2::framework::DataProcessingHeader &dph)
DetFilterer blacklister(std::string optionVal, std::string unsetValue, char separator)
void customize(std::vector< o2::framework::CompletionPolicy > &policies)
void initTPC(long timestamp)
void publish_master_env(const char *key, const char *value)
std::shared_ptr< o2::parameters::GRPObject > readGRP(std::string const &inputGRP)
static std::string getGRPFileName(const std::string_view prefix=STANDARDSIMPREFIX)
static constexpr std::string_view DIGITIZATIONCONFIGFILE
static std::string getCCDBServer()
static BasicCCDBManager & instance()
void setCaching(bool v)
disable or enable caching
void setTimestamp(long t)
set timestamp cache for all queries
void setLocalObjectValidityChecking(bool v=true)
set the flag to check object validity before CCDB query
void printKeyValues(bool showProv=true, bool useLogger=false, bool withPadding=true, bool showHash=true) const final
static const HBFUtils & Instance()
static void writeINI(std::string const &filename, std::string const &keyOnly="")
Static class with identifiers, bitmasks and names for ALICE detectors.
bool helpOnCommandLine() const
ConfigParamRegistry & options() const
T get(const char *key) const
static GRPObject * loadFrom(const std::string &grpFileName="")
static CDBInterface & instance()
static GEMAmplification & instance()
Default constructor.
static constexpr int MAXSECTOR
GLsizei const GLfloat * value
constexpr double LHCOrbitMUS
DataProcessorSpec getCPVDigitizerSpec(int channel, bool mctruth)
Create new digitizer spec.
o2::framework::DataProcessorSpec getCPVDigitWriterSpec(bool mctruth=true)
framework::DataProcessorSpec getDigitWriterSpec(bool raw=true)
o2::framework::DataProcessorSpec getCTPDigitizerSpec(int channel, std::vector< o2::detectors::DetID > &detList, bool mctruth)
o2::framework::DataProcessorSpec getEMCALDigitWriterSpec(bool mctruth=true)
Create new digits writer spec.
o2::framework::DataProcessorSpec getEMCALDigitizerSpec(int channel, bool requireCTPInput, bool mctruth=true, bool useccdb=true)
Create new digitizer spec.
o2::framework::DataProcessorSpec getFDDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getFDDDigitWriterSpec(bool mctruth=true, bool trigInp=true)
Defining PrimaryVertex explicitly as messageable.
o2::framework::DataProcessorSpec specCombiner(std::string const &name, std::vector< DataProcessorSpec > const &speccollection, std::vector< DataProcessorSpec > &remaining)
bool isInternalDPL(std::string const &name)
bool isMasterWorkflowDefinition(ConfigContext const &configcontext)
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
bool isDumpWorkflowInvocation(ConfigContext const &configcontext)
std::string whoAmI(ConfigContext const &configcontext)
std::vector< DataProcessorSpec > WorkflowSpec
o2::framework::DataProcessorSpec getFT0DigitizerSpec(int channel, bool mctruth, bool useCCDB)
framework::DataProcessorSpec getFT0DigitWriterSpec(bool mctruth=true, bool trigInp=true)
create a processor spec
o2::framework::DataProcessorSpec getFV0DigitizerSpec(int channel, bool mctruth)
framework::DataProcessorSpec getFV0DigitWriterSpec(bool mctruth=true, bool trigInp=true)
create a processor spec
o2::framework::DataProcessorSpec getHMPIDDigitWriterSpec(bool mctruth=true)
o2::framework::DataProcessorSpec getHMPIDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getITS3DigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getITS3DigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getITSDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
o2::framework::DataProcessorSpec getMFTDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getMFTDigitizerSpec(int channel, bool mctruth)
DataProcessorSpec getITSDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getMCHDigitWriterSpec(bool mctruth)
o2::framework::DataProcessorSpec getMCHDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getMIDDigitWriterSpec(bool mctruth)
o2::framework::DataProcessorSpec getMIDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getGRPUpdaterSpec(const std::string &prefix, const std::vector< o2::detectors::DetID > &detList)
create the processor spec
o2::framework::DataProcessorSpec getPHOSDigitWriterSpec(bool mctruth)
DataProcessorSpec getPHOSDigitizerSpec(int channel, bool mctruth)
Create new digitizer spec.
DataProcessorSpec getSimReaderSpec(SubspecRange range, const std::vector< std::string > &simprefixes, const std::vector< int > &tpcsectors, bool withTrigger)
o2::framework::DataProcessorSpec getTOFDigitWriterSpec(bool useMC=1, bool writeErr=0)
DataProcessorSpec getTOFDigitizerSpec(int channel, bool useCCDB, bool mctruth, std::string ccdb_url, int timestamp)
DataProcessorSpec getTPCDigitRootWriterSpec(std::vector< int > const &laneConfiguration, bool mctruth)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
o2::framework::DataProcessorSpec getTPCDigitizerSpec(int channel, bool writeGRP, bool mctruth, bool internalwriter, int distortionType)
@ CalPadGainFull
Full pad gain calibration.
@ ParGas
Parameter for Gas.
o2::framework::DataProcessorSpec getTRDDigitWriterSpec(bool mctruth=true, bool inpFromDigitizer=true)
o2::framework::DataProcessorSpec getTRDDigitizerSpec(int channel, bool mctruth=true)
o2::framework::DataProcessorSpec getTRDTrapSimulatorSpec(bool useMC, int digitDownscaling)
o2::framework::DataProcessorSpec getTRDTrackletWriterSpec(bool useMC)
o2::framework::DataProcessorSpec getTRKDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getTRKDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getZDCDigitizerSpec(int channel, bool mctruth)
framework::DataProcessorSpec getZDCDigitWriterDPLSpec(bool mctruth, bool simVersion)
create a processor spec
std::string to_string(gsl::span< T, Size > span)
DetFilterer(std::string const &detlist, std::string const &unsetVal, char separator, bool doWhiteListing)
bool accept(o2::detectors::DetID id)
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeWhenAnyWithAllConditions(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
uint64_t startTime
absolute time in ms corresponding to the start of the MC run
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"