Project
Loading...
Searching...
No Matches
TopologyPolicyHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14
15namespace o2::framework
16{
17namespace
18{
19void describeDataProcessorSpec(std::ostream& stream, DataProcessorSpec const& spec)
20{
21 stream << spec.name;
22 if (!spec.labels.empty()) {
23 stream << "(";
24 bool first = false;
25 for (auto& label : spec.labels) {
26 stream << (first ? "" : ",") << label.value;
27 first = true;
28 }
29 stream << ")";
30 }
31}
32} // namespace
33
34auto TopologyPolicyHelpers::buildEdges(WorkflowSpec& physicalWorkflow) -> std::vector<std::pair<int, int>>
35{
36 std::vector<TopologyPolicy> topologyPolicies = TopologyPolicy::createDefaultPolicies();
37 std::vector<TopologyPolicy::DependencyChecker> dependencyCheckers;
38 dependencyCheckers.reserve(physicalWorkflow.size());
39
40 for (auto& spec : physicalWorkflow) {
41 for (auto& policy : topologyPolicies) {
42 if (policy.matcher(spec)) {
43 dependencyCheckers.push_back(policy.checkDependency);
44 break;
45 }
46 }
47 }
48 assert(dependencyCheckers.size() == physicalWorkflow.size());
49 // check if DataProcessorSpec at i depends on j
50 auto checkDependencies = [&workflow = physicalWorkflow,
51 &dependencyCheckers](int i, int j) {
52 TopologyPolicy::DependencyChecker& checker = dependencyCheckers[i];
53 return checker(workflow[i], workflow[j]);
54 };
55 std::vector<std::pair<int, int>> edges;
56 for (size_t i = 0; i < physicalWorkflow.size() - 1; ++i) {
57 for (size_t j = i; j < physicalWorkflow.size(); ++j) {
58 if (i == j && checkDependencies(i, j)) {
59 throw std::runtime_error(physicalWorkflow[i].name + " depends on itself");
60 }
61 bool both = false;
62 if (checkDependencies(i, j)) {
63 edges.emplace_back(j, i);
64 both = true;
65 }
66 if (checkDependencies(j, i)) {
67 edges.emplace_back(i, j);
68 if (both) {
69 std::ostringstream str;
70 describeDataProcessorSpec(str, physicalWorkflow[i]);
71 str << " has circular dependency with ";
72 describeDataProcessorSpec(str, physicalWorkflow[j]);
73 str << ":\n";
74 for (auto x : {i, j}) {
75 str << physicalWorkflow[x].name << ":\n";
76 str << "inputs:\n";
77 for (auto& input : physicalWorkflow[x].inputs) {
78 str << "- " << input << " " << (int)input.lifetime << "\n";
79 }
80 str << "outputs:\n";
81 for (auto& output : physicalWorkflow[x].outputs) {
82 str << "- " << output << " " << (int)output.lifetime << "\n";
83 }
84 }
85 throw std::runtime_error(str.str());
86 }
87 }
88 }
89 }
90 return edges;
91};
92} // namespace o2::framework
int32_t i
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint32_t j
Definition RawData.h:0
GLint GLenum GLint x
Definition glcorearb.h:403
GLuint const GLchar * name
Definition glcorearb.h:781
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLuint GLuint stream
Definition glcorearb.h:1806
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< DataProcessorLabel > labels
static auto buildEdges(WorkflowSpec &physicalWorkflow) -> std::vector< std::pair< int, int > >
static std::vector< TopologyPolicy > createDefaultPolicies()
std::function< bool(DataProcessorSpec const &dependent, DataProcessorSpec const &ascendant)> DependencyChecker
const std::string str