Project
Loading...
Searching...
No Matches
histosTopologyCommon.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef ALICEO2_HISTOSTOPOLOGYCOMMON_H_
13#define ALICEO2_HISTOSTOPOLOGYCOMMON_H_
14
15#include <array>
16#include <chrono>
17#include <cstddef>
18#include <cstdlib>
19#include <fairlogger/Logger.h>
20#include <thread>
21
22#include <TH1F.h>
23
31
32#include "common.h"
33
34void customize(std::vector<o2::framework::CompletionPolicy>& policies)
35{
37}
38
39// keep this include here
41
42namespace o2::framework
43{
45
46template <size_t HistoSize>
48{
49 static constexpr const char origin[] = {"TST"};
50 static constexpr const char description[] = {"HISTO"};
51
52 public:
53 HistosMergerTestGenerator(std::array<float, HistoSize>&& expectedResult, size_t histoBinsCount, double histoMin, double histoMax)
54 : mExpectedResult{expectedResult}, mHistoBinsCount{histoBinsCount}, mHistoMin{histoMin}, mHistoMax{histoMax}
55 {
56 }
57
58 Inputs generateHistoProducers(WorkflowSpec& specs, size_t numberOfProducers)
59 {
60 Inputs inputs{};
61 for (size_t producerIdx = 1; producerIdx != numberOfProducers + 1; ++producerIdx) {
62 inputs.push_back({"mo", origin, description, static_cast<SubSpecificationType>(producerIdx), Lifetime::Sporadic});
63 specs.push_back(DataProcessorSpec{
64 std::string{"producer-histo"} + std::to_string(producerIdx),
65 Inputs{},
66 Outputs{{{"mo"}, origin, description, static_cast<SubSpecificationType>(producerIdx), Lifetime::Sporadic}},
68 static_cast<AlgorithmSpec::ProcessCallback>([histoBinsCount = mHistoBinsCount, histoMin = mHistoMin, histoMax = mHistoMax, producerIdx](ProcessingContext& processingContext) mutable {
69 TH1F& histo = processingContext.outputs().make<TH1F>(
70 Output{origin, description, static_cast<SubSpecificationType>(producerIdx)},
71 "histo", "histo", histoBinsCount, histoMin, histoMax);
72 histo.Fill(5);
73 histo.Fill(producerIdx);
74 processingContext.services().get<ControlService>().endOfStream();
75 processingContext.services().get<ControlService>().readyToQuit(QuitRequest::Me);
76 })}});
77 }
78 return inputs;
79 }
80
81 void generateMergers(WorkflowSpec& specs, const Inputs& producerInputs, mergers::InputObjectsTimespan mergerType)
82 {
83 using namespace mergers;
84
85 MergerInfrastructureBuilder mergersBuilder;
86 mergersBuilder.setInfrastructureName("histos");
87 mergersBuilder.setInputSpecs(producerInputs);
88 mergersBuilder.setOutputSpec({{"main"}, origin, description, 0});
89 MergerConfig config;
90 config.inputObjectTimespan = {mergerType};
91 std::vector<std::pair<size_t, size_t>> param = {{5, 1}};
92 config.publicationDecision = {PublicationDecision::EachNSeconds, param};
93 config.mergedObjectTimespan = {MergedObjectTimespan::FullHistory};
94 config.topologySize = {TopologySize::NumberOfLayers, 2};
95 mergersBuilder.setConfig(config);
96
97 mergersBuilder.generateInfrastructure(specs);
98 }
99
101 {
102 specs.push_back(DataProcessorSpec{
103 "data-checker",
104 Inputs{{"histo", origin, description, 0, Lifetime::Sporadic}},
105 Outputs{},
107 AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext& initContext) {
108 auto success = std::make_shared<bool>(false);
109 mergers::test::registerCallbacksForTestFailure(initContext.services().get<CallbackService>(), success);
110
111 // reason for this crude retry is that multiple layers are not synchronized between each other and publish on their own timers,
112 // number of retries is chosen arbitrarily as we need to retry at least twice
113 return AlgorithmSpec::ProcessCallback{[expectedResult, retryNumber = 1, retries = 5, success](ProcessingContext& processingContext) mutable {
114 const auto histo = processingContext.inputs().get<TH1F*>("histo");
115
116 LOG(info) << "RETRY: " << retryNumber << ": comparing: " << std::to_string(histo) << " to the expected: " << std::to_string(expectedResult);
117 if (std::equal(expectedResult.begin(), expectedResult.end(), histo->GetArray(), histo->GetArray() + histo->GetSize())) {
118 LOG(info) << "Received the expected object, test successful";
119 *success = true;
120 processingContext.services().get<ControlService>().readyToQuit(QuitRequest::All);
121 return;
122 }
123
124 if (retryNumber++ >= retries) {
125 processingContext.services().get<ControlService>().readyToQuit(QuitRequest::All);
126 LOG(fatal) << "received incorrect data: " << std::to_string(histo) << ", expected: " << std::to_string(gsl::span(expectedResult));
127 }
128 }};
129 }}}});
130 }
131
132 private:
133 std::array<float, HistoSize> mExpectedResult;
134 size_t mHistoBinsCount;
135 double mHistoMin;
136 double mHistoMax;
137};
138
139} // namespace o2::framework
140
141#endif
o2::framework::DataAllocator::SubSpecificationType SubSpecificationType
Definition of O2 MergerInfrastructureBuilder, v0.1.
Inputs generateHistoProducers(WorkflowSpec &specs, size_t numberOfProducers)
void generateMergers(WorkflowSpec &specs, const Inputs &producerInputs, mergers::InputObjectsTimespan mergerType)
HistosMergerTestGenerator(std::array< float, HistoSize > &&expectedResult, size_t histoBinsCount, double histoMin, double histoMax)
static void customizeInfrastructure(std::vector< framework::CompletionPolicy > &)
Configures mergers to consume any data immediately.
GLenum GLfloat param
Definition glcorearb.h:271
void customize(std::vector< o2::framework::CompletionPolicy > &policies)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
@ Me
Only quit this data processor.
@ All
Quit all data processor, regardless of their state.
header::DataHeader::SubSpecificationType SubSpecificationType
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
void registerCallbacksForTestFailure(framework::CallbackService &cb, std::shared_ptr< bool > success)
Definition common.h:27
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::function< ProcessCallback(InitContext &)> InitCallback
std::function< void(ProcessingContext &)> ProcessCallback
uint32_t SubSpecificationType
Definition DataHeader.h:620
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"