Project
Loading...
Searching...
No Matches
customTopologyCommon.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef ALICEO2_CUSTOMSTOPOLOGYCOMMON_H_
13#define ALICEO2_CUSTOMSTOPOLOGYCOMMON_H_
14
15#include <chrono>
16#include <cstddef>
17#include <cstdlib>
18#include <fairlogger/Logger.h>
19#include <thread>
20
27#include "common.h"
28
29void customize(std::vector<o2::framework::CompletionPolicy>& policies)
30{
32 policies.emplace_back(o2::framework::CompletionPolicyHelpers::consumeWhenAny("data-checker"));
33}
34
35// keep this include here
37
38namespace o2::framework
39{
41
43{
44 static constexpr const char origin[] = {"TST"};
45 static constexpr const char description[] = {"CUSTOM"};
46 static constexpr const char description_moving_window[] = {"CUSTOM_MW"};
47
48 public:
49 CustomMergerTestGenerator(size_t expectedResult)
50 : mExpectedResult{expectedResult}
51 {
52 }
53
54 Inputs generateHistoProducers(WorkflowSpec& specs, size_t numberOfProducers)
55 {
56 Inputs inputs{};
57
58 for (size_t p = 0; p < numberOfProducers; p++) {
59 inputs.push_back({"mo", origin, description, static_cast<SubSpecificationType>(p + 1), Lifetime::Sporadic});
60
61 DataProcessorSpec producer{
62 "producer-custom" + std::to_string(p),
63 Inputs{},
64 Outputs{{{"mo"}, origin, description, static_cast<SubSpecificationType>(p + 1), Lifetime::Sporadic}},
65 AlgorithmSpec{static_cast<AlgorithmSpec::ProcessCallback>([p, numberOfProducers](ProcessingContext& processingContext) mutable {
66 auto customObject = std::make_unique<mergers::CustomMergeableObject>(1);
67 auto subspec = static_cast<SubSpecificationType>(p + 1);
68 processingContext.outputs().snapshot(OutputRef{"mo", subspec}, *customObject);
69 processingContext.services().get<ControlService>().readyToQuit(QuitRequest::Me);
70 })}};
71 specs.push_back(producer);
72 }
73
74 return inputs;
75 }
76
77 void generateMergers(WorkflowSpec& specs, const Inputs& mergerInputs, mergers::InputObjectsTimespan mergerType)
78 {
79 using namespace mergers;
80
81 MergerInfrastructureBuilder mergersBuilder;
82 mergersBuilder.setInfrastructureName("custom");
83 mergersBuilder.setInputSpecs(mergerInputs);
84 mergersBuilder.setOutputSpec({{"main"}, origin, description, 0});
85
86 MergerConfig config;
87 config.inputObjectTimespan = {mergerType};
88 std::vector<std::pair<size_t, size_t>> param = {{5, 1}};
89 config.publicationDecision = {PublicationDecision::EachNSeconds, param};
90 config.mergedObjectTimespan = {MergedObjectTimespan::FullHistory};
91 config.topologySize = {TopologySize::NumberOfLayers, 2};
92
94 mergersBuilder.setOutputSpecMovingWindow({{"main"}, origin, description_moving_window, 0});
95 config.publishMovingWindow = {PublishMovingWindow::Yes};
96 }
97
98 mergersBuilder.setConfig(config);
99
100 mergersBuilder.generateInfrastructure(specs);
101 }
102
104 {
105 specs.push_back(DataProcessorSpec{
106 "data-checker",
107 Inputs{
108 {"custom", origin, description, 0, Lifetime::Sporadic},
109 {"custom_mw", origin, description_moving_window, 0, Lifetime::Sporadic},
110 },
111 Outputs{},
113 AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext& initContext) {
114 auto success = std::make_shared<bool>(false);
115 mergers::test::registerCallbacksForTestFailure(initContext.services().get<CallbackService>(), success);
116
118 [expectedResult, numberOfCalls = 0, numberOfObjects = 0, numberOfMovingWindows = 0, lastObjectValue = 0, retries = 5, success](ProcessingContext& processingContext) mutable {
119 numberOfCalls++;
120
121 if (processingContext.inputs().isValid("custom")) {
122 auto obj = processingContext.inputs().get<mergers::CustomMergeableObject*>("custom");
123 numberOfObjects++;
124 // we are keeping only the last value of secret, as there can be inconsitencies caused
125 // by the lack of synchronisation between merger layers
126 lastObjectValue = obj->getSecret();
127 }
128
129 if (processingContext.inputs().isValid("custom_mw")) {
130 auto mw = processingContext.inputs().get<mergers::CustomMergeableObject*>("custom_mw");
131 numberOfMovingWindows++;
132 // it is not possible to check for correct value in moving window as the value can be lost due
133 // to the sync between layers of mergers and movement of the window
134 }
135
136 if (numberOfCalls == retries) {
137 processingContext.services().get<ControlService>().readyToQuit(QuitRequest::All);
138
139 // we should get new object on each publish timeout of the mergers,
140 // lower and upper boundaries of moving windows are chosen arbitrarily
141 if (numberOfObjects != retries || numberOfMovingWindows == 0 || numberOfMovingWindows > 10) {
142 LOG(fatal) << "expected 5 objects and got: " << numberOfObjects << ", expected 1-10 moving windows and got: " << numberOfMovingWindows;
143 if (lastObjectValue != expectedResult) {
144 LOG(fatal) << "got wrong secret from object: " << lastObjectValue << ", expected: " << expectedResult;
145 }
146 return;
147 }
148 LOG(info) << "Received the expected objects, test successful";
149 *success = true;
150 }
151 }};
152 }}}});
153 }
154
156 {
157 specs.push_back(DataProcessorSpec{
158 "data-checker",
159 Inputs{
160 {"custom", origin, description, 0, Lifetime::Sporadic},
161 },
162 Outputs{},
164 AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext& initContext) {
165 auto success = std::make_shared<bool>(false);
166 mergers::test::registerCallbacksForTestFailure(initContext.services().get<CallbackService>(), success);
167
169 [expectedResult, retryNumber = 0, numberOfRetries = 5, success](ProcessingContext& processingContext) mutable {
170 const auto obj = processingContext.inputs().get<mergers::CustomMergeableObject*>("custom");
171
172 if (obj->getSecret() == expectedResult) {
173 LOG(info) << "Received the expected object, test successful";
174 *success = true;
175 processingContext.services().get<ControlService>().readyToQuit(QuitRequest::All);
176 return;
177 }
178
179 if (retryNumber++ == numberOfRetries) {
180 processingContext.services().get<ControlService>().readyToQuit(QuitRequest::All);
181 LOG(fatal) << "Unsuccessfully tried " << retryNumber << " times to get a expected result: " << expectedResult;
182 }
183 }};
184 }}}});
185 }
186
187 private:
188 size_t mExpectedResult;
189};
190
191} // namespace o2::framework
192
193#endif
An example of overriding O2 Mergers merging interface, v0.1.
Definition of O2 MergerInfrastructureBuilder, v0.1.
void generateMergers(WorkflowSpec &specs, const Inputs &mergerInputs, mergers::InputObjectsTimespan mergerType)
void generateCheckerIntegrating(WorkflowSpec &specs)
void generateCheckerFullHistory(WorkflowSpec &specs)
Inputs generateHistoProducers(WorkflowSpec &specs, size_t numberOfProducers)
static void customizeInfrastructure(std::vector< framework::CompletionPolicy > &)
Configures mergers to consume any data immediately.
void customize(std::vector< o2::framework::CompletionPolicy > &policies)
GLenum GLfloat param
Definition glcorearb.h:271
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
@ Me
Only quit this data processor.
@ All
Quit all data processor, regardless of their state.
header::DataHeader::SubSpecificationType SubSpecificationType
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
void registerCallbacksForTestFailure(framework::CallbackService &cb, std::shared_ptr< bool > success)
Definition common.h:27
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::function< ProcessCallback(InitContext &)> InitCallback
std::function< void(ProcessingContext &)> ProcessCallback
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
uint32_t SubSpecificationType
Definition DataHeader.h:620
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"