115 std::vector<DataProcessorSpec>& remaining)
117 std::vector<OutputSpec> combinedOutputSpec;
118 std::vector<InputSpec> combinedInputSpec;
119 std::vector<ConfigParamSpec> combinedOptions;
123 std::unordered_map<std::string, std::vector<std::pair<std::string, ConfigParamSpec>>> optionMerger;
126 std::unordered_map<std::string, std::unordered_map<std::string, std::string>> optionsRemap;
130 std::unordered_map<std::string, bool> inputBindings;
134 std::vector<OutputSpec> allOutputs;
135 std::vector<DataProcessorSpec> mergableSpecs;
136 for (
auto& spec : speccollection) {
138 for (
auto& os : spec.outputs) {
139 allOutputs.push_back(os);
143 for (
auto& spec : speccollection) {
144 auto& procname = spec.
name;
145 optionsRemap[procname] = std::unordered_map<std::string, std::string>();
149 bool inputCheckOk =
true;
150 for (
auto& is : spec.inputs) {
154 for (
auto&
o : allOutputs) {
156 std::cerr <<
"Found internal connection " << is <<
" ... will take out spec " << procname <<
" .. from merging process \n";
157 inputCheckOk =
false;
163 remaining.push_back(spec);
167 for (
auto& is : spec.inputs) {
168 if (inputBindings.find(is.binding) != inputBindings.end()) {
170 if (std::find(combinedInputSpec.begin(), combinedInputSpec.end(), is) == combinedInputSpec.end()) {
171 LOG(error) <<
"Found duplicate input binding with different spec.:" << is;
176 combinedInputSpec.push_back(is);
177 inputBindings[is.binding] = 1;
179 mergableSpecs.push_back(spec);
182 for (
auto& os : spec.outputs) {
183 combinedOutputSpec.push_back(os);
186 for (
auto& opt : spec.options) {
187 auto optkey = opt.name;
188 auto iter = optionMerger.find(optkey);
189 auto procconfigpair = std::pair<std::string, ConfigParamSpec>(procname, opt);
190 if (iter == optionMerger.end()) {
191 optionMerger[optkey] = std::vector<std::pair<std::string, ConfigParamSpec>>();
193 optionMerger[optkey].push_back(procconfigpair);
197 for (
auto& iter : optionMerger) {
198 if (iter.second.size() > 1) {
200 for (
auto& part : iter.second) {
201 auto procname = part.first;
202 auto originalSpec = part.second;
203 auto namespaced_name = procname +
"." + originalSpec.name;
205 originalSpec.
type, originalSpec.defaultValue, originalSpec.help, originalSpec.kind});
206 optionsRemap[procname][namespaced_name] = originalSpec.name;
211 for (
auto& part : iter.second) {
212 combinedOptions.push_back(part.second);
221 CombinedTask(std::vector<DataProcessorSpec>
const& s, std::unordered_map<std::string, std::unordered_map<std::string, std::string>> optionsRemap) : tasks(s), optionsRemap(optionsRemap) {}
225 std::cerr <<
"Init Combined\n";
226 mNThreads = std::max(1, ic.
options().
get<
int>(
"combine-nthreads"));
229 LOGP(info,
"Combined tasks will be run with {} threads", mNThreads);
230 ROOT::EnableThreadSafety();
232 LOGP(warn,
"{} threads requested for combined tasks but OpenMP is not detected, link it from the workflow CMakeList", mNThreads);
236 for (
auto& t : tasks) {
242 auto store =
reinterpret_cast<std::unique_ptr<ConfigParamStore>*
>(&(configRegistry));
243 auto& boost_store = (*store)->store();
244 auto& originalDeviceName = t.name;
245 auto optionsiter = optionsRemap.find(originalDeviceName);
246 if (optionsiter != optionsRemap.end()) {
248 for (
auto&
key : optionsiter->second) {
251 boost_store.put(
key.second, boost_store.get<std::string>(
key.first));
254 t.algorithm.onProcess = t.algorithm.onInit(ic);
260 std::cerr <<
"Processing Combined with " << mNThreads <<
" threads\n";
262 size_t nt = tasks.size();
264#pragma omp parallel for schedule(dynamic) num_threads(mNThreads)
266 for (
size_t i = 0;
i < nt;
i++) {
268 std::cerr <<
" Executing sub-device " << t.name <<
"\n";
269 t.algorithm.onProcess(pc);
272 for (
auto& t : tasks) {
273 std::cerr <<
" Executing sub-device " << t.name <<
"\n";
274 t.algorithm.onProcess(pc);
280 std::vector<DataProcessorSpec> tasks;
281 std::unordered_map<std::string, std::unordered_map<std::string, std::string>> optionsRemap;
291 AlgorithmSpec{adaptFromTask<CombinedTask>(mergableSpecs, optionsRemap)},
ConfigParamRegistry const & options()