Project
Loading...
Searching...
No Matches
vectorTopologyCommon.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef ALICEO2_VECTORTOPOLOGYCOMMON_H_
13#define ALICEO2_VECTORTOPOLOGYCOMMON_H_
14
15#include <array>
16#include <chrono>
17#include <cstddef>
18#include <cstdlib>
19#include <fairlogger/Logger.h>
20#include <thread>
#include <utility>
21
22#include <TH1F.h>
23
31#include <Mergers/ObjectStore.h>
32
33#include "common.h"
34
// Customization hook receiving the list of completion policies of the workflow.
// It is defined ahead of the include that the "keep this include here" remark below refers to.
// NOTE(review): the body line (listing line 37) is absent from this extract, so the
// function appears empty here; it presumably forwards `policies` to a mergers
// `customizeInfrastructure` helper - confirm against the original header.
35void customize(std::vector<o2::framework::CompletionPolicy>& policies)
36{
38}
39
40// keep this include here
42
43namespace o2::framework
44{
46
// Expected contents of a single histogram: a fixed-size array of `Size` floats that
// operator== below compares element by element with the histogram's underlying array.
47template <size_t Size>
48using ExpectedType = std::array<float, Size>;
49
50template <typename Deleter, size_t Size>
51bool operator==(const std::unique_ptr<const std::vector<TObject*>, Deleter>& vectorOfHistos, std::vector<ExpectedType<Size>>& expected)
52{
53 return std::equal(vectorOfHistos->begin(), vectorOfHistos->end(), expected.begin(), expected.end(),
54 [](TObject* const object, const ExpectedType<Size>& array) {
55 const auto& histo = *dynamic_cast<TH1F const*>(object);
56 return std::equal(array.begin(), array.end(), histo.GetArray(), histo.GetArray() + histo.GetSize());
57 });
58}
59
60template <size_t HistoSize>
62{
// Data origin and data description shared by every input and output spec that the
// methods of this class create (producers' outputs, mergers' output, checker's input).
63 static constexpr const char origin[] = {"TST"};
64 static constexpr const char description[] = {"VEC"};
65
66 public:
67 VectorMergerTestGenerator(std::vector<std::array<float, HistoSize>>&& expectedResult, size_t histoBinsCount, double histoMin, double histoMax)
68 : mExpectedResult{expectedResult}, mHistoBinsCount{histoBinsCount}, mHistoMin{histoMin}, mHistoMax{histoMax}
69 {
70 }
71
72 Inputs generateHistoProducers(WorkflowSpec& specs, size_t numberOfProducers)
73 {
74 Inputs mergersInputs;
75
76 for (size_t producerIdx = 1; producerIdx < numberOfProducers + 1; ++producerIdx) {
77 mergersInputs.push_back({"mo", origin, description, static_cast<SubSpecificationType>(producerIdx), Lifetime::Sporadic});
78 DataProcessorSpec producer{
79 "producer-vec" + std::to_string(producerIdx),
80 Inputs{},
81 Outputs{{{"mo"}, origin, description, static_cast<SubSpecificationType>(producerIdx), Lifetime::Sporadic}},
82 AlgorithmSpec{static_cast<AlgorithmSpec::ProcessCallback>([producerIdx, numberOfProducers, binsCount = mHistoBinsCount, histoMin = mHistoMin, histoMax = mHistoMax](ProcessingContext& processingContext) mutable {
83 const auto subspec = static_cast<SubSpecificationType>(producerIdx);
84 auto vectorOfHistos = std::make_unique<mergers::VectorOfRawTObjects>(2);
85
86 int i = 0;
87 for (auto& hist_ptr : *vectorOfHistos) {
88 const auto histoname = std::string{"histo"} + std::to_string(++i);
89 auto* hist = new TH1F(histoname.c_str(), histoname.c_str(), binsCount, histoMin, histoMax);
90 hist->Fill(producerIdx);
91 hist->Fill(5);
92 hist_ptr = hist;
93 }
94
95 processingContext.outputs().snapshot(OutputRef{"mo", subspec}, *vectorOfHistos);
96 for_each(vectorOfHistos->begin(), vectorOfHistos->end(), [](auto& histoPtr) { delete histoPtr; });
97 processingContext.services().get<ControlService>().readyToQuit(QuitRequest::Me);
98 })}};
99 specs.push_back(producer);
100 }
101 return mergersInputs;
102 }
103
104 void generateMergers(WorkflowSpec& specs, const Inputs& mergersInputs, mergers::InputObjectsTimespan mergerType)
105 {
106 using namespace mergers;
107
108 MergerInfrastructureBuilder mergersBuilder;
109 mergersBuilder.setInfrastructureName("vec");
110 mergersBuilder.setInputSpecs(mergersInputs);
111 mergersBuilder.setOutputSpec({{"main"}, origin, description, 0});
112 MergerConfig config;
113 config.inputObjectTimespan = {mergerType};
114 std::vector<std::pair<size_t, size_t>> param = {{5, 1}};
115 config.publicationDecision = {PublicationDecision::EachNSeconds, param};
116 config.mergedObjectTimespan = {MergedObjectTimespan::FullHistory};
117 config.topologySize = {TopologySize::NumberOfLayers, 2};
118 mergersBuilder.setConfig(config);
119
120 mergersBuilder.generateInfrastructure(specs);
121 }
122
// Body of the method that appends the "data-checker" processor to `specs`.
// NOTE(review): the signature line of this method (listing line 123) and one brace opener
// (listing line 129, presumably `AlgorithmSpec{`) are missing from this extract - restore
// them from the original header before compiling.
124 {
// The checker has a single sporadic input bound to "vec" (class-wide origin and
// description, subspecification 0) and no outputs.
125 specs.push_back(DataProcessorSpec{
126 "data-checker",
127 Inputs{{"vec", origin, description, 0, Lifetime::Sporadic}},
128 Outputs{},
// The init callback captures a copy of the expected result and returns the processing callback.
130 AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext& initContext) {
// Shared flag, initially false, handed to the test-failure callbacks and set to true below
// once the expected object has been received.
// NOTE(review): presumably those callbacks fail the test if the flag is still false when
// the processor stops - confirm in common.h.
131 auto success = std::make_shared<bool>(false);
132 mergers::test::registerCallbacksForTestFailure(initContext.services().get<CallbackService>(), success);
133
134 // reason for this crude retry is that multiple layers are not synchronized between each other and publish on their own timers.
135 // number of retries was chosen a bit randomly, as we need to have at least 2 runs through this function because of publish
136 // timers inside of the mergers
// The callback is `mutable` because it increments its own `retryNumber` counter.
137 return AlgorithmSpec::ProcessCallback{[expectedResult, retryNumber = 1, retries = 5, success](ProcessingContext& processingContext) mutable {
138 const auto vectorOfHistos = processingContext.inputs().get<std::vector<TObject*>*>("vec");
139
140 LOG(info) << "RETRY: " << retryNumber << ": comparing: " << std::to_string(vectorOfHistos) << " to the expected: " << std::to_string(expectedResult);
// Match: record the success and ask all data processors to quit.
141 if (vectorOfHistos == expectedResult) {
142 LOG(info) << "Received the expected object, test successful";
143 *success = true;
144 processingContext.services().get<ControlService>().readyToQuit(QuitRequest::All);
145 return;
146 }
147
// Mismatch: the counter is compared before it is incremented, so with retryNumber
// starting at 1 and retries = 5 the fatal branch is taken on the 6th mismatching call;
// earlier mismatches simply fall through and wait for the next input.
148 if (retryNumber++ > retries) {
149 processingContext.services().get<ControlService>().readyToQuit(QuitRequest::All);
150 LOG(fatal) << "received wrong data: " << std::to_string(vectorOfHistos) << ", expected: " << std::to_string(expectedResult);
151 return;
152 }
153 }};
154 }}}});
155 }
156
157 private:
// Expected contents, one array per histogram; copied into the checker's init callback.
158 std::vector<ExpectedType<HistoSize>> mExpectedResult;
// Bin count and axis limits captured by the producers and passed to the TH1F constructor.
159 size_t mHistoBinsCount;
160 double mHistoMin;
161 double mHistoMax;
162};
163
164} // namespace o2::framework
165
166#endif
o2::framework::DataAllocator::SubSpecificationType SubSpecificationType
int32_t i
Definition of O2 MergerInfrastructureBuilder, v0.1.
Definition of ObjectStore for Mergers, v0.1.
void generateMergers(WorkflowSpec &specs, const Inputs &mergersInputs, mergers::InputObjectsTimespan mergerType)
Inputs generateHistoProducers(WorkflowSpec &specs, size_t numberOfProducers)
VectorMergerTestGenerator(std::vector< std::array< float, HistoSize > > &&expectedResult, size_t histoBinsCount, double histoMin, double histoMax)
static void customizeInfrastructure(std::vector< framework::CompletionPolicy > &)
Configures mergers to consume any data immediately.
GLenum array
Definition glcorearb.h:4274
GLenum GLfloat param
Definition glcorearb.h:271
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
bool operator==(const std::unique_ptr< const std::vector< TObject * >, Deleter > &vectorOfHistos, std::vector< ExpectedType< Size > > &expected)
std::vector< DataProcessorSpec > WorkflowSpec
@ Me
Only quit this data processor.
@ All
Quit all data processor, regardless of their state.
header::DataHeader::SubSpecificationType SubSpecificationType
std::array< float, Size > ExpectedType
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
void registerCallbacksForTestFailure(framework::CallbackService &cb, std::shared_ptr< bool > success)
Definition common.h:27
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::function< ProcessCallback(InitContext &)> InitCallback
std::function< void(ProcessingContext &)> ProcessCallback
uint32_t SubSpecificationType
Definition DataHeader.h:620
std::map< std::string, ID > expected
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
void customize(std::vector< o2::framework::CompletionPolicy > &policies)