Project
Loading...
Searching...
No Matches
test_VariablePayloadSequenceWorkflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
24#include "Framework/Logger.h"
25#include "Headers/DataHeader.h"
26#include "Headers/Stack.h"
28#include <fairmq/Device.h>
29#include <memory>
30#include <random>
31
32using namespace o2::framework;
35
36// we need to specify customizations before including Framework/runDataProcessing
37// customize consumer to process immediately what comes in
38void customize(std::vector<o2::framework::CompletionPolicy>& policies)
39{
40 // we customize the pipeline processors to consume data as it comes
43 policies.push_back(CompletionPolicyHelpers::defineByName("consumer", CompletionPolicy::CompletionOp::Consume));
44 policies.push_back(CompletionPolicyHelpers::defineByName("spectator", CompletionPolicy::CompletionOp::Consume));
45}
46
48
49#define ASSERT_ERROR(condition) \
50 if ((condition) == false) { \
51 LOG(fatal) << R"(Test condition ")" #condition R"(" failed)"; \
52 }
53
54namespace test
55{
56// a header with the information expected in the payload
57// will be sent on the header stack
59 // static data for this header type/version
60 static constexpr uint32_t sVersion{1};
61 static constexpr o2::header::HeaderType sHeaderType{o2::header::String2<uint64_t>("SequDesc")};
63
64 size_t iteration = 0;
65 size_t nPayloads = 0;
66 size_t initialValue = 0;
67
68 constexpr SequenceDesc(size_t i, size_t n, size_t v)
70 {
71 }
72};
73
74} // namespace test
75
76std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
77{
78 struct Attributes {
79 using EngineT = std::mt19937;
80 using DistributionT = std::uniform_int_distribution<>;
81 size_t nRolls = 2;
82 EngineT gen;
83 DistributionT distrib;
84 size_t iteration = 0;
85 std::string channelName;
86 };
87
88 std::random_device rd;
89 auto attributes = std::make_shared<Attributes>();
90 attributes->nRolls = 4;
91 attributes->gen = std::mt19937(rd());
92 attributes->distrib = std::uniform_int_distribution<>{1, 20};
93
94 std::vector<DataProcessorSpec> workflow;
96 // a producer process steered by a timer
97 //
98 // the compute callback of the producer
99 // Producing three types of output:
100 // 1. via default DPL Allocator
101 // 2. multiple payloads in split-payloads format (header-payload pairs)
102 // 3. multiple payload sequence with one header
103 auto producerCallback = [attributes](InputRecord& inputs, DataAllocator& outputs, ControlService& control, RawDeviceService& rds) {
104 auto& counter = attributes->iteration;
105 auto& channelName = attributes->channelName;
106 auto& nRolls = attributes->nRolls;
107 outputs.make<int>(OutputRef{"allocator", 0}) = counter;
108
109 if (channelName.empty()) {
110 OutputSpec const query{"TST", "SEQUENCE", 0};
111 auto outputRoutes = rds.spec().outputs;
112 for (auto& route : outputRoutes) {
113 if (DataSpecUtils::match(route.matcher, query)) {
114 channelName = route.channel;
115 break;
116 }
117 }
118 ASSERT_ERROR(channelName.length() > 0);
119 }
120 fair::mq::Device& device = *(rds.device());
121 auto transport = device.GetChannel(channelName, 0).Transport();
122 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
123
124 auto const* dph = DataRefUtils::getHeader<DataProcessingHeader*>(inputs.get("timer"));
125 test::SequenceDesc sd{counter, 0, 0};
126
127 fair::mq::Parts messages;
128 auto createSequence = [&dph, &sd, &attributes, &transport, &channelAlloc, &messages](size_t nPayloads, DataHeader dh) -> void {
129 // one header with index set to the number of split parts indicates sequence
130 // of payloads without additional headers
131 dh.payloadSize = sizeof(size_t);
132 dh.payloadSerializationMethod = o2::header::gSerializationMethodNone;
133 dh.splitPayloadIndex = nPayloads;
134 dh.splitPayloadParts = nPayloads;
135 sd.nPayloads = nPayloads;
136 sd.initialValue = attributes->distrib(attributes->gen);
137 fair::mq::MessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, *dph, sd});
138 messages.AddPart(std::move(header));
139
140 for (size_t i = 0; i < nPayloads; ++i) {
141 fair::mq::MessagePtr payload = transport->CreateMessage(dh.payloadSize);
142 *(reinterpret_cast<size_t*>(payload->GetData())) = sd.initialValue + i;
143 messages.AddPart(std::move(payload));
144 }
145 };
146
147 auto createPairs = [&dph, &transport, &channelAlloc, &messages](size_t nPayloads, DataHeader dh) -> void {
148 // one header with index set to the number of split parts indicates sequence
149 // of payloads without additional headers
150 dh.payloadSize = sizeof(size_t);
151 dh.payloadSerializationMethod = o2::header::gSerializationMethodNone;
152 dh.splitPayloadIndex = 0;
153 dh.splitPayloadParts = nPayloads;
154 for (size_t i = 0; i < nPayloads; ++i) {
155 dh.splitPayloadIndex = i;
156 fair::mq::MessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, *dph});
157 messages.AddPart(std::move(header));
158 fair::mq::MessagePtr payload = transport->CreateMessage(dh.payloadSize);
159 *(reinterpret_cast<size_t*>(payload->GetData())) = i;
160 messages.AddPart(std::move(payload));
161 }
162 };
163
164 createSequence(attributes->distrib(attributes->gen), DataHeader{"SEQUENCE", "TST", 0});
165 createPairs(counter + 1, DataHeader{"PAIR", "TST", 0});
166
167 // using utility from ExternalFairMQDeviceProxy
168 sendOnChannel(device, messages, channelName, (size_t)-1);
169
170 if (++(counter) >= nRolls) {
171 // send the end of stream signal, this is transferred by the proxies
172 // and allows to properly terminate downstream devices
173 control.endOfStream();
174 control.readyToQuit(QuitRequest::Me);
175 }
176 };
177
178 workflow.emplace_back(DataProcessorSpec{"producer",
179 {InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}},
180 {OutputSpec{{"pair"}, "TST", "PAIR", 0, Lifetime::Timeframe},
181 OutputSpec{{"sequence"}, "TST", "SEQUENCE", 0, Lifetime::Timeframe},
182 OutputSpec{{"allocator"}, "TST", "ALLOCATOR", 0, Lifetime::Timeframe}},
183 AlgorithmSpec{adaptStateless(producerCallback)},
184 {ConfigParamSpec{"period-timer", VariantType::Int, 100000, {"period of timer"}}}});
185
187 // consumer utils used by two processes
188 //
189 using ConsumerCounters = std::map<std::string, int>;
190 auto inputChecker = [](InputRecord& inputs, ConsumerCounters& counters) {
191 size_t nSequencePayloads = 0;
192 size_t expectedPayloads = 0;
193 size_t iteration = 0;
194 ConsumerCounters active;
195 for (auto const& ref : InputRecordWalker(inputs)) {
196 if (!inputs.isValid(ref.spec->binding)) {
197 continue;
198 }
199 auto const* dh = DataRefUtils::getHeader<DataHeader*>(ref);
200 ASSERT_ERROR(dh != nullptr)
201 if (!dh) {
202 continue;
203 }
204 active[ref.spec->binding] = 1;
205 if (ref.spec->binding == "sequencein") {
206 auto const* sd = DataRefUtils::getHeader<test::SequenceDesc*>(ref);
207 ASSERT_ERROR(sd != nullptr);
208 if (!sd) {
209 continue;
210 }
211 iteration = sd->iteration;
212 if (expectedPayloads == 0) {
213 expectedPayloads = sd->nPayloads;
214 } else {
215 ASSERT_ERROR(expectedPayloads == sd->nPayloads);
216 }
217 ASSERT_ERROR(*reinterpret_cast<size_t const*>(ref.payload) == sd->initialValue + nSequencePayloads);
218 ++nSequencePayloads;
219 }
220 }
221 for (auto const& [channel, count] : active) {
222 ++counters[channel];
223 }
224 };
225
226 auto createCounters = [](RawDeviceService& rds) -> std::shared_ptr<ConsumerCounters> {
227 auto counters = std::make_shared<ConsumerCounters>();
228 ConsumerCounters& c = *counters;
229 for (auto const& channelSpec : rds.spec().inputChannels) {
230 // we would need the input spec here, while in the device spec we have the attributes
231 // of the FairMQ Channels
232 //(*counters)[channelSpec.name] = 0;
233 }
234 return counters;
235 };
236
237 auto checkCounters = [nRolls = attributes->nRolls](std::shared_ptr<ConsumerCounters> const& counters) -> bool {
238 bool sane = true;
239 for (auto const& [channel, count] : *counters) {
240 if (count != nRolls) {
241 LOG(fatal) << "inconsistent event count on input '" << channel << "': " << count << ", expected " << nRolls;
242 sane = false;
243 }
244 }
245 return sane;
246 };
247
249 // the consumer process connects to the producer
250 //
251 auto consumerInit = [createCounters, checkCounters, inputChecker](RawDeviceService& rds, CallbackService& callbacks) {
252 auto counters = createCounters(rds);
253 callbacks.set<CallbackService::Id::Stop>([counters, checkCounters]() {
254 ASSERT_ERROR(checkCounters(counters));
255 });
256 callbacks.set<CallbackService::Id::EndOfStream>([counters, checkCounters](EndOfStreamContext& context) {
257 ASSERT_ERROR(checkCounters(counters));
258 context.services().get<ControlService>().readyToQuit(QuitRequest::Me);
259 });
260
261 auto processing = [inputChecker, counters](InputRecord& inputs) {
262 inputChecker(inputs, *counters);
263 };
264
265 return adaptStateless(processing);
266 };
267
268 workflow.emplace_back(DataProcessorSpec{"consumer",
269 {InputSpec{"pairin", "TST", "PAIR", 0, Lifetime::Timeframe},
270 InputSpec{"sequencein", "TST", "SEQUENCE", 0, Lifetime::Timeframe},
271 InputSpec{"dpldefault", "TST", "ALLOCATOR", 0, Lifetime::Timeframe}},
272 {},
273 AlgorithmSpec{adaptStateful(consumerInit)}});
274
276 // spectator process which should get the forwarded data
277 //
278 workflow.emplace_back(DataProcessorSpec{"spectator",
279 {InputSpec{"pairin", "TST", "PAIR", 0, Lifetime::Timeframe},
280 InputSpec{"sequencein", "TST", "SEQUENCE", 0, Lifetime::Timeframe},
281 InputSpec{"dpldefault", "TST", "ALLOCATOR", 0, Lifetime::Timeframe}},
282 {},
283 AlgorithmSpec{adaptStateful(consumerInit)}});
284
285 return workflow;
286}
default_random_engine gen(dev())
int32_t i
A helper class to iteratate over all parts of all input routes.
uint32_t c
Definition RawData.h:2
A helper class to iteratate over all parts of all input routes.
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
bool isValid(std::string const &s) const
Helper method to be used to check if a given part of the InputRecord is present.
decltype(auto) get(R binding, int part=0) const
GLdouble n
Definition glcorearb.h:1982
GLint GLsizei count
Definition glcorearb.h:399
GLint GLint GLsizei GLuint * counters
Definition glcorearb.h:3985
const GLdouble * v
Definition glcorearb.h:832
GLuint counter
Definition glcorearb.h:3987
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
void sendOnChannel(fair::mq::Device &device, o2::header::Stack &&headerStack, fair::mq::MessagePtr &&payloadMessage, OutputSpec const &spec, ChannelRetriever &channelRetriever)
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
FIXME: do not use data model tables.
Helper class which holds commonly used policies.
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
the base header struct Every header type must begin (i.e. derive) with this. Don't use this struct di...
Definition DataHeader.h:351
the main header struct
Definition DataHeader.h:618
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
static constexpr o2::header::HeaderType sHeaderType
constexpr SequenceDesc(size_t i, size_t n, size_t v)
static constexpr o2::header::SerializationMethod sSerializationMethod
size_t nRolls
#define ASSERT_ERROR(condition)
void customize(std::vector< o2::framework::CompletionPolicy > &policies)
std::vector< DataProcessorSpec > defineDataProcessing(ConfigContext const &config)
This function hooks up the the workflow specifications into the DPL driver.
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::random_device rd