Project
Loading...
Searching...
No Matches
runDataProcessing.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef FRAMEWORK_RUN_DATA_PROCESSING_H
12#define FRAMEWORK_RUN_DATA_PROCESSING_H
13
14#include <fmt/format.h>
29#include "Framework/Logger.h"
32#include "ResourcePolicy.h"
33#include <vector>
34
// Shorthand aliases for vectors of the framework spec types.
35namespace o2::framework
36{
// A list of InputSpec entries.
37using Inputs = std::vector<InputSpec>;
// A list of OutputSpec entries.
38using Outputs = std::vector<OutputSpec>;
// A list of ConfigParamSpec entries.
39using Options = std::vector<ConfigParamSpec>;
40} // namespace o2::framework
41
51
52// This template magic allows users to customize the behavior of the process
53// by (optionally) implementing a `customize` method which modifies one of the
54// objects in question.
55//
56// For example it can be optionally implemented by the user to specify the
57// channel policies for your setup. Use this if you want to customize the way
58// your devices communicate among themselves, e.g. if you want to use REQ/REP
59// in place of PUB/SUB.
60//
61// The advantage of this approach is that we do not need to expose the
62// configurability / configuration object to the user, unless they really want to
63// modify it. The drawback is that the `customize` method needs to be declared
64// before including this file.
65
66// By default we leave the channel policies unchanged. Notice that the default still includes
67// a "match all" policy which uses pub / sub.
68
69void defaultConfiguration(std::vector<o2::framework::ConfigParamSpec>& globalWorkflowOptions)
70{
71 o2::framework::call_if_defined<struct WorkflowOptions>([&](auto* ptr) {
72 ptr = new std::decay_t<decltype(*ptr)>;
73 o2::framework::homogeneous_apply_refs([&globalWorkflowOptions](auto what) {
74 return o2::framework::ConfigurableHelpers::appendOption(globalWorkflowOptions, what);
75 },
76 *ptr);
77 });
78}
79
// Default handling of a list of ServiceSpec: only acts when the list is empty.
// NOTE(review): this listing skips the source line numbered 83 inside the
// `if`, so the statement that presumably fills in the default services is not
// visible here — confirm against the original header.
80void defaultConfiguration(std::vector<o2::framework::ServiceSpec>& services)
81{
82 if (services.empty()) {
84 }
85}
86
// Workflow options which are required by DPL in order to work.
// Declaration only; the definition is not part of this file.
88std::vector<o2::framework::ConfigParamSpec> requiredWorkflowOptions();
89
90template <typename T>
91concept WithUserOverride = requires(T& something) { customize(something); };
92
93template <typename T>
94concept WithNonTrivialDefault = !WithUserOverride<T> && requires(T& something) { defaultConfiguration(something); };
95
// Overload for types satisfying WithUserOverride: forwards `something` to the
// `customize` overload found for it.
// NOTE(review): the line opening the enclosing struct (numbered 96) is absent
// from this listing.
97 static auto userDefinedCustomization(WithUserOverride auto& something) -> void
98 {
99 customize(something);
100 }
101
// Overload for types satisfying WithNonTrivialDefault (no `customize`
// override, but a `defaultConfiguration` overload exists): applies that
// default configuration to `something`.
102 static auto userDefinedCustomization(WithNonTrivialDefault auto& something) -> void
103 {
104 defaultConfiguration(something);
105 }
106
// Unconstrained fallback overload: intentionally does nothing with its argument.
107 static auto userDefinedCustomization(auto&) -> void
108 {
109 }
110};
111
// Forward declarations of framework types named by the declarations in this file.
112namespace o2::framework
113{
114class ConfigContext;
115class ConfigParamRegistry;
116class ConfigParamSpec;
117} // namespace o2::framework
// Helper used to customize a workflow's pipelining options.
119void overridePipeline(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);
120
// Helper used to customize a workflow via a template data processor.
122void overrideCloning(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);
123
// Helper used to add labels to Data Processors.
125void overrideLabels(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);
126
127// This comes from the framework itself. This way we avoid code duplication.
// Declaration only. mainNoCatch() forwards the command line, the workflow
// specs, every policy list, the workflow options, the extra options (as
// `detectedOptions`) and the config context to it, and returns its result.
128int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& specs,
129 std::vector<o2::framework::ChannelConfigurationPolicy> const& channelPolicies,
130 std::vector<o2::framework::CompletionPolicy> const& completionPolicies,
131 std::vector<o2::framework::DispatchPolicy> const& dispatchPolicies,
132 std::vector<o2::framework::ResourcePolicy> const& resourcePolicies,
133 std::vector<o2::framework::CallbacksPolicy> const& callbacksPolicies,
134 std::vector<o2::framework::SendingPolicy> const& sendingPolicies,
135 std::vector<o2::framework::ConfigParamSpec> const& workflowOptions,
136 std::vector<o2::framework::ConfigParamSpec> const& detectedOptions,
137 o2::framework::ConfigContext& configContext);
138
140
// Builds the list of policies of type T: starts from an empty vector and
// appends whatever T::createDefaultPolicies() returns. The requires-clause
// restricts T to types whose static createDefaultPolicies() returns exactly
// std::vector<T>.
// NOTE(review): this listing skips the source line numbered 146 (between the
// declaration of `policies` and the retrieval of the defaults); presumably
// that is where user customizations are applied to `policies` — confirm
// against the original header.
141template <typename T>
142 requires requires(T& policy) { { T::createDefaultPolicies() } -> std::same_as<std::vector<T>>; }
143std::vector<T> injectCustomizations()
144{
145 std::vector<T> policies;
147 auto defaultPolicies = T::createDefaultPolicies();
// Defaults are appended after anything already present in `policies`.
148 policies.insert(std::end(policies), std::begin(defaultPolicies), std::end(defaultPolicies));
149 return policies;
150}
151
// Termination path used when `customize(hook)` is well-formed for T: lets
// `customize` modify the hook, then invokes the hook with `idstring`.
// NOTE(review): this listing skips the source line numbered 158 (after the
// hook invocation); presumably a call to doDefaultWorkflowTerminationHook() —
// confirm against the original header.
152template <typename T>
153 requires requires(T& hook) { customize(hook); }
154void callWorkflowTermination(T& hook, char const* idstring)
155{
156 customize(hook);
157 hook(idstring);
159}
160
161// Do not call the user hook if it's not there.
// Unconstrained overload; the hook argument is left unnamed and unused.
// NOTE(review): this listing skips the source line numbered 165, so the body
// appears empty here; presumably it calls doDefaultWorkflowTerminationHook()
// — confirm against the original header.
162template <typename T>
163void callWorkflowTermination(T&, char const* idstring)
164{
166}
167
// Declaration only. mainNoCatch() calls it with the config context and the
// workflow specs before handing them to doMain().
168void overrideAll(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);
169
// Declaration only. Returns the ConfigContext that mainNoCatch() uses; it
// receives the (initially null) options registry pointer, the service
// registry, the workflow options, the extra options and the command line, all
// but argc/argv by non-const reference.
170o2::framework::ConfigContext createConfigContext(std::unique_ptr<o2::framework::ConfigParamRegistry>& workflowOptionsRegistry,
171 o2::framework::ServiceRegistry& configRegistry,
172 std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
173 std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv);
174
// Declaration only. Returns the ServiceRegistry that mainNoCatch() passes to
// createConfigContext().
175std::unique_ptr<o2::framework::ServiceRegistry> createRegistry();
176
// Assembles everything doMain() needs — workflow options, the per-type policy
// lists, the config context and the channel policies — and returns doMain()'s
// result. It is passed to callMain() from main().
// NOTE(review): this listing skips the source lines numbered 182, 197, 200
// and 203 (marked below), so the function is incomplete as shown — confirm
// every marked spot against the original header.
177int mainNoCatch(int argc, char** argv)
178{
179 using namespace o2::framework;
180
181 std::vector<o2::framework::ConfigParamSpec> workflowOptions;
// NOTE(review): line 182 missing here — presumably applies user customization
// to `workflowOptions`.
// Append the options reported by WorkflowCustomizationHelpers.
183 auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions();
184 workflowOptions.insert(std::end(workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions));
185
// One policy list per policy type, each built by injectCustomizations<T>().
186 std::vector<CompletionPolicy> completionPolicies = injectCustomizations<CompletionPolicy>();
187 std::vector<DispatchPolicy> dispatchPolicies = injectCustomizations<DispatchPolicy>();
188 std::vector<ResourcePolicy> resourcePolicies = injectCustomizations<ResourcePolicy>();
189 std::vector<CallbacksPolicy> callbacksPolicies = injectCustomizations<CallbacksPolicy>();
190 std::vector<SendingPolicy> sendingPolicies = injectCustomizations<SendingPolicy>();
191
// The options registry starts out null and is passed by reference to
// createConfigContext() together with the option lists and the command line.
192 std::unique_ptr<ServiceRegistry> configRegistry = createRegistry();
193 std::vector<ConfigParamSpec> extraOptions;
194 std::unique_ptr<ConfigParamRegistry> workflowOptionsRegistry{nullptr};
195 auto configContext = createConfigContext(workflowOptionsRegistry, *configRegistry, workflowOptions, extraOptions, argc, argv);
196
// NOTE(review): line 197 missing here — `specs` is used below but its
// declaration is not visible; presumably it is obtained from
// defineDataProcessing(configContext).
198 overrideAll(configContext, specs);
199 for (auto& spec : specs) {
// NOTE(review): line 200 missing here — the loop body is not visible.
201 }
202 std::vector<ChannelConfigurationPolicy> channelPolicies;
// NOTE(review): line 203 missing here — presumably applies user customization
// to `channelPolicies` before the defaults are appended.
// Unlike the other policy types, the default channel policies are created
// from the config context.
204 auto defaultChannelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(configContext);
205 channelPolicies.insert(std::end(channelPolicies), std::begin(defaultChannelPolicies), std::end(defaultChannelPolicies));
206 return doMain(argc, argv, specs,
207 channelPolicies, completionPolicies, dispatchPolicies,
208 resourcePolicies, callbacksPolicies, sendingPolicies, workflowOptions, extraOptions, configContext);
209}
210
// Declaration only. Takes the command line plus a pointer to a
// main-like function; main() passes mainNoCatch and returns the result.
211int callMain(int argc, char** argv, int (*)(int, char**));
// Declaration only. Returns a C string derived from the command line; main()
// passes it to the workflow termination hook.
// NOTE(review): ownership of the returned pointer is not visible here.
212char* getIdString(int argc, char** argv);
213
214int main(int argc, char** argv)
215{
216 using namespace o2::framework;
217
218 int result = callMain(argc, argv, mainNoCatch);
219
220 char* idstring = getIdString(argc, argv);
221 o2::framework::OnWorkflowTerminationHook onWorkflowTerminationHook;
222 callWorkflowTermination(onWorkflowTerminationHook, idstring);
223
224 return result;
225}
226#endif
TBranch * ptr
void customize(std::vector< o2::framework::CallbacksPolicy > &policies)
GLuint64EXT * result
Definition glcorearb.h:5662
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
auto homogeneous_apply_refs(L l, T &&object)
std::function< void(const char *)> OnWorkflowTerminationHook
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
void overrideLabels(o2::framework::ConfigContext &ctx, std::vector< o2::framework::DataProcessorSpec > &workflow)
Helper used to add labels to Data Processors.
int callMain(int argc, char **argv, int(*)(int, char **))
void overrideAll(o2::framework::ConfigContext &ctx, std::vector< o2::framework::DataProcessorSpec > &workflow)
int doMain(int argc, char **argv, o2::framework::WorkflowSpec const &specs, std::vector< o2::framework::ChannelConfigurationPolicy > const &channelPolicies, std::vector< o2::framework::CompletionPolicy > const &completionPolicies, std::vector< o2::framework::DispatchPolicy > const &dispatchPolicies, std::vector< o2::framework::ResourcePolicy > const &resourcePolicies, std::vector< o2::framework::CallbacksPolicy > const &callbacksPolicies, std::vector< o2::framework::SendingPolicy > const &sendingPolicies, std::vector< o2::framework::ConfigParamSpec > const &workflowOptions, std::vector< o2::framework::ConfigParamSpec > const &detectedOptions, o2::framework::ConfigContext &configContext)
std::vector< T > injectCustomizations()
void overrideCloning(o2::framework::ConfigContext &ctx, std::vector< o2::framework::DataProcessorSpec > &workflow)
Helper used to customize a workflow via a template data processor.
void defaultConfiguration(std::vector< o2::framework::ConfigParamSpec > &globalWorkflowOptions)
void overridePipeline(o2::framework::ConfigContext &ctx, std::vector< o2::framework::DataProcessorSpec > &workflow)
Helper used to customize a workflow's pipelining options.
int mainNoCatch(int argc, char **argv)
void doDefaultWorkflowTerminationHook()
o2::framework::ConfigContext createConfigContext(std::unique_ptr< o2::framework::ConfigParamRegistry > &workflowOptionsRegistry, o2::framework::ServiceRegistry &configRegistry, std::vector< o2::framework::ConfigParamSpec > &workflowOptions, std::vector< o2::framework::ConfigParamSpec > &extraOptions, int argc, char **argv)
char * getIdString(int argc, char **argv)
std::unique_ptr< o2::framework::ServiceRegistry > createRegistry()
void callWorkflowTermination(T &hook, char const *idstring)
o2::framework::WorkflowSpec defineDataProcessing(o2::framework::ConfigContext const &context)
This function hooks up the workflow specifications into the DPL driver.
std::vector< o2::framework::ConfigParamSpec > requiredWorkflowOptions()
Workflow options which are required by DPL in order to work.
static auto userDefinedCustomization(auto &) -> void
static auto userDefinedCustomization(WithNonTrivialDefault auto &something) -> void
static auto userDefinedCustomization(WithUserOverride auto &something) -> void
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static bool appendOption(std::vector< ConfigParamSpec > &options, Configurable< T, K, IP > &what)
#define main