12#include <boost/program_options.hpp>
13#include <boost/lexical_cast.hpp>
118#include <type_traits>
131void customize(std::vector<o2::framework::CompletionPolicy>& policies)
144void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
147 int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
148 std::string laneshelp(
"Number of tpc processing lanes. A lane is a pipeline of algorithms.");
149 workflowOptions.push_back(
150 ConfigParamSpec{
"tpc-lanes", VariantType::Int, defaultlanes, {laneshelp}});
152 std::string sectorshelp(
"List of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15");
154 workflowOptions.push_back(
155 ConfigParamSpec{
"tpc-sectors", VariantType::String, sectorDefault.c_str(), {sectorshelp}});
157 std::string onlyhelp(
"Comma separated list of detectors to accept. Takes precedence over the skipDet option. (Default is none)");
158 workflowOptions.push_back(
161 std::string skiphelp(
"Comma separate list of detectors to skip/ignore. (Default is none)");
162 workflowOptions.push_back(
168 std::string forceaccepthelp(
"Whether or not to always rely on accept/skip filters for detectors, independent of GRP content");
169 workflowOptions.push_back(
170 ConfigParamSpec{
"forceSelectedDets", VariantType::Bool,
false, {forceaccepthelp}});
172 std::string onlyctxhelp(
"Produce only the digitization context; Don't actually digitize");
176 std::string tpcrthelp(
"deprecated option, please connect workflows on the command line by pipe");
177 workflowOptions.push_back(
178 ConfigParamSpec{
"tpc-reco-type", VariantType::String,
"", {tpcrthelp}});
185 std::string simhelp(
"Comma separated list of simulation prefixes (for background, signal productions)");
186 workflowOptions.push_back(
190 std::string keyvaluehelp(
"Semicolon separated key=value strings (e.g.: 'TPC.gasDensity=1;...')");
191 workflowOptions.push_back(
192 ConfigParamSpec{
"configKeyValues", VariantType::String,
"", {keyvaluehelp}});
193 workflowOptions.push_back(
194 ConfigParamSpec{
"configFile", VariantType::String,
"", {
"configuration file for configurable parameters"}});
216 workflowOptions.push_back(
ConfigParamSpec{
"disable-trd-trapsim", VariantType::Bool,
false, {
"disable the trap simulation of the TRD"}});
217 workflowOptions.push_back(
ConfigParamSpec{
"trd-digit-downscaling", VariantType::Int, 1, {
"only keep TRD digits for every n-th trigger"}});
219 workflowOptions.push_back(
ConfigParamSpec{
"combine-devices", VariantType::Bool,
false, {
"combined multiple DPL worker/writer devices"}});
222 workflowOptions.push_back(
ConfigParamSpec{
"with-trigger", VariantType::Bool,
false, {
"enable distribution of CTP trigger digits"}});
225 workflowOptions.push_back(
ConfigParamSpec{
"store-ctp-lumi", VariantType::Float, -1.f, {
"store CTP lumi scaler in CTP digits (if >= 0)"}});
228void customize(std::vector<o2::framework::DispatchPolicy>& policies)
232 auto matcher = [](
auto const& spec) {
233 return spec.name ==
"SimReader";
235 policies.push_back({
"prompt-for-simreader", matcher, DispatchOp::WhenReady});
241 const auto offset = int64_t(hbfu.getFirstIRofTF({0, hbfu.orbitFirstSampled}).orbit);
242 const auto increment = int64_t(hbfu.nHBFPerTF);
243 const auto startTime = hbfu.startTime;
244 const auto orbitFirst = hbfu.orbitFirst;
250void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
262 LOG(info) <<
"Setting DPL-header firstTForbit to " << dh.firstTForbit;
263 LOG(info) <<
"Setting DPL-header runNumber to " << dh.runNumber;
264 LOG(info) <<
"Setting DPL-header timeframe creation time to " << dph.creation;
279 auto lanes = configcontext.
options().
get<
int>(
"tpc-lanes");
282 LOG(fatal) <<
"tpc-lanes needs to be positive\n";
287 return std::min(lanes, (
int)sectors.size());
299 std::stringstream streamthis;
300 std::stringstream streamparent;
302 streamthis <<
"TPCGEMINIT_PID" << getpid();
303 streamparent <<
"TPCGEMINIT_PID" << getppid();
304 if (getenv(streamparent.str().c_str())) {
305 LOG(
debug) <<
"GEM ALREADY INITIALIZED ... SKIPPING HERE";
309 LOG(
debug) <<
"INITIALIZING TPC GEMAmplification";
310 setenv(streamthis.str().c_str(),
"ON", 1);
313 cdb.setUseDefaults();
318 LOGP(info,
"initTPC: TPC GEM param, Gas param + CalPadGainFull updated for time {}", timestamp);
334 std::stringstream
str;
335 str <<
"O2SIMDIGIINTERNAL_" << getpid() <<
"_" <<
key;
336 LOG(info) <<
"Publishing master key " <<
str.str();
337 setenv(
str.str().c_str(),
value, 1);
343 std::stringstream
str;
344 str <<
"O2SIMDIGIINTERNAL_" << getppid() <<
"_" <<
key;
346 return getenv(
str.str().c_str());
349std::shared_ptr<o2::parameters::GRPObject>
readGRP(std::string
const& inputGRP)
353 LOG(error) <<
"This workflow needs a valid GRP file to start";
359 return std::shared_ptr<o2::parameters::GRPObject>(grp);
367 std::vector<std::string>
fields;
369 std::istringstream ss(
src);
371 while (std::getline(ss, token, sep)) {
372 if (!token.empty()) {
389 DetFilterer(std::string
const& detlist, std::string
const& unsetVal,
char separator,
bool doWhiteListing)
392 if (detlist.compare(unsetVal) == 0) {
396 std::vector<std::string> tokens =
splitString(detlist, separator);
399 for (
auto& token : tokens) {
400 ids.emplace_back(token.c_str());
403 isWhiteLister = doWhiteListing;
410 return ids.size() > 0;
416 bool found = std::find(
ids.begin(),
ids.end(),
id) !=
ids.end();
417 return found == isWhiteLister;
421 std::vector<o2::detectors::DetID>
ids;
428 return DetFilterer(optionVal, unsetValue, separator,
true);
434 return DetFilterer(optionVal, unsetValue, separator,
false);
449 std::string dplProcessName =
whoAmI(configcontext);
452 bool initServices = !isDPLinternal && !isDumpWorkflow && !ismaster;
461 ConfigurableParam::updateFromFile(configcontext.
options().
get<std::string>(
"configFile"));
466 ConfigurableParam::updateFromString(configcontext.
options().
get<std::string>(
"configKeyValues"));
472 std::shared_ptr<o2::parameters::GRPObject const> grp(
nullptr);
476 const auto GRPTIMEKEY =
"GRPTIMESTART";
481 auto t = grp->getTimeStart();
487 LOG(fatal) <<
"Expected env value not found";
490 return boost::lexical_cast<uint64_t>(tstr);
500 getGRPStartTime(grp.get());
502 if (!hbfu.startTime) {
503 hbfu.setValue(
"HBFUtils.startTime",
std::to_string(getGRPStartTime(grp.get())));
517 LOG(info) <<
"Setting timestamp of BasicCCDBManager to " << dph.
creation;
527 ConfigurableParam::setValue(
"DigiParams.digitizationgeometry_prefix", simPrefixes[0]);
528 ConfigurableParam::setValue(
"DigiParams.grpfile", grpfile);
530 LOG(info) <<
"MC-TRUTH " << !configcontext.
options().
get<
bool>(
"disable-mc");
531 bool mctruth = !configcontext.
options().
get<
bool>(
"disable-mc");
532 ConfigurableParam::setValue(
"DigiParams",
"mctruth", mctruth);
538 if (!configcontext.
options().
get<
bool>(
"disable-write-ini")) {
549 for (
auto&
f : filterers) {
562 std::stringstream
str;
563 str <<
"GRPDETKEY_" <<
id.getName();
564 if (
gIsMaster and grp.get() !=
nullptr) {
565 auto ok = grp->isDetReadOut(
id);
581 if (configcontext.
options().
get<
bool>(
"only-context")) {
585 auto accepted = accept(
id);
588 auto forceAccepted = configcontext.
options().
get<
bool>(
"forceSelectedDets");
589 bool is_ingrp = isInGRPReadout(
id);
591 auto isRun = accepted && (forceAccepted || is_ingrp);
593 LOG(info) <<
id.getName()
594 <<
" is in grp? " << (is_ingrp ?
"yes" :
"no") <<
";"
595 <<
" is taken although not in grp? " << (!is_ingrp && (accepted && forceAccepted) ?
"yes" :
"no") <<
";"
596 <<
" is skipped? " << (!accepted ?
"yes" :
"no") <<
";"
597 <<
" is run? " << (isRun ?
"yes" :
"no");
602 std::vector<o2::detectors::DetID> detList;
605 std::vector<int> tpcsectors;
608 if (!helpasked && ismaster) {
612 tpcsectors = o2::RangeTokenizer::tokenize<int>(configcontext.
options().
get<std::string>(
"tpc-sectors"));
614 auto lanes = helpasked ? 1 :
getNumTPCLanes(tpcsectors, configcontext);
617 auto internalwrite = configcontext.
options().
get<
bool>(
"tpc-chunked-writer");
618 auto distortionType = configcontext.
options().
get<
int>(
"tpc-distortion-type");
620 specs.insert(specs.end(), tpcPipelines.begin(), tpcPipelines.end());
622 if (configcontext.
options().
get<std::string>(
"tpc-reco-type").empty() ==
false) {
623 throw std::runtime_error(
"option 'tpc-reco-type' is deprecated, please connect workflows on the command line by pipe");
625 if (!internalwrite) {
632 const int firstOtherChannel = 36;
633 int fanoutsize = firstOtherChannel;
644#ifdef ENABLE_UPGRADES
646 if (isEnabled(o2::detectors::DetID::IT3)) {
647 detList.emplace_back(o2::detectors::DetID::IT3);
655 if (isEnabled(o2::detectors::DetID::TRK)) {
656 detList.emplace_back(o2::detectors::DetID::TRK);
675 auto useCCDB = configcontext.
options().
get<
bool>(
"use-ccdb-tof");
676 useCCDB |= configcontext.
options().
get<
bool>(
"ccdb-tof-sa");
690 auto useCCDB = configcontext.
options().
get<
bool>(
"use-ccdb-ft0");
710 auto useCCDB = !configcontext.
options().
get<
bool>(
"no-use-ccdb-emc");
711 bool requireCTPInputs = !configcontext.
options().
get<
bool>(
"no-require-ctpinputs-emc");
742 auto disableTrapSim = configcontext.
options().
get<
bool>(
"disable-trd-trapsim");
743 auto trdDigitDownscaling = configcontext.
options().
get<
int>(
"trd-digit-downscaling");
744 if (!disableTrapSim) {
804 float lumiScaler = configcontext.
options().
get<
float>(
"store-ctp-lumi");
811 if (!configcontext.
options().
get<
bool>(
"only-context")) {
815 bool combine = configcontext.
options().
get<
bool>(
"combine-devices");
817 for (
auto& s : digitizerSpecs) {
820 for (
auto& s : writerSpecs) {
824 std::vector<DataProcessorSpec> remaining;
825 specs.push_back(
specCombiner(
"Digitizations", digitizerSpecs, remaining));
826 specs.push_back(
specCombiner(
"Writers", writerSpecs, remaining));
827 for (
auto& s : remaining) {
837 bool haveCCDBInputSpec =
false;
838 for (
auto spec : specs) {
839 for (
auto in : spec.inputs) {
840 if (in.lifetime == Lifetime::Condition) {
841 haveCCDBInputSpec =
true;
846 if (!haveCCDBInputSpec && specs.size() > 0) {
847 LOG(info) <<
"No one uses DPL CCDB .. injecting a dummy CCDB query into " << specs.back().name;
848 specs.back().inputs.emplace_back(
"_dummyOrbitReset",
"CTP",
"ORBITRESET", 0, Lifetime::Condition,
853 bool withTrigger = configcontext.
options().
get<
bool>(
"with-trigger");
854 LOG(info) <<
" TRIGGER " << withTrigger;
Simple interface to the CDB manager.
std::vector< std::shared_ptr< arrow::Field > > fields
Definition of the GEM amplification.
Header of the General Run Parameters object.
Definition of the Names Generator class.
Helper function to tokenize sequences and ranges of integral numbers.
std::vector< std::string > splitString(std::string const &src, char sep)
const char * get_master_env(const char *key)
int getNumTPCLanes(std::vector< int > const &sectors, ConfigContext const &configcontext)
DetFilterer whitelister(std::string optionVal, std::string unsetValue, char separator)
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
This function hooks up the workflow specifications into the DPL driver.
void setTimingInfoInHeaders(o2::header::DataHeader &dh, o2::framework::DataProcessingHeader &dph)
DetFilterer blacklister(std::string optionVal, std::string unsetValue, char separator)
void customize(std::vector< o2::framework::CompletionPolicy > &policies)
void initTPC(long timestamp)
void publish_master_env(const char *key, const char *value)
std::shared_ptr< o2::parameters::GRPObject > readGRP(std::string const &inputGRP)
static std::string getGRPFileName(const std::string_view prefix=STANDARDSIMPREFIX)
static constexpr std::string_view DIGITIZATIONCONFIGFILE
static std::string getCCDBServer()
static BasicCCDBManager & instance()
void setCaching(bool v)
disable or enable caching
void setTimestamp(long t)
set timestamp cache for all queries
void setLocalObjectValidityChecking(bool v=true)
set the flag to check object validity before CCDB query
void printKeyValues(bool showProv=true, bool useLogger=false, bool withPadding=true, bool showHash=true) const final
static const HBFUtils & Instance()
static void writeINI(std::string const &filename, std::string const &keyOnly="")
Static class with identifiers, bitmasks and names for ALICE detectors.
bool helpOnCommandLine() const
ConfigParamRegistry & options() const
T get(const char *key) const
static GRPObject * loadFrom(const std::string &grpFileName="")
static CDBInterface & instance()
static GEMAmplification & instance()
Default constructor.
static constexpr int MAXSECTOR
GLsizei const GLfloat * value
constexpr double LHCOrbitMUS
DataProcessorSpec getCPVDigitizerSpec(int channel, bool mctruth)
Create new digitizer spec.
o2::framework::DataProcessorSpec getCPVDigitWriterSpec(bool mctruth=true)
framework::DataProcessorSpec getDigitWriterSpec(bool raw=true)
o2::framework::DataProcessorSpec getCTPDigitizerSpec(int channel, std::vector< o2::detectors::DetID > &detList, float ctpLumiScaler, bool mctruth)
o2::framework::DataProcessorSpec getEMCALDigitWriterSpec(bool mctruth=true)
Create new digits writer spec.
o2::framework::DataProcessorSpec getEMCALDigitizerSpec(int channel, bool requireCTPInput, bool mctruth=true, bool useccdb=true)
Create new digitizer spec.
o2::framework::DataProcessorSpec getFDDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getFDDDigitWriterSpec(bool mctruth=true, bool trigInp=true)
Defining PrimaryVertex explicitly as messageable.
o2::framework::DataProcessorSpec specCombiner(std::string const &name, std::vector< DataProcessorSpec > const &speccollection, std::vector< DataProcessorSpec > &remaining)
bool isInternalDPL(std::string const &name)
bool isMasterWorkflowDefinition(ConfigContext const &configcontext)
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
bool isDumpWorkflowInvocation(ConfigContext const &configcontext)
std::string whoAmI(ConfigContext const &configcontext)
std::vector< DataProcessorSpec > WorkflowSpec
o2::framework::DataProcessorSpec getFT0DigitizerSpec(int channel, bool mctruth, bool useCCDB)
framework::DataProcessorSpec getFT0DigitWriterSpec(bool mctruth=true, bool trigInp=true)
create a processor spec
o2::framework::DataProcessorSpec getFV0DigitizerSpec(int channel, bool mctruth)
framework::DataProcessorSpec getFV0DigitWriterSpec(bool mctruth=true, bool trigInp=true)
create a processor spec
o2::framework::DataProcessorSpec getHMPIDDigitWriterSpec(bool mctruth=true)
o2::framework::DataProcessorSpec getHMPIDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getITS3DigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getITS3DigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getITSDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
o2::framework::DataProcessorSpec getMFTDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getMFTDigitizerSpec(int channel, bool mctruth)
DataProcessorSpec getITSDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getMCHDigitWriterSpec(bool mctruth)
o2::framework::DataProcessorSpec getMCHDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getMIDDigitWriterSpec(bool mctruth)
o2::framework::DataProcessorSpec getMIDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getGRPUpdaterSpec(const std::string &prefix, const std::vector< o2::detectors::DetID > &detList)
create the processor spec
o2::framework::DataProcessorSpec getPHOSDigitWriterSpec(bool mctruth)
DataProcessorSpec getPHOSDigitizerSpec(int channel, bool mctruth)
Create new digitizer spec.
DataProcessorSpec getSimReaderSpec(SubspecRange range, const std::vector< std::string > &simprefixes, const std::vector< int > &tpcsectors, bool withTrigger)
o2::framework::DataProcessorSpec getTOFDigitWriterSpec(bool useMC=1, bool writeErr=0)
DataProcessorSpec getTOFDigitizerSpec(int channel, bool useCCDB, bool mctruth, std::string ccdb_url, int timestamp)
const std::unordered_map< CDBType, const std::string > CDBTypeMap
Storage name in CCDB for each calibration and parameter type.
o2::framework::DataProcessorSpec getTPCDigitRootWriterSpec(std::vector< int > const &laneConfiguration, bool mctruth)
o2::framework::DataProcessorSpec getTPCDigitizerSpec(int channel, bool writeGRP, bool mctruth, bool internalwriter, int distortionType)
@ CalPadGainFull
Full pad gain calibration.
@ ParGas
Parameter for Gas.
o2::framework::DataProcessorSpec getTRDDigitWriterSpec(bool mctruth=true, bool inpFromDigitizer=true)
o2::framework::DataProcessorSpec getTRDDigitizerSpec(int channel, bool mctruth=true)
o2::framework::DataProcessorSpec getTRDTrapSimulatorSpec(bool useMC, int digitDownscaling)
o2::framework::DataProcessorSpec getTRDTrackletWriterSpec(bool useMC)
o2::framework::DataProcessorSpec getTRKDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getTRKDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getZDCDigitizerSpec(int channel, bool mctruth)
framework::DataProcessorSpec getZDCDigitWriterDPLSpec(bool mctruth, bool simVersion)
create a processor spec
std::string to_string(gsl::span< T, Size > span)
DetFilterer(std::string const &detlist, std::string const &unsetVal, char separator, bool doWhiteListing)
bool accept(o2::detectors::DetID id)
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeWhenAnyWithAllConditions(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
uint64_t startTime
absolute time in ms corresponding to the start of the MC run
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"