79 using EngineT = std::mt19937;
80 using DistributionT = std::uniform_int_distribution<>;
83 DistributionT distrib;
85 std::string channelName;
88 std::random_device
rd;
89 auto attributes = std::make_shared<Attributes>();
90 attributes->nRolls = 4;
91 attributes->gen = std::mt19937(
rd());
92 attributes->distrib = std::uniform_int_distribution<>{1, 20};
94 std::vector<DataProcessorSpec> workflow;
104 auto&
counter = attributes->iteration;
105 auto& channelName = attributes->channelName;
106 auto&
nRolls = attributes->nRolls;
109 if (channelName.empty()) {
111 auto outputRoutes = rds.spec().outputs;
112 for (
auto& route : outputRoutes) {
114 channelName = route.channel;
121 auto transport = device.GetChannel(channelName, 0).Transport();
122 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
124 auto const* dph = DataRefUtils::getHeader<DataProcessingHeader*>(inputs.
get(
"timer"));
127 fair::mq::Parts messages;
128 auto createSequence = [&dph, &sd, &attributes, &transport, &channelAlloc, &messages](
size_t nPayloads,
DataHeader dh) ->
void {
131 dh.payloadSize =
sizeof(size_t);
133 dh.splitPayloadIndex = nPayloads;
134 dh.splitPayloadParts = nPayloads;
135 sd.nPayloads = nPayloads;
136 sd.initialValue = attributes->distrib(attributes->gen);
138 messages.AddPart(std::move(header));
140 for (
size_t i = 0;
i < nPayloads; ++
i) {
141 fair::mq::MessagePtr payload = transport->CreateMessage(dh.payloadSize);
142 *(
reinterpret_cast<size_t*
>(payload->GetData())) = sd.initialValue +
i;
143 messages.AddPart(std::move(payload));
147 auto createPairs = [&dph, &transport, &channelAlloc, &messages](
size_t nPayloads,
DataHeader dh) ->
void {
150 dh.payloadSize =
sizeof(size_t);
152 dh.splitPayloadIndex = 0;
153 dh.splitPayloadParts = nPayloads;
154 for (
size_t i = 0;
i < nPayloads; ++
i) {
155 dh.splitPayloadIndex =
i;
157 messages.AddPart(std::move(header));
158 fair::mq::MessagePtr payload = transport->CreateMessage(dh.payloadSize);
159 *(
reinterpret_cast<size_t*
>(payload->GetData())) =
i;
160 messages.AddPart(std::move(payload));
164 createSequence(attributes->distrib(attributes->gen),
DataHeader{
"SEQUENCE",
"TST", 0});
173 control.endOfStream();
174 control.readyToQuit(QuitRequest::Me);
179 {
InputSpec{
"timer",
"TST",
"TIMER", 0, Lifetime::Timer}},
180 {
OutputSpec{{
"pair"},
"TST",
"PAIR", 0, Lifetime::Timeframe},
181 OutputSpec{{
"sequence"},
"TST",
"SEQUENCE", 0, Lifetime::Timeframe},
182 OutputSpec{{
"allocator"},
"TST",
"ALLOCATOR", 0, Lifetime::Timeframe}},
184 {
ConfigParamSpec{
"period-timer", VariantType::Int, 100000, {
"period of timer"}}}});
189 using ConsumerCounters = std::map<std::string, int>;
191 size_t nSequencePayloads = 0;
192 size_t expectedPayloads = 0;
193 size_t iteration = 0;
199 auto const* dh = DataRefUtils::getHeader<DataHeader*>(
ref);
205 if (
ref.spec->binding ==
"sequencein") {
206 auto const* sd = DataRefUtils::getHeader<test::SequenceDesc*>(
ref);
211 iteration = sd->iteration;
212 if (expectedPayloads == 0) {
213 expectedPayloads = sd->nPayloads;
217 ASSERT_ERROR(*
reinterpret_cast<size_t const*
>(
ref.payload) == sd->initialValue + nSequencePayloads);
226 auto createCounters = [](
RawDeviceService& rds) -> std::shared_ptr<ConsumerCounters> {
227 auto counters = std::make_shared<ConsumerCounters>();
229 for (
auto const& channelSpec : rds.spec().inputChannels) {
237 auto checkCounters = [
nRolls = attributes->nRolls](std::shared_ptr<ConsumerCounters>
const&
counters) ->
bool {
241 LOG(fatal) <<
"inconsistent event count on input '" << channel <<
"': " <<
count <<
", expected " <<
nRolls;
252 auto counters = createCounters(rds);
253 callbacks.set<CallbackService::Id::Stop>([
counters, checkCounters]() {
258 context.services().get<
ControlService>().readyToQuit(QuitRequest::Me);
269 {
InputSpec{
"pairin",
"TST",
"PAIR", 0, Lifetime::Timeframe},
270 InputSpec{
"sequencein",
"TST",
"SEQUENCE", 0, Lifetime::Timeframe},
271 InputSpec{
"dpldefault",
"TST",
"ALLOCATOR", 0, Lifetime::Timeframe}},
279 {
InputSpec{
"pairin",
"TST",
"PAIR", 0, Lifetime::Timeframe},
280 InputSpec{
"sequencein",
"TST",
"SEQUENCE", 0, Lifetime::Timeframe},
281 InputSpec{
"dpldefault",
"TST",
"ALLOCATOR", 0, Lifetime::Timeframe}},