Project
Loading...
Searching...
No Matches
WorkflowHelpers.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef O2_FRAMEWORK_WORKFLOWHELPERS_H_
12#define O2_FRAMEWORK_WORKFLOWHELPERS_H_
13
14#include "Framework/InputSpec.h"
18
19#include <cstddef>
20#include <vector>
21#include <iosfwd>
22
23namespace o2::framework
24{
25
26inline static std::string debugWorkflow(std::vector<DataProcessorSpec> const& specs)
27{
28 std::ostringstream out;
29 for (auto& spec : specs) {
30 out << spec.name << "\n";
31 out << " Inputs:\n";
32 for (auto& ii : spec.inputs) {
33 out << " - " << DataSpecUtils::describe(ii) << "\n";
34 }
35 // out << "\n Outputs:\n";
36 // for (auto& ii : spec.outputs) {
37 // out << " - " << DataSpecUtils::describe(ii) << "\n";
38 // }
39 }
40 return out.str();
41}
42
43struct ConfigContext;
44// Structure to hold information which was derived
45// for output channels.
47 size_t specIndex;
49 bool forward;
50 bool enabled = true;
51};
52
53// We use this to keep track of the forwards which should
54// be added to each device.
55// @a consumer is the data processor id the information refers to
56// (so all the devices which are incarnation of that data processor should
57// have the forward).
58// @a inputGlobalIndex is pointer to a unique id for the input the forward
59// refers to.
65
66enum struct ConnectionKind {
67 Out = 0,
68 Forward = 1,
69 In = 2,
70 Unknown = 3
71};
72
74 // the index in the workflow of the producer DataProcessorSpec
75 size_t producer;
76 // the index in the workflow of the consumer DataProcessorSpec
77 size_t consumer;
78 // The timeindex for the consumer
79 size_t timeIndex;
80 // The timeindex of the producer
82 // An absolute id for the output
84 // A DataProcessor relative id for the input
86 // Whether this is the result of a forwarding operation or not
89};
90
91// Unique identifier for a connection
93 size_t producer;
94 size_t consumer;
95 size_t timeIndex;
97 uint16_t port;
98
99 bool operator<(const DeviceConnectionId& rhs) const
100 {
101 return std::tie(producer, consumer, timeIndex, producerTimeIndex) <
102 std::tie(rhs.producer, rhs.consumer, rhs.timeIndex, rhs.producerTimeIndex);
103 }
104};
105
106// A device is uniquely identified by its DataProcessorSpec and
107// the timeslice it consumes.
108struct DeviceId {
110 size_t timeslice;
112
113 bool operator<(const DeviceId& rhs) const
114 {
115 return std::tie(processorIndex, timeslice) <
116 std::tie(rhs.processorIndex, rhs.timeslice);
117 }
118};
119
121 bool requiresNewDevice = false;
122 bool requiresNewChannel = false;
123};
124
127 int index;
128 int layer;
129 bool operator<(TopoIndexInfo const& rhs) const
130 {
131 return index < rhs.index;
132 }
133 bool operator==(TopoIndexInfo const& rhs) const
134 {
135 return index == rhs.index;
136 }
137
138 friend std::ostream& operator<<(std::ostream& out, TopoIndexInfo const& info);
139};
140
141// Information about the policies which were derived for a given data processor.
145
146struct OutputObj {
149};
150
151enum struct WorkflowParsingState : int {
152 Valid,
153 Empty,
154};
155
168 static std::vector<TopoIndexInfo> topologicalSort(size_t nodeCount,
169 int const* edgeIn,
170 int const* edgeOut,
171 size_t byteStride,
172 size_t edgesCount);
173
174 // Helper method to verify that a given workflow is actually valid e.g. that
175 // it contains no empty labels.
176 [[nodiscard]] static WorkflowParsingState verifyWorkflow(const WorkflowSpec& workflow);
177
178 // Depending on the workflow and the dangling inputs inside it, inject "fake"
179 // devices to mark the fact we might need some extra action to make sure
180 // dangling inputs are satisfied.
181 // @a workflow the workflow to decorate
182 // @a ctx the context for the configuration phase
183 static void injectServiceDevices(WorkflowSpec& workflow, ConfigContext& ctx);
184
185 // Final adjustments to @a workflow after service devices have been injected.
186 static void adjustTopology(WorkflowSpec& workflow, ConfigContext const& ctx);
187
188 static void constructGraph(const WorkflowSpec& workflow,
189 std::vector<DeviceConnectionEdge>& logicalEdges,
190 std::vector<OutputSpec>& outputs,
191 std::vector<LogicalForwardInfo>& availableForwardsInfo);
192
193 // FIXME: this is an implementation detail for compute edge action,
194 // actually. It should be moved to the cxx. Comes handy for testing things though..
195 static void sortEdges(std::vector<size_t>& inEdgeIndex,
196 std::vector<size_t>& outEdgeIndex,
197 const std::vector<DeviceConnectionEdge>& edges);
198
199 static std::vector<EdgeAction> computeOutEdgeActions(
200 const std::vector<DeviceConnectionEdge>& edges,
201 const std::vector<size_t>& index);
202
203 static std::vector<EdgeAction> computeInEdgeActions(
204 const std::vector<DeviceConnectionEdge>& edges,
205 const std::vector<size_t>& index);
206
212 static std::tuple<std::vector<InputSpec>, std::vector<bool>> analyzeOutputs(WorkflowSpec const& workflow);
213
215 static std::vector<InputSpec> computeDanglingOutputs(WorkflowSpec const& workflow);
216
221 static void validateEdges(WorkflowSpec const& workflow,
222 std::vector<DataProcessorPoliciesInfo> const& policiesInfos,
223 std::vector<DeviceConnectionEdge> const& edges,
224 std::vector<OutputSpec> const& outputs);
225};
226
227} // namespace o2::framework
228
229#endif // O2_FRAMEWORK_WORKFLOWHELPERS_H_
GLuint index
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
static std::string describe(InputSpec const &spec)
bool operator<(const DeviceConnectionId &rhs) const
bool operator<(const DeviceId &rhs) const
Helper struct to keep track of the results of the topological sort.
bool operator<(TopoIndexInfo const &rhs) const
int layer
the associated layer in the sorting procedure
int index
the index in the actual storage of the nodes to be sorted topologically
bool operator==(TopoIndexInfo const &rhs) const
friend std::ostream & operator<<(std::ostream &out, TopoIndexInfo const &info)
A set of internal helper classes to manipulate a Workflow.
static void validateEdges(WorkflowSpec const &workflow, std::vector< DataProcessorPoliciesInfo > const &policiesInfos, std::vector< DeviceConnectionEdge > const &edges, std::vector< OutputSpec > const &outputs)
static void adjustTopology(WorkflowSpec &workflow, ConfigContext const &ctx)
static void constructGraph(const WorkflowSpec &workflow, std::vector< DeviceConnectionEdge > &logicalEdges, std::vector< OutputSpec > &outputs, std::vector< LogicalForwardInfo > &availableForwardsInfo)
static void injectServiceDevices(WorkflowSpec &workflow, ConfigContext &ctx)
static WorkflowParsingState verifyWorkflow(const WorkflowSpec &workflow)
static std::vector< InputSpec > computeDanglingOutputs(WorkflowSpec const &workflow)
returns only dangling outputs
static std::vector< EdgeAction > computeOutEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
static void sortEdges(std::vector< size_t > &inEdgeIndex, std::vector< size_t > &outEdgeIndex, const std::vector< DeviceConnectionEdge > &edges)
static std::vector< EdgeAction > computeInEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::vector< TopoIndexInfo > topologicalSort(size_t nodeCount, int const *edgeIn, int const *edgeOut, size_t byteStride, size_t edgesCount)