Project
Loading...
Searching...
No Matches
test_TimeParallelPipelining.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "Mocking.h"
13#include <catch_amalgamated.hpp>
14#include "../src/DeviceSpecHelpers.h"
15#include "../src/SimpleResourceManager.h"
16#include "../src/ComputingResourceHelpers.h"
20
21using namespace o2::framework;
22
23// This is how you can define your processing in a declarative way
25{
26 auto result = WorkflowSpec{{
27 "A",
28 Inputs{},
29 {
30 OutputSpec{"TST", "A"},
31 },
32 },
34 {
35 "B",
36 Inputs{InputSpec{"a", "TST", "A"}},
37 Outputs{
38 OutputSpec{"TST", "B"},
39 },
40 },
41 2),
42 {
43 "C",
44 {InputSpec{"b", "TST", "B"}},
45 }};
46
47 return result;
48}
49
50TEST_CASE("TimePipeliningSimple")
51{
52 auto workflow = defineSimplePipelining();
53 std::vector<DeviceSpec> devices;
54 auto configContext = makeEmptyConfigContext();
55 auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*configContext);
56 auto completionPolicies = CompletionPolicy::createDefaultPolicies();
57 auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
58 std::vector<ComputingResource> resources = {ComputingResourceHelpers::getLocalhostResource()};
59 SimpleResourceManager rm(resources);
60 DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
61 REQUIRE(devices.size() == 4);
62 auto& producer = devices[0];
63 auto& layer0Consumer0 = devices[1];
64 auto& layer0Consumer1 = devices[2];
65 auto& layer1Consumer0 = devices[3];
66 REQUIRE(producer.id == "A");
67 REQUIRE(layer0Consumer0.id == "B_t0");
68 REQUIRE(layer0Consumer1.id == "B_t1");
69 REQUIRE(layer1Consumer0.id == "C");
70}
71
72namespace
73{
74// This is how you can define your processing in a declarative way
76{
77 auto result = WorkflowSpec{
78 {
79 "A",
80 Inputs{},
81 {
82 OutputSpec{"TST", "A"},
83 },
84 },
86 {
87 "B",
88 Inputs{InputSpec{"a", "TST", "A"}},
89 Outputs{OutputSpec{"TST", "B1"}, OutputSpec{"TST", "B2"}},
90 },
91 2),
92 timePipeline({"C",
93 {InputSpec{"b", "TST", "B1"}},
94 {OutputSpec{"TST", "C"}}},
95 3),
97 {
98 "D",
99 {InputSpec{"c", "TST", "C"}, InputSpec{"d", "TST", "B2"}},
100 },
101 1)};
102
103 return result;
104}
105} // namespace
106
107TEST_CASE("TimePipeliningFull")
108{
109 auto workflow = defineDataProcessing();
110 std::vector<DeviceSpec> devices;
111 auto configContext = makeEmptyConfigContext();
112 auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*configContext);
113 auto completionPolicies = CompletionPolicy::createDefaultPolicies();
114 auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
115 std::vector<ComputingResource> resources = {ComputingResourceHelpers::getLocalhostResource()};
116 SimpleResourceManager rm(resources);
117 DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
118 REQUIRE(devices.size() == 7);
119 auto& producer = devices[0];
120 auto& layer0Consumer0 = devices[1];
121 auto& layer0Consumer1 = devices[2];
122 auto& layer1Consumer0 = devices[3];
123 auto& layer1Consumer1 = devices[4];
124 auto& layer1Consumer2 = devices[5];
125 auto& layer2Consumer0 = devices[6];
126 REQUIRE(producer.id == "A");
127 REQUIRE(layer0Consumer0.id == "B_t0");
128 REQUIRE(layer0Consumer1.id == "B_t1");
129 REQUIRE(layer1Consumer0.id == "C_t0");
130 REQUIRE(layer1Consumer1.id == "C_t1");
131 REQUIRE(layer1Consumer2.id == "C_t2");
132 REQUIRE(layer2Consumer0.id == "D");
133}
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
std::unique_ptr< ConfigContext > makeEmptyConfigContext()
GLuint64EXT * result
Definition glcorearb.h:5662
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
TEST_CASE("test_prepareArguments")
std::vector< DataProcessorSpec > WorkflowSpec
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
static std::vector< CallbacksPolicy > createDefaultPolicies()
static std::vector< ChannelConfigurationPolicy > createDefaultPolicies(ConfigContext const &configContext)
Default policies to use, based on the contents of the @configContex content.
static std::vector< CompletionPolicy > createDefaultPolicies()
Helper to create the default configuration.
static void dataProcessorSpecs2DeviceSpecs(const WorkflowSpec &workflow, std::vector< ChannelConfigurationPolicy > const &channelPolicies, std::vector< CompletionPolicy > const &completionPolicies, std::vector< DispatchPolicy > const &dispatchPolicies, std::vector< ResourcePolicy > const &resourcePolicies, std::vector< CallbacksPolicy > const &callbacksPolicies, std::vector< SendingPolicy > const &sendingPolicy, std::vector< ForwardingPolicy > const &forwardingPolicies, std::vector< DeviceSpec > &devices, ResourceManager &resourceManager, std::string const &uniqueWorkflowId, ConfigContext const &configContext, bool optimizeTopology=false, unsigned short resourcesMonitoringInterval=0, std::string const &channelPrefix="", OverrideServiceSpecs const &overrideServices={})
WorkflowSpec defineSimplePipelining()