12#include <boost/program_options.hpp>
13#include <boost/lexical_cast.hpp>
118#include <type_traits>
131void customize(std::vector<o2::framework::CompletionPolicy>& policies)
144void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
147 int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
148 std::string laneshelp(
"Number of tpc processing lanes. A lane is a pipeline of algorithms.");
149 workflowOptions.push_back(
150 ConfigParamSpec{
"tpc-lanes", VariantType::Int, defaultlanes, {laneshelp}});
152 std::string sectorshelp(
"List of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15");
154 workflowOptions.push_back(
155 ConfigParamSpec{
"tpc-sectors", VariantType::String, sectorDefault.c_str(), {sectorshelp}});
157 std::string onlyhelp(
"Comma separated list of detectors to accept. Takes precedence over the skipDet option. (Default is none)");
158 workflowOptions.push_back(
161 std::string skiphelp(
"Comma separate list of detectors to skip/ignore. (Default is none)");
162 workflowOptions.push_back(
168 std::string forceaccepthelp(
"Whether or not to always rely on accept/skip filters for detectors, independent of GRP content");
169 workflowOptions.push_back(
170 ConfigParamSpec{
"forceSelectedDets", VariantType::Bool,
false, {forceaccepthelp}});
172 std::string onlyctxhelp(
"Produce only the digitization context; Don't actually digitize");
176 std::string tpcrthelp(
"deprecated option, please connect workflows on the command line by pipe");
177 workflowOptions.push_back(
178 ConfigParamSpec{
"tpc-reco-type", VariantType::String,
"", {tpcrthelp}});
185 std::string simhelp(
"Comma separated list of simulation prefixes (for background, signal productions)");
186 workflowOptions.push_back(
190 std::string keyvaluehelp(
"Semicolon separated key=value strings (e.g.: 'TPC.gasDensity=1;...')");
191 workflowOptions.push_back(
192 ConfigParamSpec{
"configKeyValues", VariantType::String,
"", {keyvaluehelp}});
193 workflowOptions.push_back(
194 ConfigParamSpec{
"configFile", VariantType::String,
"", {
"configuration file for configurable parameters"}});
216 workflowOptions.push_back(
ConfigParamSpec{
"disable-trd-trapsim", VariantType::Bool,
false, {
"disable the trap simulation of the TRD"}});
217 workflowOptions.push_back(
ConfigParamSpec{
"trd-digit-downscaling", VariantType::Int, 1, {
"only keep TRD digits for every n-th trigger"}});
219 workflowOptions.push_back(
ConfigParamSpec{
"combine-devices", VariantType::Bool,
false, {
"combined multiple DPL worker/writer devices"}});
222 workflowOptions.push_back(
ConfigParamSpec{
"with-trigger", VariantType::Bool,
false, {
"enable distribution of CTP trigger digits"}});
225 workflowOptions.push_back(
ConfigParamSpec{
"store-ctp-lumi", VariantType::Float, -1.f, {
"store CTP lumi scaler in CTP digits (if >= 0)"}});
228void customize(std::vector<o2::framework::DispatchPolicy>& policies)
232 auto matcher = [](
auto const& spec) {
233 return spec.name ==
"SimReader";
235 policies.push_back({
"prompt-for-simreader", matcher, DispatchOp::WhenReady});
241 const auto offset = int64_t(hbfu.getFirstIRofTF({0, hbfu.orbitFirstSampled}).orbit);
242 const auto increment = int64_t(hbfu.nHBFPerTF);
243 const auto startTime = hbfu.startTime;
244 const auto orbitFirst = hbfu.orbitFirst;
250void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
262 LOG(info) <<
"Setting DPL-header firstTForbit to " << dh.firstTForbit;
263 LOG(info) <<
"Setting DPL-header runNumber to " << dh.runNumber;
264 LOG(info) <<
"Setting DPL-header timeframe creation time to " << dph.creation;
279 auto lanes = configcontext.
options().
get<
int>(
"tpc-lanes");
282 LOG(fatal) <<
"tpc-lanes needs to be positive\n";
287 return std::min(lanes, (
int)sectors.size());
299 std::stringstream streamthis;
300 std::stringstream streamparent;
302 streamthis <<
"TPCGEMINIT_PID" << getpid();
303 streamparent <<
"TPCGEMINIT_PID" << getppid();
304 if (getenv(streamparent.str().c_str())) {
305 LOG(
debug) <<
"GEM ALREADY INITIALIZED ... SKIPPING HERE";
309 LOG(
debug) <<
"INITIALIZING TPC GEMAmplification";
310 setenv(streamthis.str().c_str(),
"ON", 1);
313 cdb.setUseDefaults();
318 LOGP(info,
"initTPC: TPC GEM param, Gas param + CalPadGainFull updated for time {}", timestamp);
334 std::stringstream
str;
335 str <<
"O2SIMDIGIINTERNAL_" << getpid() <<
"_" <<
key;
336 LOG(info) <<
"Publishing master key " <<
str.str();
337 setenv(
str.str().c_str(),
value, 1);
343 std::stringstream
str;
344 str <<
"O2SIMDIGIINTERNAL_" << getppid() <<
"_" <<
key;
346 return getenv(
str.str().c_str());
349std::shared_ptr<o2::parameters::GRPObject>
readGRP(std::string
const& inputGRP)
353 LOG(error) <<
"This workflow needs a valid GRP file to start";
359 return std::shared_ptr<o2::parameters::GRPObject>(grp);
367 std::vector<std::string> fields;
369 std::istringstream ss(
src);
371 while (std::getline(ss, token, sep)) {
372 if (!token.empty()) {
373 fields.push_back(token);
389 DetFilterer(std::string
const& detlist, std::string
const& unsetVal,
char separator,
bool doWhiteListing)
392 if (detlist.compare(unsetVal) == 0) {
396 std::vector<std::string> tokens =
splitString(detlist, separator);
399 for (
auto& token : tokens) {
400 ids.emplace_back(token.c_str());
403 isWhiteLister = doWhiteListing;
410 return ids.size() > 0;
416 bool found = std::find(
ids.begin(),
ids.end(),
id) !=
ids.end();
417 return found == isWhiteLister;
421 std::vector<o2::detectors::DetID>
ids;
428 return DetFilterer(optionVal, unsetValue, separator,
true);
434 return DetFilterer(optionVal, unsetValue, separator,
false);
449 std::string dplProcessName =
whoAmI(configcontext);
452 bool initServices = !isDPLinternal && !isDumpWorkflow && !ismaster;
461 ConfigurableParam::updateFromFile(configcontext.
options().
get<std::string>(
"configFile"));
466 ConfigurableParam::updateFromString(configcontext.
options().
get<std::string>(
"configKeyValues"));
472 std::shared_ptr<o2::parameters::GRPObject const> grp(
nullptr);
476 const auto GRPTIMEKEY =
"GRPTIMESTART";
481 auto t = grp->getTimeStart();
487 LOG(fatal) <<
"Expected env value not found";
490 return boost::lexical_cast<uint64_t>(tstr);
500 getGRPStartTime(grp.get());
502 if (!hbfu.startTime) {
503 hbfu.setValue(
"HBFUtils.startTime",
std::to_string(getGRPStartTime(grp.get())));
517 LOG(info) <<
"Setting timestamp of BasicCCDBManager to " << dph.
creation;
527 ConfigurableParam::setValue(
"DigiParams.digitizationgeometry_prefix", simPrefixes[0]);
528 ConfigurableParam::setValue(
"DigiParams.grpfile", grpfile);
530 LOG(info) <<
"MC-TRUTH " << !configcontext.
options().
get<
bool>(
"disable-mc");
531 bool mctruth = !configcontext.
options().
get<
bool>(
"disable-mc");
532 ConfigurableParam::setValue(
"DigiParams",
"mctruth", mctruth);
538 if (!configcontext.
options().
get<
bool>(
"disable-write-ini")) {
549 for (
auto&
f : filterers) {
562 std::stringstream
str;
563 str <<
"GRPDETKEY_" <<
id.getName();
564 if (
gIsMaster and grp.get() !=
nullptr) {
565 auto ok = grp->isDetReadOut(
id);
581 if (configcontext.
options().
get<
bool>(
"only-context")) {
585 auto accepted = accept(
id);
588 auto forceAccepted = configcontext.
options().
get<
bool>(
"forceSelectedDets");
589 bool is_ingrp = isInGRPReadout(
id);
591 auto isRun = accepted && (forceAccepted || is_ingrp);
593 LOG(info) <<
id.getName()
594 <<
" is in grp? " << (is_ingrp ?
"yes" :
"no") <<
";"
595 <<
" is taken although not in grp? " << (!is_ingrp && (accepted && forceAccepted) ?
"yes" :
"no") <<
";"
596 <<
" is skipped? " << (!accepted ?
"yes" :
"no") <<
";"
597 <<
" is run? " << (isRun ?
"yes" :
"no");
602 std::vector<o2::detectors::DetID> detList;
605 std::vector<int> tpcsectors;
608 if (!helpasked && ismaster) {
612 tpcsectors = o2::RangeTokenizer::tokenize<int>(configcontext.
options().
get<std::string>(
"tpc-sectors"));
614 auto lanes = helpasked ? 1 :
getNumTPCLanes(tpcsectors, configcontext);
617 auto internalwrite = configcontext.
options().
get<
bool>(
"tpc-chunked-writer");
618 auto distortionType = configcontext.
options().
get<
int>(
"tpc-distortion-type");
620 specs.insert(specs.end(), tpcPipelines.begin(), tpcPipelines.end());
622 if (configcontext.
options().
get<std::string>(
"tpc-reco-type").empty() ==
false) {
623 throw std::runtime_error(
"option 'tpc-reco-type' is deprecated, please connect workflows on the command line by pipe");
625 if (!internalwrite) {
632 const int firstOtherChannel = 36;
633 int fanoutsize = firstOtherChannel;
644#ifdef ENABLE_UPGRADES
646 if (isEnabled(o2::detectors::DetID::IT3)) {
647 detList.emplace_back(o2::detectors::DetID::IT3);
655 if (isEnabled(o2::detectors::DetID::TRK)) {
656 detList.emplace_back(o2::detectors::DetID::TRK);
675 auto useCCDB = configcontext.
options().
get<
bool>(
"use-ccdb-tof");
676 useCCDB |= configcontext.
options().
get<
bool>(
"ccdb-tof-sa");
690 auto useCCDB = configcontext.
options().
get<
bool>(
"use-ccdb-ft0");
710 auto useCCDB = !configcontext.
options().
get<
bool>(
"no-use-ccdb-emc");
711 bool requireCTPInputs = !configcontext.
options().
get<
bool>(
"no-require-ctpinputs-emc");
742 auto disableTrapSim = configcontext.
options().
get<
bool>(
"disable-trd-trapsim");
743 auto trdDigitDownscaling = configcontext.
options().
get<
int>(
"trd-digit-downscaling");
744 if (!disableTrapSim) {
804 float lumiScaler = configcontext.
options().
get<
float>(
"store-ctp-lumi");
811 if (!configcontext.
options().
get<
bool>(
"only-context")) {
815 bool combine = configcontext.
options().
get<
bool>(
"combine-devices");
817 for (
auto& s : digitizerSpecs) {
820 for (
auto& s : writerSpecs) {
824 std::vector<DataProcessorSpec> remaining;
825 specs.push_back(
specCombiner(
"Digitizations", digitizerSpecs, remaining));
826 specs.push_back(
specCombiner(
"Writers", writerSpecs, remaining));
827 for (
auto& s : remaining) {
837 bool haveCCDBInputSpec =
false;
838 for (
auto spec : specs) {
839 for (
auto in : spec.inputs) {
840 if (in.lifetime == Lifetime::Condition) {
841 haveCCDBInputSpec =
true;
846 if (!haveCCDBInputSpec && specs.size() > 0) {
847 LOG(info) <<
"No one uses DPL CCDB .. injecting a dummy CCDB query into " << specs.back().name;
848 specs.back().inputs.emplace_back(
"_dummyOrbitReset",
"CTP",
"ORBITRESET", 0, Lifetime::Condition,
853 bool withTrigger = configcontext.
options().
get<
bool>(
"with-trigger");
854 LOG(info) <<
" TRIGGER " << withTrigger;
Simple interface to the CDB manager.
Definition of the GEM amplification.
Header of the General Run Parameters object.
Definition of the Names Generator class.
Helper function to tokenize sequences and ranges of integral numbers.
std::vector< std::string > splitString(std::string const &src, char sep)
const char * get_master_env(const char *key)
int getNumTPCLanes(std::vector< int > const &sectors, ConfigContext const &configcontext)
DetFilterer whitelister(std::string optionVal, std::string unsetValue, char separator)
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
This function hooks up the workflow specifications into the DPL driver.
void setTimingInfoInHeaders(o2::header::DataHeader &dh, o2::framework::DataProcessingHeader &dph)
DetFilterer blacklister(std::string optionVal, std::string unsetValue, char separator)
void customize(std::vector< o2::framework::CompletionPolicy > &policies)
void initTPC(long timestamp)
void publish_master_env(const char *key, const char *value)
std::shared_ptr< o2::parameters::GRPObject > readGRP(std::string const &inputGRP)
static std::string getGRPFileName(const std::string_view prefix=STANDARDSIMPREFIX)
static constexpr std::string_view DIGITIZATIONCONFIGFILE
static std::string getCCDBServer()
static BasicCCDBManager & instance()
void setCaching(bool v)
disable or enable caching
void setTimestamp(long t)
set timestamp cache for all queries
void setLocalObjectValidityChecking(bool v=true)
set the flag to check object validity before CCDB query
void printKeyValues(bool showProv=true, bool useLogger=false, bool withPadding=true, bool showHash=true) const final
static const HBFUtils & Instance()
static void writeINI(std::string const &filename, std::string const &keyOnly="")
Static class with identifiers, bitmasks and names for ALICE detectors.
bool helpOnCommandLine() const
ConfigParamRegistry & options() const
T get(const char *key) const
static GRPObject * loadFrom(const std::string &grpFileName="")
static CDBInterface & instance()
static GEMAmplification & instance()
Default constructor.
static constexpr int MAXSECTOR
GLsizei const GLfloat * value
constexpr double LHCOrbitMUS
DataProcessorSpec getCPVDigitizerSpec(int channel, bool mctruth)
Create new digitizer spec.
o2::framework::DataProcessorSpec getCPVDigitWriterSpec(bool mctruth=true)
framework::DataProcessorSpec getDigitWriterSpec(bool raw=true)
o2::framework::DataProcessorSpec getCTPDigitizerSpec(int channel, std::vector< o2::detectors::DetID > &detList, float ctpLumiScaler, bool mctruth)
o2::framework::DataProcessorSpec getEMCALDigitWriterSpec(bool mctruth=true)
Create new digits writer spec.
o2::framework::DataProcessorSpec getEMCALDigitizerSpec(int channel, bool requireCTPInput, bool mctruth=true, bool useccdb=true)
Create new digitizer spec.
o2::framework::DataProcessorSpec getFDDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getFDDDigitWriterSpec(bool mctruth=true, bool trigInp=true)
Defining PrimaryVertex explicitly as messageable.
o2::framework::DataProcessorSpec specCombiner(std::string const &name, std::vector< DataProcessorSpec > const &speccollection, std::vector< DataProcessorSpec > &remaining)
bool isInternalDPL(std::string const &name)
bool isMasterWorkflowDefinition(ConfigContext const &configcontext)
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
bool isDumpWorkflowInvocation(ConfigContext const &configcontext)
std::string whoAmI(ConfigContext const &configcontext)
std::vector< DataProcessorSpec > WorkflowSpec
o2::framework::DataProcessorSpec getFT0DigitizerSpec(int channel, bool mctruth, bool useCCDB)
framework::DataProcessorSpec getFT0DigitWriterSpec(bool mctruth=true, bool trigInp=true)
create a processor spec
o2::framework::DataProcessorSpec getFV0DigitizerSpec(int channel, bool mctruth)
framework::DataProcessorSpec getFV0DigitWriterSpec(bool mctruth=true, bool trigInp=true)
create a processor spec
o2::framework::DataProcessorSpec getHMPIDDigitWriterSpec(bool mctruth=true)
o2::framework::DataProcessorSpec getHMPIDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getITS3DigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getITS3DigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getITSDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
o2::framework::DataProcessorSpec getMFTDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getMFTDigitizerSpec(int channel, bool mctruth)
DataProcessorSpec getITSDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getMCHDigitWriterSpec(bool mctruth)
o2::framework::DataProcessorSpec getMCHDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getMIDDigitWriterSpec(bool mctruth)
o2::framework::DataProcessorSpec getMIDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getGRPUpdaterSpec(const std::string &prefix, const std::vector< o2::detectors::DetID > &detList)
create the processor spec
o2::framework::DataProcessorSpec getPHOSDigitWriterSpec(bool mctruth)
DataProcessorSpec getPHOSDigitizerSpec(int channel, bool mctruth)
Create new digitizer spec.
DataProcessorSpec getSimReaderSpec(SubspecRange range, const std::vector< std::string > &simprefixes, const std::vector< int > &tpcsectors, bool withTrigger)
o2::framework::DataProcessorSpec getTOFDigitWriterSpec(bool useMC=1, bool writeErr=0)
DataProcessorSpec getTOFDigitizerSpec(int channel, bool useCCDB, bool mctruth, std::string ccdb_url, int timestamp)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
o2::framework::DataProcessorSpec getTPCDigitRootWriterSpec(std::vector< int > const &laneConfiguration, bool mctruth)
o2::framework::DataProcessorSpec getTPCDigitizerSpec(int channel, bool writeGRP, bool mctruth, bool internalwriter, int distortionType)
@ CalPadGainFull
Full pad gain calibration.
@ ParGas
Parameter for Gas.
o2::framework::DataProcessorSpec getTRDDigitWriterSpec(bool mctruth=true, bool inpFromDigitizer=true)
o2::framework::DataProcessorSpec getTRDDigitizerSpec(int channel, bool mctruth=true)
o2::framework::DataProcessorSpec getTRDTrapSimulatorSpec(bool useMC, int digitDownscaling)
o2::framework::DataProcessorSpec getTRDTrackletWriterSpec(bool useMC)
o2::framework::DataProcessorSpec getTRKDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getTRKDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getZDCDigitizerSpec(int channel, bool mctruth)
framework::DataProcessorSpec getZDCDigitWriterDPLSpec(bool mctruth, bool simVersion)
create a processor spec
std::string to_string(gsl::span< T, Size > span)
DetFilterer(std::string const &detlist, std::string const &unsetVal, char separator, bool doWhiteListing)
bool accept(o2::detectors::DetID id)
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeWhenAnyWithAllConditions(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
uint64_t startTime
absolute time in ms corresponding to the start of the MC run
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"