11#ifndef FRAMEWORK_RUN_DATA_PROCESSING_H
12#define FRAMEWORK_RUN_DATA_PROCESSING_H
14#include <fmt/format.h>
37using Inputs = std::vector<InputSpec>;
38using Outputs = std::vector<OutputSpec>;
39using Options = std::vector<ConfigParamSpec>;
71 o2::framework::call_if_defined<struct WorkflowOptions>([&](
auto*
ptr) {
72 ptr =
new std::decay_t<
decltype(*ptr)>;
82 if (services.empty()) {
115class ConfigParamRegistry;
116class ConfigParamSpec;
129 std::vector<o2::framework::ChannelConfigurationPolicy>
const& channelPolicies,
130 std::vector<o2::framework::CompletionPolicy>
const& completionPolicies,
131 std::vector<o2::framework::DispatchPolicy>
const& dispatchPolicies,
132 std::vector<o2::framework::ResourcePolicy>
const& resourcePolicies,
133 std::vector<o2::framework::CallbacksPolicy>
const& callbacksPolicies,
134 std::vector<o2::framework::SendingPolicy>
const& sendingPolicies,
135 std::vector<o2::framework::ConfigParamSpec>
const& workflowOptions,
136 std::vector<o2::framework::ConfigParamSpec>
const& detectedOptions,
142 requires requires(T& policy) { { T::createDefaultPolicies() } -> std::same_as<std::vector<T>>; }
145 std::vector<T> policies;
147 auto defaultPolicies = T::createDefaultPolicies();
148 policies.insert(std::end(policies), std::begin(defaultPolicies), std::end(defaultPolicies));
153 requires requires(
T& hook) {
customize(hook); }
172 std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
173 std::vector<o2::framework::ConfigParamSpec>& extraOptions,
int argc,
char** argv);
181 std::vector<o2::framework::ConfigParamSpec> workflowOptions;
186 std::vector<CompletionPolicy> completionPolicies = injectCustomizations<CompletionPolicy>();
187 std::vector<DispatchPolicy> dispatchPolicies = injectCustomizations<DispatchPolicy>();
188 std::vector<ResourcePolicy> resourcePolicies = injectCustomizations<ResourcePolicy>();
189 std::vector<CallbacksPolicy> callbacksPolicies = injectCustomizations<CallbacksPolicy>();
190 std::vector<SendingPolicy> sendingPolicies = injectCustomizations<SendingPolicy>();
192 std::unique_ptr<ServiceRegistry> configRegistry =
createRegistry();
193 std::vector<ConfigParamSpec> extraOptions;
194 std::unique_ptr<ConfigParamRegistry> workflowOptionsRegistry{
nullptr};
195 auto configContext =
createConfigContext(workflowOptionsRegistry, *configRegistry, workflowOptions, extraOptions, argc, argv);
199 for (
auto& spec : specs) {
202 std::vector<ChannelConfigurationPolicy> channelPolicies;
204 auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(configContext);
205 channelPolicies.insert(std::end(channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
206 return doMain(argc, argv, specs,
207 channelPolicies, completionPolicies, dispatchPolicies,
208 resourcePolicies, callbacksPolicies, sendingPolicies, workflowOptions, extraOptions, configContext);
211int callMain(
int argc,
char** argv,
int (*)(
int,
char**));
void customize(std::vector< o2::framework::CallbacksPolicy > &policies)
Defining PrimaryVertex explicitly as messageable.
auto homogeneous_apply_refs(L l, T &&object)
std::function< void(const char *)> OnWorkflowTerminationHook
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
void overrideLabels(o2::framework::ConfigContext &ctx, std::vector< o2::framework::DataProcessorSpec > &workflow)
Helper used to add labels to Data Processors.
int callMain(int argc, char **argv, int(*)(int, char **))
void overrideAll(o2::framework::ConfigContext &ctx, std::vector< o2::framework::DataProcessorSpec > &workflow)
int doMain(int argc, char **argv, o2::framework::WorkflowSpec const &specs, std::vector< o2::framework::ChannelConfigurationPolicy > const &channelPolicies, std::vector< o2::framework::CompletionPolicy > const &completionPolicies, std::vector< o2::framework::DispatchPolicy > const &dispatchPolicies, std::vector< o2::framework::ResourcePolicy > const &resourcePolicies, std::vector< o2::framework::CallbacksPolicy > const &callbacksPolicies, std::vector< o2::framework::SendingPolicy > const &sendingPolicies, std::vector< o2::framework::ConfigParamSpec > const &workflowOptions, std::vector< o2::framework::ConfigParamSpec > const &detectedOptions, o2::framework::ConfigContext &configContext)
std::vector< T > injectCustomizations()
void overrideCloning(o2::framework::ConfigContext &ctx, std::vector< o2::framework::DataProcessorSpec > &workflow)
Helper used to customize a workflow via a template data processor.
void defaultConfiguration(std::vector< o2::framework::ConfigParamSpec > &globalWorkflowOptions)
void overridePipeline(o2::framework::ConfigContext &ctx, std::vector< o2::framework::DataProcessorSpec > &workflow)
Helper used to customize a workflow pipelining options.
int mainNoCatch(int argc, char **argv)
void doDefaultWorkflowTerminationHook()
o2::framework::ConfigContext createConfigContext(std::unique_ptr< o2::framework::ConfigParamRegistry > &workflowOptionsRegistry, o2::framework::ServiceRegistry &configRegistry, std::vector< o2::framework::ConfigParamSpec > &workflowOptions, std::vector< o2::framework::ConfigParamSpec > &extraOptions, int argc, char **argv)
char * getIdString(int argc, char **argv)
std::unique_ptr< o2::framework::ServiceRegistry > createRegistry()
void callWorkflowTermination(T &hook, char const *idstring)
o2::framework::WorkflowSpec defineDataProcessing(o2::framework::ConfigContext const &context)
This function hooks up the the workflow specifications into the DPL driver.
std::vector< o2::framework::ConfigParamSpec > requiredWorkflowOptions()
Workflow options which are required by DPL in order to work.
static auto userDefinedCustomization(auto &) -> void
static auto userDefinedCustomization(WithNonTrivialDefault auto &something) -> void
static auto userDefinedCustomization(WithUserOverride auto &something) -> void
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static bool appendOption(std::vector< ConfigParamSpec > &options, Configurable< T, K, IP > &what)