Project
Loading...
Searching...
No Matches
test_FairMQ.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "Headers/NameHeader.h"
13
15#include "Headers/DataHeader.h"
16#include "Headers/Stack.h"
17
18#include <catch_amalgamated.hpp>
19#include <vector>
20#include <fairmq/Tools.h>
21#include <fairmq/ProgOptions.h>
22#include <gsl/gsl>
23
24using namespace o2::header;
25using namespace o2::pmr;
26
27//__________________________________________________________________________________________________
28// addDataBlock for generic (compatible) containers, that is contiguous containers using the pmr allocator
29template <typename ContainerT, typename std::enable_if<!std::is_same<ContainerT, fair::mq::MessagePtr>::value, int>::type = 0>
30bool addDataBlock(fair::mq::Parts& parts, o2::header::Stack&& inputStack, ContainerT&& inputData, o2::pmr::FairMQMemoryResource* targetResource = nullptr)
31{
32 auto headerMessage = o2::pmr::getMessage(std::move(inputStack), targetResource);
33 auto dataMessage = o2::pmr::getMessage(std::forward<ContainerT>(inputData), targetResource);
34
35 parts.AddPart(std::move(headerMessage));
36 parts.AddPart(std::move(dataMessage));
37
38 return true;
39}
40
41//__________________________________________________________________________________________________
42// addDataBlock for data already wrapped in fair::mq::MessagePtr
43// note: since we cannot partially specialize function templates, use SFINAE here instead
44template <typename ContainerT, typename std::enable_if<std::is_same<ContainerT, fair::mq::MessagePtr>::value, int>::type = 0>
45bool addDataBlock(fair::mq::Parts& parts, o2::header::Stack&& inputStack, ContainerT&& dataMessage, o2::pmr::FairMQMemoryResource* targetResource = nullptr)
46{
47 // make sure the payload size in DataHeader corresponds to message size
49 auto* dataHeader = const_cast<DataHeader*>(o2::header::get<DataHeader*>(inputStack.data()));
50 dataHeader->payloadSize = dataMessage->GetSize();
51
52 auto headerMessage = o2::pmr::getMessage(std::move(inputStack), targetResource);
53
54 parts.AddPart(std::move(headerMessage));
55 parts.AddPart(std::move(dataMessage));
56
57 return true;
58}
59
60template <typename I, typename F>
61auto forEach(I begin, I end, F&& function)
62{
63
64 using span = gsl::span<const std::byte>;
65 using SPAN_SIZE_TYPE = span::size_type;
66 using gsl::narrow_cast;
67 for (auto it = begin; it != end; ++it) {
68 std::byte* headerBuffer{nullptr};
69 SPAN_SIZE_TYPE headerBufferSize{0};
70 if (*it != nullptr) {
71 headerBuffer = reinterpret_cast<std::byte*>((*it)->GetData());
72 headerBufferSize = narrow_cast<SPAN_SIZE_TYPE>((*it)->GetSize());
73 }
74 ++it;
75 std::byte* dataBuffer{nullptr};
76 SPAN_SIZE_TYPE dataBufferSize{0};
77 if (*it != nullptr) {
78 dataBuffer = reinterpret_cast<std::byte*>((*it)->GetData());
79 dataBufferSize = narrow_cast<SPAN_SIZE_TYPE>((*it)->GetSize());
80 }
81
82 // call the user provided function
83 function(span{headerBuffer, headerBufferSize}, span{dataBuffer, dataBufferSize});
84 }
85 return std::move(function);
86}
87
90template <typename F>
91auto forEach(fair::mq::Parts& parts, F&& function)
92{
93 if ((parts.Size() % 2) != 0) {
94 throw std::invalid_argument(
95 "number of parts in message not even (n%2 != 0), cannot be considered an O2 compliant message");
96 }
97
98 return forEach(parts.begin(), parts.end(), std::forward<F>(function));
99}
100
// Test: turning a header Stack into a message with o2::pmr::getMessage, first targeting the
// zeromq allocator, then the shmem allocator.
// NOTE(review): in this listing the statements that declare s1 are cut off in both scopes (only
// their closing line `NameHeader<9>{"somename"}};` is present) - restore them from the real file.
101TEST_CASE("getMessage_Stack")
102{
// session string built from the process id; set on the config handed to the shmem factory below
103 size_t session{(size_t)getpid() * 1000 + 0};
104 fair::mq::ProgOptions config;
105 config.SetProperty<std::string>("session", std::to_string(session));
106
// one factory per transport; only the shmem factory is given an id string and the config
107 auto factoryZMQ = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
108 auto factorySHM = fair::mq::TransportFactory::CreateTransportFactory("shmem", "getMessage_Stack", &config);
109 REQUIRE(factorySHM != nullptr);
110 REQUIRE(factoryZMQ != nullptr);
// allocators obtained from each factory; used as target resources for getMessage
111 auto allocZMQ = getTransportAllocator(factoryZMQ.get());
112 REQUIRE(allocZMQ != nullptr);
113 auto allocSHM = getTransportAllocator(factorySHM.get());
114 REQUIRE(allocSHM != nullptr);
115 {
116 // check that a message is constructed properly with the default new_delete_resource
118 NameHeader<9>{"somename"}};
119
120 auto message = o2::pmr::getMessage(std::move(s1), allocZMQ);
121
// the stack is expected to be empty after the move, and the message to carry the NameHeader
122 REQUIRE(s1.data() == nullptr);
123 REQUIRE(message != nullptr);
124 auto* h3 = get<NameHeader<0>*>(message->GetData());
125 REQUIRE(h3 != nullptr);
126 REQUIRE(h3->getNameLength() == 9);
127 REQUIRE(0 == std::strcmp(h3->getName(), "somename"));
128 REQUIRE(message->GetType() == fair::mq::Transport::ZMQ);
129 }
130 {
131 // check that a message is constructed properly, cross resource
133 NameHeader<9>{"somename"}};
// one message is expected to be accounted to the zeromq allocator before the conversion
// NOTE(review): presumably s1 is allocated from allocZMQ here - its declaration is not visible
134 REQUIRE(allocZMQ->getNumberOfMessages() == 1);
135
136 auto message = o2::pmr::getMessage(std::move(s1), allocSHM);
137
// after the conversion neither allocator is expected to report a message
138 REQUIRE(allocZMQ->getNumberOfMessages() == 0);
139 REQUIRE(allocSHM->getNumberOfMessages() == 0);
140 REQUIRE(s1.data() == nullptr);
141 REQUIRE(message != nullptr);
142 auto* h3 = get<NameHeader<0>*>(message->GetData());
143 REQUIRE(h3 != nullptr);
144 REQUIRE(h3->getNameLength() == 9);
145 REQUIRE(0 == std::strcmp(h3->getName(), "somename"));
// the resulting message must belong to the shmem transport
146 REQUIRE(message->GetType() == fair::mq::Transport::SHM);
147 }
148}
149
// Test: building multipart messages with addDataBlock and inspecting them with forEach.
// NOTE(review): in this listing the addDataBlock call statements are cut off (only their last
// argument line is present, three times) - restore them from the real file.
150TEST_CASE("addDataBlockForEach_test")
151{
// NOTE(review): config is filled but not passed to anything in the visible lines of this test
152 size_t session{(size_t)getpid() * 1000 + 1};
153 fair::mq::ProgOptions config;
154 config.SetProperty<std::string>("session", std::to_string(session));
155
156 auto factoryZMQ = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
157 REQUIRE(factoryZMQ);
158 auto allocZMQ = getTransportAllocator(factoryZMQ.get());
159 REQUIRE(allocZMQ);
160
161 {
162 // simple addition of a data block from an existing message
163 fair::mq::Parts message;
164 auto simpleMessage = factoryZMQ->CreateMessage(10);
167 std::move(simpleMessage));
// one header part plus one payload part
168 REQUIRE(message.Size() == 2);
169 }
170
171 {
172 int sizeofDataHeader = sizeof(o2::header::DataHeader);
// payload element: two ints, so two elements are read back below as four consecutive ints
173 struct elem {
174 int i;
175 int j;
176 };
177 using namespace boost::container::pmr;
178 fair::mq::Parts message;
// vector backed by the zeromq allocator; capacity 100 is reserved, only two elements are stored
179 std::vector<elem, polymorphic_allocator<elem>> vec(polymorphic_allocator<elem>{allocZMQ});
180 vec.reserve(100);
181 vec.push_back({1, 2});
182 vec.push_back({3, 4});
183
186 std::move(vec));
// the vector must be left empty, and the payload part must have the size of the two stored
// elements rather than the reserved capacity
187 REQUIRE(message.Size() == 2);
188 REQUIRE(vec.size() == 0);
189 REQUIRE(message[0].GetSize() == sizeofDataHeader);
190 REQUIRE(message[1].GetSize() == 2 * sizeof(elem));
191 ; // check the size of the buffer is set correctly
192
193 // check contents
194 int sum{0};
195 forEach(message, [&](auto header, auto data) {
// payload viewed as ints: {1, 2} and {3, 4} add up to 10
196 const int* numbers = reinterpret_cast<const int*>(data.data());
197 sum = numbers[0] + numbers[1] + numbers[2] + numbers[3];
198 });
199 REQUIRE(sum == 10);
200
201 // add one more data block and check total size using forEach;
204 factoryZMQ->CreateMessage(10));
// accumulated size over both pairs: two headers, the two-element payload and the 10 byte message
205 int size{0};
206 forEach(message, [&](auto header, auto data) { size += header.size() + data.size(); });
207 REQUIRE(size == sizeofDataHeader + 2 * sizeof(elem) + sizeofDataHeader + 10);
208
209 // check contents (headers)
210 int checkOK{0};
211 forEach(message, [&](auto header, auto data) {
// count the headers whose description and origin both equal the "invalid" constants
212 auto dh = get<DataHeader*>(header.data());
213 if (dh->dataDescription == gDataDescriptionInvalid && dh->dataOrigin == gDataOriginInvalid) {
214 checkOK++;
215 };
216 });
// both pairs in the message are expected to match
217 REQUIRE(checkOK == 2);
218 }
219}
int32_t i
uint32_t j
Definition RawData.h:0
std::vector< o2::mid::ColumnData > inputData
float sum(float s, o2::dcs::DataPointValue v)
Definition dcs-ccdb.cxx:39
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat s1
Definition glcorearb.h:5034
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
constexpr o2::header::DataDescription gDataDescriptionInvalid
Definition DataHeader.h:596
constexpr o2::header::DataOrigin gDataOriginInvalid
Definition DataHeader.h:561
O2 data header classes and API, v0.1.
Definition DetID.h:49
O2 memory allocators and interfaces related to managing memory via the transport layer.
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
fair::mq::MemoryResource FairMQMemoryResource
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
the main header struct
Definition DataHeader.h:618
PayloadSizeType payloadSize
Definition DataHeader.h:666
uint32_t SubSpecificationType
Definition DataHeader.h:620
an example data header containing a name of an object as a null terminated char arr....
Definition NameHeader.h:39
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
TEST_CASE("getMessage_Stack")
auto forEach(I begin, I end, F &&function)
bool addDataBlock(fair::mq::Parts &parts, o2::header::Stack &&inputStack, ContainerT &&inputData, o2::pmr::FairMQMemoryResource *targetResource=nullptr)
std::vector< o2::ctf::BufferType > vec