12#ifndef ALICEO2_CUSTOMSTOPOLOGYCOMMON_H_
13#define ALICEO2_CUSTOMSTOPOLOGYCOMMON_H_
18#include <fairlogger/Logger.h>
29void customize(std::vector<o2::framework::CompletionPolicy>& policies)
44 static constexpr const char origin[] = {
"TST"};
45 static constexpr const char description[] = {
"CUSTOM"};
46 static constexpr const char description_moving_window[] = {
"CUSTOM_MW"};
50 : mExpectedResult{expectedResult}
58 for (
size_t p = 0; p < numberOfProducers; p++) {
59 inputs.push_back({
"mo", origin, description,
static_cast<SubSpecificationType>(p + 1), Lifetime::Sporadic});
66 auto customObject = std::make_unique<mergers::CustomMergeableObject>(1);
68 processingContext.outputs().snapshot(
OutputRef{
"mo", subspec}, *customObject);
71 specs.push_back(producer);
79 using namespace mergers;
81 MergerInfrastructureBuilder mergersBuilder;
82 mergersBuilder.setInfrastructureName(
"custom");
83 mergersBuilder.setInputSpecs(mergerInputs);
84 mergersBuilder.setOutputSpec({{
"main"}, origin, description, 0});
87 config.inputObjectTimespan = {mergerType};
88 std::vector<std::pair<size_t, size_t>>
param = {{5, 1}};
89 config.publicationDecision = {PublicationDecision::EachNSeconds,
param};
90 config.mergedObjectTimespan = {MergedObjectTimespan::FullHistory};
91 config.topologySize = {TopologySize::NumberOfLayers, 2};
94 mergersBuilder.setOutputSpecMovingWindow({{
"main"}, origin, description_moving_window, 0});
95 config.publishMovingWindow = {PublishMovingWindow::Yes};
98 mergersBuilder.setConfig(config);
100 mergersBuilder.generateInfrastructure(specs);
108 {
"custom", origin, description, 0, Lifetime::Sporadic},
109 {
"custom_mw", origin, description_moving_window, 0, Lifetime::Sporadic},
114 auto success = std::make_shared<bool>(
false);
118 [expectedResult, numberOfCalls = 0, numberOfObjects = 0, numberOfMovingWindows = 0, lastObjectValue = 0, retries = 5, success](
ProcessingContext& processingContext)
mutable {
121 if (processingContext.inputs().isValid(
"custom")) {
129 if (processingContext.inputs().isValid(
"custom_mw")) {
131 numberOfMovingWindows++;
136 if (numberOfCalls == retries) {
141 if (numberOfObjects != retries || numberOfMovingWindows == 0 || numberOfMovingWindows > 10) {
142 LOG(fatal) <<
"expected 5 objects and got: " << numberOfObjects <<
", expected 1-10 moving windows and got: " << numberOfMovingWindows;
143 if (lastObjectValue != expectedResult) {
144 LOG(fatal) <<
"got wrong secret from object: " << lastObjectValue <<
", expected: " << expectedResult;
148 LOG(info) <<
"Received the expected objects, test successful";
160 {
"custom", origin, description, 0, Lifetime::Sporadic},
165 auto success = std::make_shared<bool>(
false);
169 [expectedResult, retryNumber = 0, numberOfRetries = 5, success](
ProcessingContext& processingContext)
mutable {
172 if (obj->getSecret() == expectedResult) {
173 LOG(info) <<
"Received the expected object, test successful";
179 if (retryNumber++ == numberOfRetries) {
181 LOG(fatal) <<
"Unsuccessfully tried " << retryNumber <<
" times to get a expected result: " << expectedResult;
188 size_t mExpectedResult;
An example of overriding the O2 Mergers merging interface, v0.1.
Definition of O2 MergerInfrastructureBuilder, v0.1.
void generateMergers(WorkflowSpec &specs, const Inputs &mergerInputs, mergers::InputObjectsTimespan mergerType)
void generateCheckerIntegrating(WorkflowSpec &specs)
void generateCheckerFullHistory(WorkflowSpec &specs)
Inputs generateHistoProducers(WorkflowSpec &specs, size_t numberOfProducers)
CustomMergerTestGenerator(size_t expectedResult)
static void customizeInfrastructure(std::vector< framework::CompletionPolicy > &)
Configures mergers to consume any data immediately.
void customize(std::vector< o2::framework::CompletionPolicy > &policies)
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
@ Me
Only quit this data processor.
@ All
Quit all data processors, regardless of their state.
header::DataHeader::SubSpecificationType SubSpecificationType
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
void registerCallbacksForTestFailure(framework::CallbackService &cb, std::shared_ptr< bool > success)
std::string to_string(gsl::span< T, Size > span)
std::function< ProcessCallback(InitContext &)> InitCallback
std::function< void(ProcessingContext &)> ProcessCallback
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"