Project
Loading...
Searching...
No Matches
test_InputRecordWalker.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#include "Framework/InputSpan.h"
15#include "Framework/WorkflowSpec.h" // o2::framework::select
17#include "Headers/DataHeader.h"
19#include "Headers/Stack.h"
20#include <catch_amalgamated.hpp>
21#include <vector>
22#include <memory>
23#include <string>
24#include <unordered_map>
25
26using namespace o2::framework;
29
// Test fixture: owns the raw message buffers and builds an InputSpan and an
// InputRecord on top of them, so the buffers stay alive as long as the record.
30// simple helper struct to keep the InputRecord and ownership of messages
31struct DataSet {
32 // not nice with the double vector but for quick unit test ok
// MessageSet: the raw buffers of one route. The span callbacks in the
// constructor read them as consecutive (header, payload) pairs.
33 using MessageSet = std::vector<std::unique_ptr<std::vector<char>>>;
// TaggedSet: a data origin together with the buffers stored for it.
34 using TaggedSet = std::pair<o2::header::DataOrigin, MessageSet>;
// Messages: one TaggedSet per entry of the schema (enforced by the REQUIRE
// in the constructor body).
35 using Messages = std::vector<TaggedSet>;
// CheckType: list of strings; the constructor stores it in `values`.
36 using CheckType = std::vector<std::string>;
// Takes ownership of the routes (s), the message buffers (m) and the check
// strings (v), then wires up `span` and `record` on top of them.
// The span is constructed from, in order:
//  - a callback returning the number of (header, payload) pairs of entry i,
//    i.e. half its buffer count, or 0 when i is out of range;
//  - nullptr (NOTE(review): the meaning of this argument is defined by
//    InputSpan and is not visible here);
//  - a callback building a DataRef from the buffers found at idx.headerIdx
//    and idx.payloadIdx of entry i (`at()` throws on an invalid index);
//  - a callback stepping to the next pair (headerIdx + 2) and returning
//    {size_t(-1), size_t(-1)} once past the last buffer;
//  - messages.size().
// All callbacks capture `this`.
// NOTE(review): a copied or moved DataSet would keep callbacks that point at
// the source object - presumably instances are only ever constructed in
// place; confirm.
37 DataSet(std::vector<InputRoute>&& s, Messages&& m, CheckType&& v, ServiceRegistryRef registry)
38 : schema{std::move(s)}, messages{std::move(m)}, span{[this](size_t i) { return i < this->messages.size() ? messages[i].second.size() / 2 : 0; }, nullptr, [this](size_t i, DataRefIndices idx) {
39 auto header = static_cast<char const*>(this->messages[i].second.at(idx.headerIdx)->data());
40 auto payload = static_cast<char const*>(this->messages[i].second.at(idx.payloadIdx)->data());
41 return DataRef{nullptr, header, payload}; }, [this](size_t i, DataRefIndices current) -> DataRefIndices {
42 size_t next = current.headerIdx + 2;
43 return next < this->messages[i].second.size() ? DataRefIndices{next, next + 1} : DataRefIndices{size_t(-1), size_t(-1)}; }, this->messages.size()}, record{schema, span, registry}, values{std::move(v)}
44 {
// Every route in the schema must come with its own tagged message set.
45 REQUIRE(messages.size() == schema.size());
46 }
47
// Input routes handed to `record`.
48 std::vector<InputRoute> schema;
49 53};
54
56{
// Builds the DataSet used by the test: three input routes (TPC, ITS, TOF)
// and four messages, each stored as a header-stack buffer followed by a
// payload buffer.
// NOTE(review): presumably `registry` is static so that the reference handed
// to the returned DataSet stays valid after this function returns - confirm.
57 static ServiceRegistry registry;
58 // Create the routes we want for the InputRecord
// "its" uses a ConcreteDataTypeMatcher with no sub-specification, while
// "tpc" and "tof" name explicit sub-specifications (0 and 1).
59 std::vector<InputSpec> inputspecs = {
60 InputSpec{"tpc", "TPC", "SOMEDATA", 0, Lifetime::Timeframe},
61 InputSpec{"its", ConcreteDataTypeMatcher{"ITS", "SOMEDATA"}, Lifetime::Timeframe},
62 InputSpec{"tof", "TOF", "SOMEDATA", 1, Lifetime::Timeframe}};
63
// Running counter passed as the second InputRoute field; createRoute
// post-increments it, so the routes get 0, 1, 2 in creation order.
64 size_t i = 0;
65 auto createRoute = [&i](const char* source, InputSpec& spec) {
66 return InputRoute{
67 spec,
68 i++,
69 source};
70 };
71
72 std::vector<InputRoute> schema = {
73 createRoute("tpc_source", inputspecs[0]),
74 createRoute("its_source", inputspecs[1]),
75 createRoute("tof_source", inputspecs[2])};
76
77 decltype(DataSet::values) checkValues;
78 DataSet::Messages messages;
79
// Appends one message for header `dh`: records the check string
// "<origin>_<description>_<subSpecification>", then stores a header buffer
// and a payload buffer in the message set tagged with dh.dataOrigin,
// creating that set when the origin has not been seen yet.
80 auto createMessage = [&messages, &checkValues](DataHeader dh) {
81 checkValues.emplace_back(fmt::format("{}_{}_{}", dh.dataOrigin, dh.dataDescription, dh.subSpecification));
82 std::string const& data = checkValues.back();
// The payload is exactly the check string, without a terminating NUL.
83 dh.payloadSize = data.size();
84 DataProcessingHeader dph{0, 1};
// Serialize both headers into one contiguous header stack.
85 Stack stack{dh, dph};
// Linear search for an existing message set with the same origin.
86 auto it = messages.begin(), end = messages.end();
87 for (; it != end; ++it) {
88 if (it->first == dh.dataOrigin) {
89 break;
90 }
91 }
92 if (it == end) {
// Not found: append a new tagged set. The resize may invalidate `end`,
// which is not used after this point; `it` is re-acquired below.
93 messages.resize(messages.size() + 1);
94 it = messages.end() - 1;
95 it->first = dh.dataOrigin;
96 }
97 auto& routemessages = it->second;
// Header buffer first, payload buffer second: DataSet's span callbacks rely
// on this (header, payload) pairing.
98 routemessages.emplace_back(std::make_unique<std::vector<char>>(stack.size()));
99 memcpy(routemessages.back()->data(), stack.data(), routemessages.back()->size());
100 routemessages.emplace_back(std::make_unique<std::vector<char>>(dh.payloadSize));
101 memcpy(routemessages.back()->data(), data.data(), routemessages.back()->size());
102 };
103
104 // Create four messages spread over the three input routes: one for TPC, two for
105 // ITS (so the second route carries two header/payload pairs) and one for TOF.
106 // Each payload is the check string built in createMessage; all strings are cached
107 // in checkValues for later checking when walking the data set
108 DataHeader dh1;
109 dh1.dataDescription = "SOMEDATA";
110 dh1.dataOrigin = "TPC";
111 dh1.subSpecification = 0;
113 DataHeader dh2;
114 dh2.dataDescription = "SOMEDATA";
115 dh2.dataOrigin = "ITS";
116 dh2.subSpecification = 0;
118 DataHeader dh3;
119 dh3.dataDescription = "SOMEDATA";
120 dh3.dataOrigin = "ITS";
121 dh3.subSpecification = 1;
123 DataHeader dh4;
124 dh4.dataDescription = "SOMEDATA";
125 dh4.dataOrigin = "TOF";
// NOTE(review): 255 differs from the sub-specification 1 declared on the
// "tof" InputSpec above - confirm this mismatch is intentional.
126 dh4.subSpecification = 255;
// Creation order fixes the order of checkValues: TPC, ITS/0, ITS/1, TOF.
128 createMessage(dh1);
129 createMessage(dh2);
130 createMessage(dh3);
131 createMessage(dh4);
132
// Brace-initializes the returned DataSet directly from the moved pieces.
133 return {std::move(schema), std::move(messages), std::move(checkValues), registry};
134}
135
136TEST_CASE("test_DPLRawParser")
137{
138 auto dataset = createData();
139 InputRecord& inputs = dataset.record;
140 REQUIRE(dataset.messages.size() > 0);
141 REQUIRE(dataset.messages[0].second.at(0) != nullptr);
142 REQUIRE(inputs.size() == 3);
143 REQUIRE((*inputs.begin()).header == dataset.messages[0].second.at(0)->data());
144
145 int count = 0;
146 for (auto const& ref : InputRecordWalker(inputs)) {
147 auto const* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
148 auto const data = inputs.get<std::string>(ref);
149 REQUIRE(data == dataset.values[count]);
150 count++;
151 }
152 REQUIRE(count == 4);
153
154 std::vector<InputSpec> filter{
155 {"tpc", "TPC", "SOMEDATA", 0, Lifetime::Timeframe},
156 {"its", ConcreteDataTypeMatcher{"ITS", "SOMEDATA"}, Lifetime::Timeframe},
157 };
158
159 count = 0;
160 for (auto const& ref : InputRecordWalker(inputs, filter)) {
161 auto const data = inputs.get<std::string>(ref);
162 REQUIRE(data == dataset.values[count]);
163 count++;
164 }
165 REQUIRE(count == 3);
166}
std::shared_ptr< arrow::Schema > schema
int32_t i
A helper class to iterate over all parts of all input routes.
uint32_t stack
Definition RawData.h:1
A helper class to iterate over all parts of all input routes.
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
const_iterator begin() const
decltype(auto) get(R binding, int part=0) const
size_t size() const
Number of elements in the InputSpan.
Definition InputSpan.h:93
const GLfloat * m
Definition glcorearb.h:4066
GLint GLsizei count
Definition glcorearb.h:399
GLuint GLuint end
Definition glcorearb.h:469
const GLdouble * v
Definition glcorearb.h:832
GLsizei GLsizei GLchar * source
Definition glcorearb.h:798
GLenum GLsizei GLsizei GLint * values
Definition glcorearb.h:1576
GLboolean * data
Definition glcorearb.h:298
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
GLint ref
Definition glcorearb.h:291
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
TEST_CASE("test_prepareArguments")
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
std::vector< InputRoute > schema
std::vector< std::string > CheckType
std::vector< std::unique_ptr< std::vector< char > > > MessageSet
DataSet(std::vector< InputRoute > &&s, Messages &&m, CheckType &&v, ServiceRegistryRef registry)
std::vector< TaggedSet > Messages
std::pair< o2::header::DataOrigin, MessageSet > TaggedSet
the main header struct
Definition DataHeader.h:620
SerializationMethod payloadSerializationMethod
Definition DataHeader.h:653
DataDescription dataDescription
Definition DataHeader.h:638
SubSpecificationType subSpecification
Definition DataHeader.h:658
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33
DataSet createData()