12#include <boost/program_options.hpp>
13#include <boost/lexical_cast.hpp>
118#include <type_traits>
131void customize(std::vector<o2::framework::CompletionPolicy>& policies)
144void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
147 int defaultlanes = std::max(1u, std::thread::hardware_concurrency() / 2);
148 std::string laneshelp(
"Number of tpc processing lanes. A lane is a pipeline of algorithms.");
149 workflowOptions.push_back(
150 ConfigParamSpec{
"tpc-lanes", VariantType::Int, defaultlanes, {laneshelp}});
152 std::string sectorshelp(
"List of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15");
154 workflowOptions.push_back(
155 ConfigParamSpec{
"tpc-sectors", VariantType::String, sectorDefault.c_str(), {sectorshelp}});
157 std::string onlyhelp(
"Comma separated list of detectors to accept. Takes precedence over the skipDet option. (Default is none)");
158 workflowOptions.push_back(
161 std::string skiphelp(
"Comma separate list of detectors to skip/ignore. (Default is none)");
162 workflowOptions.push_back(
168 std::string forceaccepthelp(
"Whether or not to always rely on accept/skip filters for detectors, independent of GRP content");
169 workflowOptions.push_back(
170 ConfigParamSpec{
"forceSelectedDets", VariantType::Bool,
false, {forceaccepthelp}});
172 std::string onlyctxhelp(
"Produce only the digitization context; Don't actually digitize");
176 std::string tpcrthelp(
"deprecated option, please connect workflows on the command line by pipe");
177 workflowOptions.push_back(
178 ConfigParamSpec{
"tpc-reco-type", VariantType::String,
"", {tpcrthelp}});
185 std::string simhelp(
"Comma separated list of simulation prefixes (for background, signal productions)");
186 workflowOptions.push_back(
190 std::string keyvaluehelp(
"Semicolon separated key=value strings (e.g.: 'TPC.gasDensity=1;...')");
191 workflowOptions.push_back(
192 ConfigParamSpec{
"configKeyValues", VariantType::String,
"", {keyvaluehelp}});
193 workflowOptions.push_back(
194 ConfigParamSpec{
"configFile", VariantType::String,
"", {
"configuration file for configurable parameters"}});
216 workflowOptions.push_back(
ConfigParamSpec{
"disable-trd-trapsim", VariantType::Bool,
false, {
"disable the trap simulation of the TRD"}});
217 workflowOptions.push_back(
ConfigParamSpec{
"trd-digit-downscaling", VariantType::Int, 1, {
"only keep TRD digits for every n-th trigger"}});
219 workflowOptions.push_back(
ConfigParamSpec{
"combine-devices", VariantType::Bool,
false, {
"combined multiple DPL worker/writer devices"}});
222 workflowOptions.push_back(
ConfigParamSpec{
"with-trigger", VariantType::Bool,
false, {
"enable distribution of CTP trigger digits"}});
225void customize(std::vector<o2::framework::DispatchPolicy>& policies)
229 auto matcher = [](
auto const& spec) {
230 return spec.name ==
"SimReader";
232 policies.push_back({
"prompt-for-simreader", matcher, DispatchOp::WhenReady});
238 const auto offset = int64_t(hbfu.getFirstIRofTF({0, hbfu.orbitFirstSampled}).orbit);
239 const auto increment = int64_t(hbfu.nHBFPerTF);
240 const auto startTime = hbfu.startTime;
241 const auto orbitFirst = hbfu.orbitFirst;
247void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
259 LOG(info) <<
"Setting DPL-header firstTForbit to " << dh.firstTForbit;
260 LOG(info) <<
"Setting DPL-header runNumber to " << dh.runNumber;
261 LOG(info) <<
"Setting DPL-header timeframe creation time to " << dph.creation;
276 auto lanes = configcontext.
options().
get<
int>(
"tpc-lanes");
279 LOG(fatal) <<
"tpc-lanes needs to be positive\n";
284 return std::min(lanes, (
int)sectors.size());
296 std::stringstream streamthis;
297 std::stringstream streamparent;
299 streamthis <<
"TPCGEMINIT_PID" << getpid();
300 streamparent <<
"TPCGEMINIT_PID" << getppid();
301 if (getenv(streamparent.str().c_str())) {
302 LOG(
debug) <<
"GEM ALREADY INITIALIZED ... SKIPPING HERE";
306 LOG(
debug) <<
"INITIALIZING TPC GEMAmplification";
307 setenv(streamthis.str().c_str(),
"ON", 1);
310 cdb.setUseDefaults();
320 std::stringstream
str;
321 str <<
"O2SIMDIGIINTERNAL_" << getpid() <<
"_" <<
key;
322 LOG(info) <<
"Publishing master key " <<
str.str();
323 setenv(
str.str().c_str(),
value, 1);
329 std::stringstream
str;
330 str <<
"O2SIMDIGIINTERNAL_" << getppid() <<
"_" <<
key;
332 return getenv(
str.str().c_str());
335std::shared_ptr<o2::parameters::GRPObject>
readGRP(std::string
const& inputGRP)
339 LOG(error) <<
"This workflow needs a valid GRP file to start";
345 return std::shared_ptr<o2::parameters::GRPObject>(grp);
353 std::vector<std::string> fields;
355 std::istringstream ss(
src);
357 while (std::getline(ss, token, sep)) {
358 if (!token.empty()) {
359 fields.push_back(token);
375 DetFilterer(std::string
const& detlist, std::string
const& unsetVal,
char separator,
bool doWhiteListing)
378 if (detlist.compare(unsetVal) == 0) {
382 std::vector<std::string> tokens =
splitString(detlist, separator);
385 for (
auto& token : tokens) {
386 ids.emplace_back(token.c_str());
389 isWhiteLister = doWhiteListing;
396 return ids.size() > 0;
402 bool found = std::find(
ids.begin(),
ids.end(),
id) !=
ids.end();
403 return found == isWhiteLister;
407 std::vector<o2::detectors::DetID>
ids;
414 return DetFilterer(optionVal, unsetValue, separator,
true);
420 return DetFilterer(optionVal, unsetValue, separator,
false);
435 std::string dplProcessName =
whoAmI(configcontext);
438 bool initServices = !isDPLinternal && !isDumpWorkflow && !ismaster;
447 ConfigurableParam::updateFromFile(configcontext.
options().
get<std::string>(
"configFile"));
452 ConfigurableParam::updateFromString(configcontext.
options().
get<std::string>(
"configKeyValues"));
458 std::shared_ptr<o2::parameters::GRPObject const> grp(
nullptr);
462 const auto GRPTIMEKEY =
"GRPTIMESTART";
467 auto t = grp->getTimeStart();
473 LOG(fatal) <<
"Expected env value not found";
476 return boost::lexical_cast<uint64_t>(tstr);
486 getGRPStartTime(grp.get());
488 if (!hbfu.startTime) {
489 hbfu.setValue(
"HBFUtils.startTime",
std::to_string(getGRPStartTime(grp.get())));
503 LOG(info) <<
"Setting timestamp of BasicCCDBManager to " << dph.
creation;
513 ConfigurableParam::setValue(
"DigiParams.digitizationgeometry_prefix", simPrefixes[0]);
514 ConfigurableParam::setValue(
"DigiParams.grpfile", grpfile);
516 LOG(info) <<
"MC-TRUTH " << !configcontext.
options().
get<
bool>(
"disable-mc");
517 bool mctruth = !configcontext.
options().
get<
bool>(
"disable-mc");
518 ConfigurableParam::setValue(
"DigiParams",
"mctruth", mctruth);
524 if (!configcontext.
options().
get<
bool>(
"disable-write-ini")) {
535 for (
auto&
f : filterers) {
548 std::stringstream
str;
549 str <<
"GRPDETKEY_" <<
id.getName();
550 if (
gIsMaster and grp.get() !=
nullptr) {
551 auto ok = grp->isDetReadOut(
id);
567 if (configcontext.
options().
get<
bool>(
"only-context")) {
571 auto accepted = accept(
id);
574 auto forceAccepted = configcontext.
options().
get<
bool>(
"forceSelectedDets");
575 bool is_ingrp = isInGRPReadout(
id);
577 auto isRun = accepted && (forceAccepted || is_ingrp);
579 LOG(info) <<
id.getName()
580 <<
" is in grp? " << (is_ingrp ?
"yes" :
"no") <<
";"
581 <<
" is taken although not in grp? " << (!is_ingrp && (accepted && forceAccepted) ?
"yes" :
"no") <<
";"
582 <<
" is skipped? " << (!accepted ?
"yes" :
"no") <<
";"
583 <<
" is run? " << (isRun ?
"yes" :
"no");
588 std::vector<o2::detectors::DetID> detList;
591 std::vector<int> tpcsectors;
594 if (!helpasked && ismaster) {
598 tpcsectors = o2::RangeTokenizer::tokenize<int>(configcontext.
options().
get<std::string>(
"tpc-sectors"));
600 auto lanes = helpasked ? 1 :
getNumTPCLanes(tpcsectors, configcontext);
603 auto internalwrite = configcontext.
options().
get<
bool>(
"tpc-chunked-writer");
604 auto distortionType = configcontext.
options().
get<
int>(
"tpc-distortion-type");
606 specs.insert(specs.end(), tpcPipelines.begin(), tpcPipelines.end());
608 if (configcontext.
options().
get<std::string>(
"tpc-reco-type").empty() ==
false) {
609 throw std::runtime_error(
"option 'tpc-reco-type' is deprecated, please connect workflows on the command line by pipe");
611 if (!internalwrite) {
618 const int firstOtherChannel = 36;
619 int fanoutsize = firstOtherChannel;
630#ifdef ENABLE_UPGRADES
632 if (isEnabled(o2::detectors::DetID::IT3)) {
633 detList.emplace_back(o2::detectors::DetID::IT3);
641 if (isEnabled(o2::detectors::DetID::TRK)) {
642 detList.emplace_back(o2::detectors::DetID::TRK);
661 auto useCCDB = configcontext.
options().
get<
bool>(
"use-ccdb-tof");
662 useCCDB |= configcontext.
options().
get<
bool>(
"ccdb-tof-sa");
676 auto useCCDB = configcontext.
options().
get<
bool>(
"use-ccdb-ft0");
696 auto useCCDB = !configcontext.
options().
get<
bool>(
"no-use-ccdb-emc");
697 bool requireCTPInputs = !configcontext.
options().
get<
bool>(
"no-require-ctpinputs-emc");
728 auto disableTrapSim = configcontext.
options().
get<
bool>(
"disable-trd-trapsim");
729 auto trdDigitDownscaling = configcontext.
options().
get<
int>(
"trd-digit-downscaling");
730 if (!disableTrapSim) {
796 if (!configcontext.
options().
get<
bool>(
"only-context")) {
800 bool combine = configcontext.
options().
get<
bool>(
"combine-devices");
802 for (
auto& s : digitizerSpecs) {
805 for (
auto& s : writerSpecs) {
809 std::vector<DataProcessorSpec> remaining;
810 specs.push_back(
specCombiner(
"Digitizations", digitizerSpecs, remaining));
811 specs.push_back(
specCombiner(
"Writers", writerSpecs, remaining));
812 for (
auto& s : remaining) {
822 bool haveCCDBInputSpec =
false;
823 for (
auto spec : specs) {
824 for (
auto in : spec.inputs) {
825 if (in.lifetime == Lifetime::Condition) {
826 haveCCDBInputSpec =
true;
831 if (!haveCCDBInputSpec && specs.size() > 0) {
832 LOG(info) <<
"No one uses DPL CCDB .. injecting a dummy CCDB query into " << specs.back().name;
833 specs.back().inputs.emplace_back(
"_dummyOrbitReset",
"CTP",
"ORBITRESET", 0, Lifetime::Condition,
838 bool withTrigger = configcontext.
options().
get<
bool>(
"with-trigger");
839 LOG(info) <<
" TRIGGER " << withTrigger;
Simple interface to the CDB manager.
Definition of the GEM amplification.
Header of the General Run Parameters object.
Definition of the Names Generator class.
Helper function to tokenize sequences and ranges of integral numbers.
std::vector< std::string > splitString(std::string const &src, char sep)
const char * get_master_env(const char *key)
int getNumTPCLanes(std::vector< int > const §ors, ConfigContext const &configcontext)
DetFilterer whitelister(std::string optionVal, std::string unsetValue, char separator)
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
This function hooks up the the workflow specifications into the DPL driver.
void setTimingInfoInHeaders(o2::header::DataHeader &dh, o2::framework::DataProcessingHeader &dph)
DetFilterer blacklister(std::string optionVal, std::string unsetValue, char separator)
void customize(std::vector< o2::framework::CompletionPolicy > &policies)
void publish_master_env(const char *key, const char *value)
std::shared_ptr< o2::parameters::GRPObject > readGRP(std::string const &inputGRP)
static std::string getGRPFileName(const std::string_view prefix=STANDARDSIMPREFIX)
static constexpr std::string_view DIGITIZATIONCONFIGFILE
static std::string getCCDBServer()
static BasicCCDBManager & instance()
void setCaching(bool v)
disable or enable caching
void setTimestamp(long t)
set timestamp cache for all queries
void setLocalObjectValidityChecking(bool v=true)
set the flag to check object validity before CCDB query
static const HBFUtils & Instance()
static void writeINI(std::string const &filename, std::string const &keyOnly="")
Static class with identifiers, bitmasks and names for ALICE detectors.
bool helpOnCommandLine() const
ConfigParamRegistry & options() const
T get(const char *key) const
static GRPObject * loadFrom(const std::string &grpFileName="")
static CDBInterface & instance()
static GEMAmplification & instance()
Default constructor.
static constexpr int MAXSECTOR
GLsizei const GLfloat * value
constexpr double LHCOrbitMUS
DataProcessorSpec getCPVDigitizerSpec(int channel, bool mctruth)
Create new digitizer spec.
o2::framework::DataProcessorSpec getCPVDigitWriterSpec(bool mctruth=true)
framework::DataProcessorSpec getDigitWriterSpec(bool raw=true)
o2::framework::DataProcessorSpec getCTPDigitizerSpec(int channel, std::vector< o2::detectors::DetID > &detList, bool mctruth)
o2::framework::DataProcessorSpec getEMCALDigitWriterSpec(bool mctruth=true)
Create new digits writer spec.
o2::framework::DataProcessorSpec getEMCALDigitizerSpec(int channel, bool requireCTPInput, bool mctruth=true, bool useccdb=true)
Create new digitizer spec.
o2::framework::DataProcessorSpec getFDDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getFDDDigitWriterSpec(bool mctruth=true, bool trigInp=true)
Defining PrimaryVertex explicitly as messageable.
o2::framework::DataProcessorSpec specCombiner(std::string const &name, std::vector< DataProcessorSpec > const &speccollection, std::vector< DataProcessorSpec > &remaining)
bool isInternalDPL(std::string const &name)
bool isMasterWorkflowDefinition(ConfigContext const &configcontext)
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
bool isDumpWorkflowInvocation(ConfigContext const &configcontext)
std::string whoAmI(ConfigContext const &configcontext)
std::vector< DataProcessorSpec > WorkflowSpec
o2::framework::DataProcessorSpec getFT0DigitizerSpec(int channel, bool mctruth, bool useCCDB)
framework::DataProcessorSpec getFT0DigitWriterSpec(bool mctruth=true, bool trigInp=true)
create a processor spec
o2::framework::DataProcessorSpec getFV0DigitizerSpec(int channel, bool mctruth)
framework::DataProcessorSpec getFV0DigitWriterSpec(bool mctruth=true, bool trigInp=true)
create a processor spec
o2::framework::DataProcessorSpec getHMPIDDigitWriterSpec(bool mctruth=true)
o2::framework::DataProcessorSpec getHMPIDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getITS3DigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getITS3DigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getITSDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
o2::framework::DataProcessorSpec getMFTDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getMFTDigitizerSpec(int channel, bool mctruth)
DataProcessorSpec getITSDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getMCHDigitWriterSpec(bool mctruth)
o2::framework::DataProcessorSpec getMCHDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getMIDDigitWriterSpec(bool mctruth)
o2::framework::DataProcessorSpec getMIDDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getGRPUpdaterSpec(const std::string &prefix, const std::vector< o2::detectors::DetID > &detList)
create the processor spec
o2::framework::DataProcessorSpec getPHOSDigitWriterSpec(bool mctruth)
DataProcessorSpec getPHOSDigitizerSpec(int channel, bool mctruth)
Create new digitizer spec.
DataProcessorSpec getSimReaderSpec(SubspecRange range, const std::vector< std::string > &simprefixes, const std::vector< int > &tpcsectors, bool withTrigger)
o2::framework::DataProcessorSpec getTOFDigitWriterSpec(bool useMC=1, bool writeErr=0)
DataProcessorSpec getTOFDigitizerSpec(int channel, bool useCCDB, bool mctruth, std::string ccdb_url, int timestamp)
DataProcessorSpec getTPCDigitRootWriterSpec(std::vector< int > const &laneConfiguration, bool mctruth)
o2::framework::DataProcessorSpec getTPCDigitizerSpec(int channel, bool writeGRP, bool mctruth, bool internalwriter, int distortionType)
o2::framework::DataProcessorSpec getTRDDigitWriterSpec(bool mctruth=true, bool inpFromDigitizer=true)
o2::framework::DataProcessorSpec getTRDDigitizerSpec(int channel, bool mctruth=true)
o2::framework::DataProcessorSpec getTRDTrapSimulatorSpec(bool useMC, int digitDownscaling)
o2::framework::DataProcessorSpec getTRDTrackletWriterSpec(bool useMC)
o2::framework::DataProcessorSpec getTRKDigitWriterSpec(bool mctruth=true, bool dec=false, bool calib=false)
DataProcessorSpec getTRKDigitizerSpec(int channel, bool mctruth)
o2::framework::DataProcessorSpec getZDCDigitizerSpec(int channel, bool mctruth)
framework::DataProcessorSpec getZDCDigitWriterDPLSpec(bool mctruth, bool simVersion)
create a processor spec
std::string to_string(gsl::span< T, Size > span)
DetFilterer(std::string const &detlist, std::string const &unsetVal, char separator, bool doWhiteListing)
bool accept(o2::detectors::DetID id)
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeWhenAnyWithAllConditions(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
uint64_t startTime
absolute time in ms corresponding to the start of the MC run
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"