Project
Loading...
Searching...
No Matches
DPLWorkflowUtils.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12// some user-space helpers/utilities for DPL workflowspecs
13
14//
15// Created by Sandro Wenzel on 19.04.22.
16//
17
18#ifndef O2_DPLWORKFLOWUTILS_H
19#define O2_DPLWORKFLOWUTILS_H
20
28#include "Framework/Task.h"
30#include <vector>
31#include <unordered_map>
32#include <iostream>
33#ifdef WITH_OPENMP
34#include <omp.h>
35#include "TROOT.h"
36#endif
37
38namespace o2
39{
40namespace framework
41{
42
43// Finding out if the current process is the master DPL driver process,
44// first setting up the topology. Might be important to know when we write
45// files (to prevent that multiple processes write the same file)
46bool isMasterWorkflowDefinition(ConfigContext const& configcontext)
47{
48 int argc = configcontext.argc();
49 auto argv = configcontext.argv();
50 bool ismaster = true;
51 for (int argi = 0; argi < argc; ++argi) {
52 // when channel-config is present it means that this is started as
53 // as FairMQDevice which means it is already a forked process
54 if (strcmp(argv[argi], "--channel-config") == 0) {
55 ismaster = false;
56 break;
57 }
58 }
59 return ismaster;
60}
61
62// Finding out if we merely want to dump the DPL workflow json file.
63// In this case we could avoid some computation/initialization, when
64// this doesn't influence the topology.
65bool isDumpWorkflowInvocation(ConfigContext const& configcontext)
66{
67 int argc = configcontext.argc();
68 auto argv = configcontext.argv();
69 bool isdump = false;
70 for (int argi = 0; argi < argc; ++argi) {
71 if (strcmp(argv[argi], "--dump-config") == 0) {
72 isdump = true;
73 break;
74 }
75 }
76 return isdump;
77}
78
// Tells whether a device name denotes an internal DPL device.
// Defined `inline` because this is a header (avoids multiple-definition / ODR
// errors when included from several translation units).
// @param name  the device name to inspect
// @return true if the substring "internal-dpl-" occurs anywhere in @p name
inline bool isInternalDPL(std::string const& name)
{
  return name.find("internal-dpl-") != std::string::npos;
}
88
89// find out the name of THIS DPL device at runtime
90// May be useful to know to prevent certain initializations for
91// say internal DPL devices or writer devices
92std::string whoAmI(ConfigContext const& configcontext)
93{
94 int argc = configcontext.argc();
95 auto argv = configcontext.argv();
96 // the name of this device is the string following the --id field in
97 // the argv invocation
98 for (int argi = 0; argi < argc; ++argi) {
99 if (strcmp(argv[argi], "--id") == 0) {
100 if (argi + 1 < argc) {
101 return std::string(argv[argi + 1]);
102 }
103 }
104 }
105 return std::string("");
106}
107
108// a utility combining multiple specs into one
109// (with some checking that it makes sense)
110// spits out the combined spec (merged inputs/outputs and AlgorithmSpec)
111// Can put policies later whether to multi-thread or serialize internally etc.
112// fills a "remaining" spec container for things it can't simply merge
113// (later on we could do a full topological sort / spec minimization approach)
114o2::framework::DataProcessorSpec specCombiner(std::string const& name, std::vector<DataProcessorSpec> const& speccollection,
115 std::vector<DataProcessorSpec>& remaining)
116{
117 std::vector<OutputSpec> combinedOutputSpec;
118 std::vector<InputSpec> combinedInputSpec;
119 std::vector<ConfigParamSpec> combinedOptions;
120
121 // this maps an option key to one of multiple specs where this is used
122 // we will need to introduce unambiguous options in case an option is present multiple times
123 std::unordered_map<std::string, std::vector<std::pair<std::string, ConfigParamSpec>>> optionMerger;
124
125 // maps a process name to a map holder conversion of "namespaced-keys" to original "keys"
126 std::unordered_map<std::string, std::unordered_map<std::string, std::string>> optionsRemap;
127
128 // keep track of which input bindings are already used
129 // (should not have duplicates ... since devices may fetch data just based on the binding)
130 std::unordered_map<std::string, bool> inputBindings;
131
132 // we collect all outputs once ---> this is to check that none of the inputs matches
133 // an output
134 std::vector<OutputSpec> allOutputs;
135 std::vector<DataProcessorSpec> mergableSpecs;
136 for (auto& spec : speccollection) {
137 // merge output specs
138 for (auto& os : spec.outputs) {
139 allOutputs.push_back(os);
140 }
141 }
142
143 for (auto& spec : speccollection) {
144 auto& procname = spec.name;
145 optionsRemap[procname] = std::unordered_map<std::string, std::string>();
146
147 // merge input specs ... but only after we have verified that the spec does
148 // not depend in internal outputs
149 bool inputCheckOk = true;
150 for (auto& is : spec.inputs) {
151 // let's see if input is part of outputs
152 // ... in which case we can't easily merge the spec here
153 // ... and just neglect it for the moment
154 for (auto& o : allOutputs) {
155 if (DataSpecUtils::match(is, o)) {
156 std::cerr << "Found internal connection " << is << " ... will take out spec " << procname << " .. from merging process \n";
157 inputCheckOk = false;
158 break;
159 }
160 }
161 }
162 if (!inputCheckOk) {
163 remaining.push_back(spec);
164 // directly to next task
165 continue;
166 }
167 for (auto& is : spec.inputs) {
168 if (inputBindings.find(is.binding) != inputBindings.end()) {
169 // we can accept duplicate binding if it is bound to the same spec (e.g. ccdbParamSpec)
170 if (std::find(combinedInputSpec.begin(), combinedInputSpec.end(), is) == combinedInputSpec.end()) {
171 LOG(error) << "Found duplicate input binding with different spec.:" << is;
172 } else {
173 continue; // consider as already accounted
174 }
175 }
176 combinedInputSpec.push_back(is);
177 inputBindings[is.binding] = 1;
178 }
179 mergableSpecs.push_back(spec);
180
181 // merge output specs
182 for (auto& os : spec.outputs) {
183 combinedOutputSpec.push_back(os);
184 }
185 // merge options (part 1)
186 for (auto& opt : spec.options) {
187 auto optkey = opt.name;
188 auto iter = optionMerger.find(optkey);
189 auto procconfigpair = std::pair<std::string, ConfigParamSpec>(procname, opt);
190 if (iter == optionMerger.end()) {
191 optionMerger[optkey] = std::vector<std::pair<std::string, ConfigParamSpec>>();
192 }
193 optionMerger[optkey].push_back(procconfigpair);
194 }
195 }
196 // merge options (part 2)
197 for (auto& iter : optionMerger) {
198 if (iter.second.size() > 1) {
199 // std::cerr << "Option " << iter.first << " duplicated in multiple procs --> applying namespacing \n";
200 for (auto& part : iter.second) {
201 auto procname = part.first;
202 auto originalSpec = part.second;
203 auto namespaced_name = procname + "." + originalSpec.name;
204 combinedOptions.push_back(ConfigParamSpec{namespaced_name,
205 originalSpec.type, originalSpec.defaultValue, originalSpec.help, originalSpec.kind});
206 optionsRemap[procname][namespaced_name] = originalSpec.name;
207 // we need to back-apply
208 }
209 } else {
210 // we can stay with original option
211 for (auto& part : iter.second) {
212 combinedOptions.push_back(part.second);
213 }
214 }
215 }
216
217 // logic for combined task processing function --> target is to run one only
218 class CombinedTask
219 {
220 public:
221 CombinedTask(std::vector<DataProcessorSpec> const& s, std::unordered_map<std::string, std::unordered_map<std::string, std::string>> optionsRemap) : tasks(s), optionsRemap(optionsRemap) {}
222
223 void init(o2::framework::InitContext& ic)
224 {
225 std::cerr << "Init Combined\n";
226 mNThreads = std::max(1, ic.options().get<int>("combine-nthreads"));
227 if (mNThreads > 1) {
228#ifdef WITH_OPENMP
229 LOGP(info, "Combined tasks will be run with {} threads", mNThreads);
230 ROOT::EnableThreadSafety();
231#else
232 LOGP(warn, "{} threads requested for combined tasks but OpenMP is not detected, link it from the workflow CMakeList", mNThreads);
233 mNThreads = 1;
234#endif
235 }
236 for (auto& t : tasks) {
237 // the init function actually creates the onProcess function
238 // which we have to do here (maybe some more stuff needed)
239 auto& configRegistry = ic.mOptions;
240 // we can get hold of the store because the store is the only data member of configRegistry
241 static_assert(sizeof(o2::framework::ConfigParamRegistry) == sizeof(std::unique_ptr<o2::framework::ConfigParamStore>));
242 auto store = reinterpret_cast<std::unique_ptr<ConfigParamStore>*>(&(configRegistry));
243 auto& boost_store = (*store)->store(); // std::unique_ptr<boost::property_tree::ptree>
244 auto& originalDeviceName = t.name;
245 auto optionsiter = optionsRemap.find(originalDeviceName);
246 if (optionsiter != optionsRemap.end()) {
247 // we have options to remap
248 for (auto& key : optionsiter->second) {
249 //
250 // LOG(info) << "Applying value " << boost_store.get<std::string>(key.first) << " to original key " << key.second;
251 boost_store.put(key.second, boost_store.get<std::string>(key.first));
252 }
253 }
254 t.algorithm.onProcess = t.algorithm.onInit(ic);
255 }
256 }
257
259 {
260 std::cerr << "Processing Combined with " << mNThreads << " threads\n";
261 if (mNThreads > 1) {
262 size_t nt = tasks.size();
263#ifdef WITH_OPENMP
264#pragma omp parallel for schedule(dynamic) num_threads(mNThreads)
265#endif
266 for (size_t i = 0; i < nt; i++) {
267 auto& t = tasks[i];
268 std::cerr << " Executing sub-device " << t.name << "\n";
269 t.algorithm.onProcess(pc);
270 }
271 } else {
272 for (auto& t : tasks) {
273 std::cerr << " Executing sub-device " << t.name << "\n";
274 t.algorithm.onProcess(pc);
275 }
276 }
277 }
278
279 private:
280 std::vector<DataProcessorSpec> tasks;
281 std::unordered_map<std::string, std::unordered_map<std::string, std::string>> optionsRemap;
282 int mNThreads = 1;
283 };
284
285 combinedOptions.emplace_back(ConfigParamSpec{"combine-nthreads", VariantType::Int, 1, {"Number of threads for combined tasks"}});
286
287 return DataProcessorSpec{
288 name,
289 combinedInputSpec,
290 combinedOutputSpec,
291 AlgorithmSpec{adaptFromTask<CombinedTask>(mergableSpecs, optionsRemap)},
292 combinedOptions
293 /* a couple of other fields can be set ... */
294 };
295};
296
297} // namespace framework
298} // namespace o2
299
300#endif // O2_DPLWORKFLOWUTILS_H
int32_t i
bool o
StringRef key
char *const * argv() const
ConfigParamRegistry const & options()
Definition InitContext.h:33
ConfigParamRegistry & mOptions
Definition InitContext.h:36
GLuint const GLchar * name
Definition glcorearb.h:781
o2::framework::DataProcessorSpec specCombiner(std::string const &name, std::vector< DataProcessorSpec > const &speccollection, std::vector< DataProcessorSpec > &remaining)
bool isInternalDPL(std::string const &name)
bool isMasterWorkflowDefinition(ConfigContext const &configcontext)
bool isDumpWorkflowInvocation(ConfigContext const &configcontext)
std::string whoAmI(ConfigContext const &configcontext)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"