28 results.reserve(maxIndex);
29 for (
size_t i = 0;
i < maxIndex; ++
i) {
30 results.push_back(original);
32 results.back().rank =
i;
33 results.back().nSlots = maxIndex;
34 amendCallback(results.back(),
i);
44 results.reserve(specs.size() * maxIndex);
45 for (
auto& spec : specs) {
47 results.insert(results.end(),
result.begin(),
result.end());
55 std::function<
size_t()> getNumberOfSubspecs,
56 std::function<
size_t(
size_t)> getSubSpec)
59 size_t numberOfSubspecs = getNumberOfSubspecs();
67 size_t inputMultiplicity = numberOfSubspecs /
nPipelines;
70 inputMultiplicity += 1;
73 auto inputs = std::move(spec.inputs);
74 auto outputs = std::move(spec.outputs);
75 spec.inputs.reserve(inputMultiplicity);
76 spec.outputs.reserve(inputMultiplicity);
77 for (
size_t inputNo = 0; inputNo < inputMultiplicity; ++inputNo) {
78 for (
auto& input : inputs) {
79 spec.inputs.push_back(input);
83 for (
auto&
output : outputs) {
84 spec.outputs.push_back(
output);
91 if (inputMultiplicity > numberOfSubspecs /
nPipelines &&
95 inputMultiplicity = numberOfSubspecs /
nPipelines;
102 result.insert(
result.end(), amendedProcessors.begin(), amendedProcessors.end());
114 std::function<
void(
InputSpec&,
size_t)> amendCallback)
117 results.reserve(maxIndex);
118 for (
size_t i = 0;
i < maxIndex; ++
i) {
119 results.push_back(original);
120 amendCallback(results.back(),
i);
127 std::function<
void(
InputSpec&,
size_t)> amendCallback)
130 results.reserve(inputs.size() * maxIndex);
131 for (
size_t i = 0;
i < maxIndex; ++
i) {
132 for (
auto const& original : inputs) {
133 results.push_back(original);
134 amendCallback(results.back(),
i);
144 std::runtime_error(
"You can time slice only once");
152std::vector<InputSpec>
select(
const char* matcher)
168 for (
auto& spec : specs) {
169 for (
auto& input : spec.inputs) {
170 combined.
inputs.push_back(input);
172 for (
auto&
output : spec.outputs) {
175 for (
auto& option : spec.options) {
176 combined.
options.push_back(option);
178 for (
auto&
label : spec.labels) {
181 for (
auto& metadatum : spec.metadata) {
182 combined.
metadata.push_back(metadatum);
184 for (
auto& service : spec.requiredServices) {
190 if (existing.name == service.name) {
202 std::vector<AlgorithmSpec::ProcessCallback> callbacks;
203 for (
auto& spec : specs) {
204 if (spec.algorithm.onInit) {
205 callbacks.push_back(spec.algorithm.onInit(ctx));
206 }
else if (spec.algorithm.onProcess) {
207 callbacks.push_back(spec.algorithm.onProcess);
211 for (
auto& callback : callbacks) {
GLuint const GLchar * name
GLuint GLsizei const GLchar * label
WorkflowSpec combine(const char *name, std::vector< DataProcessorSpec > const &specs, bool doIt)
Defining PrimaryVertex explicitly as messageable.
WorkflowSpec parallelPipeline(const WorkflowSpec &specs, size_t nPipelines, std::function< size_t()> getNumberOfSubspecs, std::function< size_t(size_t)> getSubSpec)
WorkflowSpec parallel(DataProcessorSpec original, size_t maxIndex, std::function< void(DataProcessorSpec &, size_t id)> amendCallback)
Inputs mergeInputs(InputSpec original, size_t maxIndex, std::function< void(InputSpec &, size_t)> amendCallback)
std::vector< DataProcessorSpec > WorkflowSpec
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
std::vector< InputSpec > select(char const *matcher="")
std::vector< InputSpec > Inputs
std::string to_string(gsl::span< T, Size > span)
static std::vector< InputSpec > parse(const char *s="")
std::vector< DataProcessorMetadata > metadata
size_t maxInputTimeslices
std::vector< ServiceSpec > requiredServices
std::vector< DataProcessorLabel > labels
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
std::vector< ChannelData > channels