18#include <catch_amalgamated.hpp>
20#include <fairmq/Tools.h>
21#include <fairmq/ProgOptions.h>
29template <typename ContainerT, typename std::enable_if<!std::is_same<ContainerT, fair::mq::MessagePtr>::value,
int>
::type = 0>
35 parts.AddPart(std::move(headerMessage));
36 parts.AddPart(std::move(dataMessage));
44template <typename ContainerT, typename std::enable_if<std::is_same<ContainerT, fair::mq::MessagePtr>::value,
int>
::type = 0>
49 auto* dataHeader =
const_cast<DataHeader*
>(o2::header::get<DataHeader*>(inputStack.data()));
54 parts.AddPart(std::move(headerMessage));
55 parts.AddPart(std::move(dataMessage));
60template <
typename I,
typename F>
64 using span = gsl::span<const std::byte>;
65 using SPAN_SIZE_TYPE = span::size_type;
66 using gsl::narrow_cast;
67 for (
auto it = begin; it !=
end; ++it) {
68 std::byte* headerBuffer{
nullptr};
69 SPAN_SIZE_TYPE headerBufferSize{0};
71 headerBuffer =
reinterpret_cast<std::byte*
>((*it)->GetData());
72 headerBufferSize = narrow_cast<SPAN_SIZE_TYPE>((*it)->GetSize());
75 std::byte* dataBuffer{
nullptr};
76 SPAN_SIZE_TYPE dataBufferSize{0};
78 dataBuffer =
reinterpret_cast<std::byte*
>((*it)->GetData());
79 dataBufferSize = narrow_cast<SPAN_SIZE_TYPE>((*it)->GetSize());
83 function(span{headerBuffer, headerBufferSize}, span{dataBuffer, dataBufferSize});
85 return std::move(function);
91auto forEach(fair::mq::Parts& parts, F&& function)
93 if ((parts.Size() % 2) != 0) {
94 throw std::invalid_argument(
95 "number of parts in message not even (n%2 != 0), cannot be considered an O2 compliant message");
98 return forEach(parts.begin(), parts.end(), std::forward<F>(function));
103 size_t session{(size_t)getpid() * 1000 + 0};
104 fair::mq::ProgOptions config;
105 config.SetProperty<std::string>(
"session",
std::to_string(session));
107 auto factoryZMQ = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
108 auto factorySHM = fair::mq::TransportFactory::CreateTransportFactory(
"shmem",
"getMessage_Stack", &config);
109 REQUIRE(factorySHM !=
nullptr);
110 REQUIRE(factoryZMQ !=
nullptr);
111 auto allocZMQ = getTransportAllocator(factoryZMQ.get());
112 REQUIRE(allocZMQ !=
nullptr);
113 auto allocSHM = getTransportAllocator(factorySHM.get());
114 REQUIRE(allocSHM !=
nullptr);
122 REQUIRE(
s1.data() ==
nullptr);
124 auto* h3 = get<NameHeader<0>*>(
message->GetData());
125 REQUIRE(h3 !=
nullptr);
126 REQUIRE(h3->getNameLength() == 9);
127 REQUIRE(0 == std::strcmp(h3->getName(),
"somename"));
128 REQUIRE(
message->GetType() == fair::mq::Transport::ZMQ);
134 REQUIRE(allocZMQ->getNumberOfMessages() == 1);
138 REQUIRE(allocZMQ->getNumberOfMessages() == 0);
139 REQUIRE(allocSHM->getNumberOfMessages() == 0);
140 REQUIRE(
s1.data() ==
nullptr);
142 auto* h3 = get<NameHeader<0>*>(
message->GetData());
143 REQUIRE(h3 !=
nullptr);
144 REQUIRE(h3->getNameLength() == 9);
145 REQUIRE(0 == std::strcmp(h3->getName(),
"somename"));
146 REQUIRE(
message->GetType() == fair::mq::Transport::SHM);
152 size_t session{(size_t)getpid() * 1000 + 1};
153 fair::mq::ProgOptions config;
154 config.SetProperty<std::string>(
"session",
std::to_string(session));
156 auto factoryZMQ = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
158 auto allocZMQ = getTransportAllocator(factoryZMQ.get());
164 auto simpleMessage = factoryZMQ->CreateMessage(10);
167 std::move(simpleMessage));
177 using namespace boost::container::pmr;
179 std::vector<elem, polymorphic_allocator<elem>>
vec(polymorphic_allocator<elem>{allocZMQ});
181 vec.push_back({1, 2});
182 vec.push_back({3, 4});
188 REQUIRE(
vec.size() == 0);
189 REQUIRE(
message[0].GetSize() == sizeofDataHeader);
190 REQUIRE(
message[1].GetSize() == 2 *
sizeof(elem));
196 const int* numbers =
reinterpret_cast<const int*
>(
data.data());
197 sum = numbers[0] + numbers[1] + numbers[2] + numbers[3];
204 factoryZMQ->CreateMessage(10));
207 REQUIRE(
size == sizeofDataHeader + 2 *
sizeof(elem) + sizeofDataHeader + 10);
212 auto dh = get<DataHeader*>(header.data());
217 REQUIRE(checkOK == 2);
std::vector< o2::mid::ColumnData > inputData
float sum(float s, o2::dcs::DataPointValue v)
GLuint GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat s1
GLint GLint GLsizei GLint GLenum GLenum type
GLuint GLsizei const GLchar * message
constexpr o2::header::DataDescription gDataDescriptionInvalid
constexpr o2::header::DataOrigin gDataOriginInvalid
O2 memory allocators and interfaces related to managing memory via the trasport layer.
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
fair::mq::MemoryResource FairMQMemoryResource
std::string to_string(gsl::span< T, Size > span)
TEST_CASE("getMessage_Stack")
auto forEach(I begin, I end, F &&function)
bool addDataBlock(fair::mq::Parts &parts, o2::header::Stack &&inputStack, ContainerT &&inputData, o2::pmr::FairMQMemoryResource *targetResource=nullptr)
std::vector< o2::ctf::BufferType > vec