Project
Loading...
Searching...
No Matches
test_DataAllocator.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
19#include "Framework/InputSpec.h"
27#include "Headers/DataHeader.h"
28#include "TestClasses.h"
29#include "Framework/Logger.h"
30#include <fairmq/Device.h>
31#include <vector>
32#include <chrono>
33#include <cstring>
34#include <deque>
35#include <utility> // std::declval
36#include <TNamed.h>
37
38using namespace o2::framework;
39
40#define ASSERT_ERROR(condition) \
41 if ((condition) == false) { \
42 LOG(fatal) << R"(Test condition ")" #condition R"(" failed)"; \
43 }
44
45// this function is only used to do the static checks for API return types
47{
48 const Output output{"TST", "DUMMY", 0};
49 // we require references to objects owned by allocator context
50 static_assert(std::is_lvalue_reference<decltype(std::declval<DataAllocator>().make<int>(output))>::value);
51 static_assert(std::is_lvalue_reference<decltype(std::declval<DataAllocator>().make<std::string>(output, "test"))>::value);
52 static_assert(std::is_lvalue_reference<decltype(std::declval<DataAllocator>().make<std::vector<int>>(output))>::value);
53}
54
55namespace test
56{
58 // Required to do the lookup
60 static const uint32_t sVersion = 1;
61
62 MetaHeader(uint32_t v)
63 : BaseHeader(sizeof(MetaHeader), sHeaderType, o2::header::gSerializationMethodNone, sVersion), secret(v)
64 {
65 }
66
67 uint64_t secret;
68};
70} // namespace test
71
73{
74 static_assert(enable_root_serialization<o2::test::Polymorphic>::value, "enable_root_serialization<o2::test::Polymorphic> must be true");
75 auto processingFct = [](ProcessingContext& pc) {
76 o2::test::TriviallyCopyable a(42, 23, 0xdead);
78 std::vector<o2::test::Polymorphic> c{{0xaffe}, {0xd00f}};
79 std::vector<o2::test::Base*> ptrVec{new o2::test::Polymorphic{0xaffe}, new o2::test::Polymorphic{0xd00f}};
80 std::deque<int> testDequePayload{10, 20, 30};
81
82 // class TriviallyCopyable is both messageable and has a dictionary, the default
83 // picked by the framework is no serialization
84 test::MetaHeader meta1{42};
85 test::MetaHeader meta2{23};
86 pc.outputs().snapshot(Output{"TST", "MESSAGEABLE", 0, {meta1, meta2}}, a);
87 pc.outputs().snapshot(Output{"TST", "MSGBLEROOTSRLZ", 0},
89 // class Polymorphic is not messageable, so the serialization type is deduced
90 // from the fact that the type has a dictionary and can be ROOT-serialized.
91 pc.outputs().snapshot(Output{"TST", "ROOTNONTOBJECT", 0}, b);
92 // vector of ROOT serializable class
93 pc.outputs().snapshot(Output{"TST", "ROOTVECTOR", 0}, c);
94 // deque of simple types
95 pc.outputs().snapshot(Output{"TST", "DEQUE", 0}, testDequePayload);
96 // likewise, passed anonymously with char type and class name
97 o2::framework::ROOTSerialized<char, const char> d(*((char*)&c), "vector<o2::test::Polymorphic>");
98 pc.outputs().snapshot(Output{"TST", "ROOTSERLZDVEC", 0}, d);
99 // vector of ROOT serializable class wrapped with TClass info as hint
100 auto* cl = TClass::GetClass(typeid(decltype(c)));
101 ASSERT_ERROR(cl != nullptr);
103 pc.outputs().snapshot(Output{"TST", "ROOTSERLZDVEC2", 0}, e);
104 // test the 'make' methods
105 pc.outputs().make<o2::test::TriviallyCopyable>(OutputRef{"makesingle", 0}) = a;
106 auto& multi = pc.outputs().make<o2::test::TriviallyCopyable>(OutputRef{"makespan", 0}, 3);
107 ASSERT_ERROR(multi.size() == 3);
108 for (auto& object : multi) {
109 object = a;
110 }
111 // test the adopt method
112 auto freefct = [](void* data, void* hint) {}; // simply ignore the cleanup for the test
113 static std::string teststring = "adoptchunk";
114 pc.outputs().adoptChunk(Output{"TST", "ADOPTCHUNK", 0}, teststring.data(), teststring.length(), freefct, nullptr);
115 // test resizable data chunk, initial size 0 and grow
116 auto& growchunk = pc.outputs().newChunk(OutputRef{"growchunk", 0}, 0);
117 growchunk.resize(sizeof(o2::test::TriviallyCopyable));
118 memcpy(growchunk.data(), &a, sizeof(o2::test::TriviallyCopyable));
119 // test resizable data chunk, large initial size and shrink
120 auto& shrinkchunk = pc.outputs().newChunk(OutputRef{"shrinkchunk", 0}, 1000000);
121 shrinkchunk.resize(sizeof(o2::test::TriviallyCopyable));
122 memcpy(shrinkchunk.data(), &a, sizeof(o2::test::TriviallyCopyable));
123 // make Root-serializable object derived from TObject
124 auto& rootobject = pc.outputs().make<TNamed>(OutputRef{"maketobject", 0}, "a_name", "a_title");
125 // make Root-serializable object Non-TObject
126 auto& rootpolymorphic = pc.outputs().make<o2::test::Polymorphic>(OutputRef{"makerootserlzblobj", 0}, b);
127 // make vector of Root-serializable objects
128 auto& rootserlzblvector = pc.outputs().make<std::vector<o2::test::Polymorphic>>(OutputRef{"rootserlzblvector", 0});
129 rootserlzblvector.emplace_back(0xacdc);
130 rootserlzblvector.emplace_back(0xbeef);
131 // make vector of messagable objects
132 auto& messageablevector = pc.outputs().make<std::vector<o2::test::TriviallyCopyable>>(OutputRef{"messageablevector", 0});
133 ASSERT_ERROR(messageablevector.size() == 0);
134 messageablevector.push_back(a);
135 messageablevector.emplace_back(10, 20, 0xacdc);
136
137 // create multiple parts matching the same output spec with subspec wildcard
138 // we are using ConcreteDataTypeMatcher to define the output spec matcher independent
139 // of subspec (i.a. a wildcard), all data blcks will go on the same channel regardless
140 // of sebspec.
141 pc.outputs().make<int>(OutputRef{"multiparts", 0}) = 10;
142 pc.outputs().make<int>(OutputRef{"multiparts", 1}) = 20;
143 pc.outputs().make<int>(OutputRef{"multiparts", 2}) = 30;
144
145 // make a PMR std::vector, make it large to test the auto transport buffer resize funtionality as well
146 Output pmrOutputSpec{"TST", "PMRTESTVECTOR", 0};
147 auto pmrvec = o2::pmr::vector<o2::test::TriviallyCopyable>(pc.outputs().getMemoryResource(pmrOutputSpec));
148 pmrvec.reserve(100);
149 pmrvec.emplace_back(o2::test::TriviallyCopyable{1, 2, 3});
150 pc.outputs().adoptContainer(pmrOutputSpec, std::move(pmrvec));
151
152 // make a vector of POD and set some data
153 pc.outputs().make<std::vector<int>>(OutputRef{"podvector"}) = {10, 21, 42};
154
155 // vector of pointers to ROOT serializable objects
156 pc.outputs().snapshot(Output{"TST", "ROOTSERLZDPTRVEC", 0}, ptrVec);
157
158 // now we are done and signal this downstream
159 pc.services().get<ControlService>().endOfStream();
160 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
161
162 ASSERT_ERROR(pc.outputs().isAllowed({"TST", "MESSAGEABLE", 0}) == true);
163 ASSERT_ERROR(pc.outputs().isAllowed({"TST", "MESSAGEABLE", 1}) == false);
164 ASSERT_ERROR(pc.outputs().isAllowed({"TST", "NOWAY", 0}) == false);
165 for (auto ptr : ptrVec) {
166 delete ptr;
167 }
168 };
169
170 return DataProcessorSpec{"source", // name of the processor
171 {},
172 {OutputSpec{"TST", "MESSAGEABLE", 0, Lifetime::Timeframe},
173 OutputSpec{{"makesingle"}, "TST", "MAKESINGLE", 0, Lifetime::Timeframe},
174 OutputSpec{{"makespan"}, "TST", "MAKESPAN", 0, Lifetime::Timeframe},
175 OutputSpec{{"growchunk"}, "TST", "GROWCHUNK", 0, Lifetime::Timeframe},
176 OutputSpec{{"shrinkchunk"}, "TST", "SHRINKCHUNK", 0, Lifetime::Timeframe},
177 OutputSpec{{"maketobject"}, "TST", "MAKETOBJECT", 0, Lifetime::Timeframe},
178 OutputSpec{{"makerootserlzblobj"}, "TST", "ROOTSERLZBLOBJ", 0, Lifetime::Timeframe},
179 OutputSpec{{"rootserlzblvector"}, "TST", "ROOTSERLZBLVECT", 0, Lifetime::Timeframe},
180 OutputSpec{{"messageablevector"}, "TST", "MSGABLVECTOR", 0, Lifetime::Timeframe},
181 OutputSpec{{"multiparts"}, "TST", "MULTIPARTS", 0, Lifetime::Timeframe},
182 OutputSpec{{"multiparts"}, "TST", "MULTIPARTS", 1, Lifetime::Timeframe},
183 OutputSpec{{"multiparts"}, "TST", "MULTIPARTS", 2, Lifetime::Timeframe},
184 OutputSpec{"TST", "ADOPTCHUNK", 0, Lifetime::Timeframe},
185 OutputSpec{"TST", "MSGBLEROOTSRLZ", 0, Lifetime::Timeframe},
186 OutputSpec{"TST", "ROOTNONTOBJECT", 0, Lifetime::Timeframe},
187 OutputSpec{"TST", "ROOTVECTOR", 0, Lifetime::Timeframe},
188 OutputSpec{"TST", "DEQUE", 0, Lifetime::Timeframe},
189 OutputSpec{"TST", "ROOTSERLZDVEC", 0, Lifetime::Timeframe},
190 OutputSpec{"TST", "ROOTSERLZDVEC2", 0, Lifetime::Timeframe},
191 OutputSpec{"TST", "PMRTESTVECTOR", 0, Lifetime::Timeframe},
192 OutputSpec{{"podvector"}, "TST", "PODVECTOR", 0, Lifetime::Timeframe},
193 OutputSpec{{"inputPtrVec"}, "TST", "ROOTSERLZDPTRVEC", 0, Lifetime::Timeframe}},
194 AlgorithmSpec(processingFct)};
195}
196
198{
199 auto processingFct = [](ProcessingContext& pc) {
201 for (auto iit = pc.inputs().begin(), iend = pc.inputs().end(); iit != iend; ++iit) {
202 auto const& input = *iit;
203 LOG(info) << (*iit).spec->binding << " " << (iit.isValid() ? "is valid" : "is not valid");
204 if (iit.isValid() == false) {
205 continue;
206 }
207 auto* dh = DataRefUtils::getHeader<const DataHeader*>(input);
208 LOG(info) << "{" << dh->dataOrigin.str << ":" << dh->dataDescription.str << ":" << dh->subSpecification << "}"
209 << " payload size " << dh->payloadSize;
210
211 using DumpStackFctType = std::function<void(const o2::header::BaseHeader*)>;
212 DumpStackFctType dumpStack = [&](const o2::header::BaseHeader* h) {
213 o2::header::hexDump("", h, h->size());
214 if (h->flagsNextHeader) {
215 auto next = reinterpret_cast<const std::byte*>(h) + h->size();
216 dumpStack(reinterpret_cast<const o2::header::BaseHeader*>(next));
217 }
218 };
219
220 dumpStack(dh);
221
222 if ((*iit).spec->binding == "inputMP") {
223 LOG(info) << "inputMP with " << iit.size() << " part(s)";
224 int nPart = 0;
225 for (auto const& ref : iit) {
226 LOG(info) << "accessing part " << nPart++ << " of input slot 'inputMP':"
227 << pc.inputs().get<int>(ref);
228 ASSERT_ERROR(pc.inputs().get<int>(ref) == nPart * 10);
229 }
230 ASSERT_ERROR(nPart == 3);
231 }
232 }
233 // plain, unserialized object in input1 channel
234 LOG(info) << "extracting o2::test::TriviallyCopyable from input1";
235 auto object1 = pc.inputs().get<o2::test::TriviallyCopyable>("input1");
236 ASSERT_ERROR(object1 == o2::test::TriviallyCopyable(42, 23, 0xdead));
237 LOG(info) << "extracting span of o2::test::TriviallyCopyable from input1";
238 auto object1span = pc.inputs().get<gsl::span<o2::test::TriviallyCopyable>>("input1");
239 ASSERT_ERROR(object1span.size() == 1);
240 ASSERT_ERROR(sizeof(typename decltype(object1span)::value_type) == sizeof(o2::test::TriviallyCopyable));
241 // check the additional header on the stack
242 auto* metaHeader1 = DataRefUtils::getHeader<test::MetaHeader*>(pc.inputs().get("input1"));
243 // check if there are more of the same type
244 auto* metaHeader2 = metaHeader1 ? o2::header::get<test::MetaHeader*>(metaHeader1->next()) : nullptr;
245 ASSERT_ERROR(metaHeader1 != nullptr);
246 ASSERT_ERROR(metaHeader1->secret == 42);
247 ASSERT_ERROR(metaHeader2 != nullptr && metaHeader2->secret == 23);
248
249 // ROOT-serialized messageable object in input2 channel
250 LOG(info) << "extracting o2::test::TriviallyCopyable pointer from input2";
251 auto object2 = pc.inputs().get<o2::test::TriviallyCopyable*>("input2");
252 ASSERT_ERROR(object2 != nullptr);
253 ASSERT_ERROR(*object2 == o2::test::TriviallyCopyable(42, 23, 0xdead));
254
255 // ROOT-serialized, non-messageable object in input3 channel
256 LOG(info) << "extracting o2::test::Polymorphic pointer from input3";
257 auto object3 = pc.inputs().get<o2::test::Polymorphic*>("input3");
258 ASSERT_ERROR(object3 != nullptr);
259 ASSERT_ERROR(*object3 == o2::test::Polymorphic(0xbeef));
260
261 // container of objects
262 LOG(info) << "extracting vector of o2::test::Polymorphic from input4";
263 auto object4 = pc.inputs().get<std::vector<o2::test::Polymorphic>>("input4");
264 ASSERT_ERROR(object4.size() == 2);
265 ASSERT_ERROR(object4[0] == o2::test::Polymorphic(0xaffe));
266 ASSERT_ERROR(object4[1] == o2::test::Polymorphic(0xd00f));
267
268 // container of objects
269 LOG(info) << "extracting vector of o2::test::Polymorphic from input5";
270 auto object5 = pc.inputs().get<std::vector<o2::test::Polymorphic>>("input5");
271 ASSERT_ERROR(object5.size() == 2);
272 ASSERT_ERROR(object5[0] == o2::test::Polymorphic(0xaffe));
273 ASSERT_ERROR(object5[1] == o2::test::Polymorphic(0xd00f));
274
275 // container of objects
276 LOG(info) << "extracting vector of o2::test::Polymorphic from input6";
277 auto object6 = pc.inputs().get<std::vector<o2::test::Polymorphic>>("input6");
278 ASSERT_ERROR(object6.size() == 2);
279 ASSERT_ERROR(object6[0] == o2::test::Polymorphic(0xaffe));
280 ASSERT_ERROR(object6[1] == o2::test::Polymorphic(0xd00f));
281
282 // checking retrieving buffer as raw char*, and checking content by cast
283 LOG(info) << "extracting raw char* from input1";
284 auto rawchar = pc.inputs().get<const char*>("input1");
285 const auto& data1 = *reinterpret_cast<const o2::test::TriviallyCopyable*>(rawchar);
286 ASSERT_ERROR(data1 == o2::test::TriviallyCopyable(42, 23, 0xdead));
287
288 LOG(info) << "extracting o2::test::TriviallyCopyable from input7";
289 auto object7 = pc.inputs().get<o2::test::TriviallyCopyable>("input7");
290 ASSERT_ERROR(object1 == o2::test::TriviallyCopyable(42, 23, 0xdead));
291
292 LOG(info) << "extracting span of o2::test::TriviallyCopyable from input8";
293 auto objectspan8 = DataRefUtils::as<o2::test::TriviallyCopyable>(pc.inputs().get("input8"));
294 ASSERT_ERROR(objectspan8.size() == 3);
295 for (auto const& object8 : objectspan8) {
296 ASSERT_ERROR(object8 == o2::test::TriviallyCopyable(42, 23, 0xdead));
297 }
298
299 LOG(info) << "extracting std::string from input9";
300 auto object9 = pc.inputs().get<std::string>("input9");
301 ASSERT_ERROR(object9 == "adoptchunk");
302
303 LOG(info) << "extracting o2::test::TriviallyCopyable from input10";
304 auto object10 = pc.inputs().get<o2::test::TriviallyCopyable>("input10");
305 ASSERT_ERROR(object10 == o2::test::TriviallyCopyable(42, 23, 0xdead));
306
307 LOG(info) << "extracting o2::test::TriviallyCopyable from input11";
308 auto object11 = pc.inputs().get<o2::test::TriviallyCopyable>("input11");
309 ASSERT_ERROR(object11 == o2::test::TriviallyCopyable(42, 23, 0xdead));
310
311 LOG(info) << "extracting the original std::vector<o2::test::TriviallyCopyable> as span from input12";
312 auto object12 = pc.inputs().get<gsl::span<o2::test::TriviallyCopyable>>("input12");
313 ASSERT_ERROR(object12.size() == 2);
314 ASSERT_ERROR((object12[0] == o2::test::TriviallyCopyable{42, 23, 0xdead}));
315 ASSERT_ERROR((object12[1] == o2::test::TriviallyCopyable{10, 20, 0xacdc}));
316 // forward the read-only span on a different route
317 pc.outputs().snapshot(Output{"TST", "MSGABLVECTORCPY", 0}, object12);
318
319 LOG(info) << "extracting TNamed object from input13";
320 auto object13 = pc.inputs().get<TNamed*>("input13");
321 ASSERT_ERROR(strcmp(object13->GetName(), "a_name") == 0);
322 ASSERT_ERROR(strcmp(object13->GetTitle(), "a_title") == 0);
323
324 LOG(info) << "extracting Root-serialized Non-TObject from input14";
325 auto object14 = pc.inputs().get<o2::test::Polymorphic*>("input14");
326 ASSERT_ERROR(*object14 == o2::test::Polymorphic{0xbeef});
327
328 LOG(info) << "extracting Root-serialized vector from input15";
329 auto object15 = pc.inputs().get<std::vector<o2::test::Polymorphic>>("input15");
330 ASSERT_ERROR(object15[0] == o2::test::Polymorphic{0xacdc});
331 ASSERT_ERROR(object15[1] == o2::test::Polymorphic{0xbeef});
332
333 LOG(info) << "extracting deque to vector from input16";
334 auto object16 = pc.inputs().get<std::vector<int>>("input16");
335 LOG(info) << "object16.size() = " << object16.size() << std::endl;
336 ASSERT_ERROR(object16.size() == 3);
337 ASSERT_ERROR(object16[0] == 10 && object16[1] == 20 && object16[2] == 30);
338
339 LOG(info) << "extracting PMR vector";
340 auto pmrspan = pc.inputs().get<gsl::span<o2::test::TriviallyCopyable>>("inputPMR");
341 ASSERT_ERROR((pmrspan[0] == o2::test::TriviallyCopyable{1, 2, 3}));
342 auto dataref = pc.inputs().get<DataRef>("inputPMR");
343 auto header = DataRefUtils::getHeader<const o2::header::DataHeader*>(dataref);
345
346 LOG(info) << "extracting POD vector";
347 // TODO: use the ReturnType helper once implemented
348 decltype(std::declval<InputRecord>().get<std::vector<int>>(DataRef{nullptr, nullptr, nullptr})) podvector;
349 podvector = pc.inputs().get<std::vector<int>>("inputPODvector");
350 ASSERT_ERROR(podvector.size() == 3);
351 ASSERT_ERROR(podvector[0] == 10 && podvector[1] == 21 && podvector[2] == 42);
352
353 LOG(info) << "extracting vector of o2::test::Base* from inputPtrVec";
354 auto ptrVec = pc.inputs().get<std::vector<o2::test::Base*>>("inputPtrVec");
355 ASSERT_ERROR(ptrVec.size() == 2);
356 auto ptrVec0 = dynamic_cast<o2::test::Polymorphic*>(ptrVec[0]);
357 auto ptrVec1 = dynamic_cast<o2::test::Polymorphic*>(ptrVec[1]);
358 ASSERT_ERROR(ptrVec0 != nullptr);
359 ASSERT_ERROR(ptrVec1 != nullptr);
360 ASSERT_ERROR(*ptrVec0 == o2::test::Polymorphic(0xaffe));
361 ASSERT_ERROR(*ptrVec1 == o2::test::Polymorphic(0xd00f));
362 delete ptrVec[0];
363 delete ptrVec[1];
364
365 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
366 };
367
368 return DataProcessorSpec{"sink", // name of the processor
369 {InputSpec{"input1", "TST", "MESSAGEABLE", 0, Lifetime::Timeframe},
370 InputSpec{"input2", "TST", "MSGBLEROOTSRLZ", 0, Lifetime::Timeframe},
371 InputSpec{"input3", "TST", "ROOTNONTOBJECT", 0, Lifetime::Timeframe},
372 InputSpec{"input4", "TST", "ROOTVECTOR", 0, Lifetime::Timeframe},
373 InputSpec{"input5", "TST", "ROOTSERLZDVEC", 0, Lifetime::Timeframe},
374 InputSpec{"input6", "TST", "ROOTSERLZDVEC2", 0, Lifetime::Timeframe},
375 InputSpec{"input7", "TST", "MAKESINGLE", 0, Lifetime::Timeframe},
376 InputSpec{"input8", "TST", "MAKESPAN", 0, Lifetime::Timeframe},
377 InputSpec{"input9", "TST", "ADOPTCHUNK", 0, Lifetime::Timeframe},
378 InputSpec{"input10", "TST", "GROWCHUNK", 0, Lifetime::Timeframe},
379 InputSpec{"input11", "TST", "SHRINKCHUNK", 0, Lifetime::Timeframe},
380 InputSpec{"input12", "TST", "MSGABLVECTOR", 0, Lifetime::Timeframe},
381 InputSpec{"input13", "TST", "MAKETOBJECT", 0, Lifetime::Timeframe},
382 InputSpec{"input14", "TST", "ROOTSERLZBLOBJ", 0, Lifetime::Timeframe},
383 InputSpec{"input15", "TST", "ROOTSERLZBLVECT", 0, Lifetime::Timeframe},
384 InputSpec{"input16", "TST", "DEQUE", 0, Lifetime::Timeframe},
385 InputSpec{"inputPMR", "TST", "PMRTESTVECTOR", 0, Lifetime::Timeframe},
386 InputSpec{"inputPODvector", "TST", "PODVECTOR", 0, Lifetime::Timeframe},
387 InputSpec{"inputMP", ConcreteDataTypeMatcher{"TST", "MULTIPARTS"}, Lifetime::Timeframe},
388 InputSpec{"inputPtrVec", "TST", "ROOTSERLZDPTRVEC", 0, Lifetime::Timeframe}},
389 Outputs{OutputSpec{"TST", "MSGABLVECTORCPY", 0, Lifetime::Timeframe}},
390 AlgorithmSpec(processingFct)};
391}
392
393// a second spec subscribing to some of the same data to test forwarding of messages
395{
396 auto processingFct = [](ProcessingContext& pc) {
398 int nPart = 0;
399 for (auto iit = pc.inputs().begin(), iend = pc.inputs().end(); iit != iend; ++iit) {
400 auto const& input = *iit;
401 LOG(info) << (*iit).spec->binding << " " << (iit.isValid() ? "is valid" : "is not valid");
402 if (iit.isValid() == false) {
403 continue;
404 }
405 auto* dh = DataRefUtils::getHeader<const DataHeader*>(input);
406 LOG(info) << "{" << dh->dataOrigin.str << ":" << dh->dataDescription.str << ":" << dh->subSpecification << "}"
407 << " payload size " << dh->payloadSize;
408
409 if ((*iit).spec->binding == "inputMP") {
410 LOG(info) << "inputMP with " << iit.size() << " part(s)";
411 for (auto const& ref : iit) {
412 LOG(info) << "accessing part " << nPart << " of input slot 'inputMP':"
413 << pc.inputs().get<int>(ref);
414 nPart++;
415 ASSERT_ERROR(pc.inputs().get<int>(ref) == nPart * 10);
416 }
417 }
418 }
419 ASSERT_ERROR(nPart == 3);
420 LOG(info) << "extracting the forwarded gsl::span<o2::test::TriviallyCopyable> as span from input12";
421 auto object12 = pc.inputs().get<gsl::span<o2::test::TriviallyCopyable>>("input12");
422 ASSERT_ERROR(object12.size() == 2);
423 ASSERT_ERROR((object12[0] == o2::test::TriviallyCopyable{42, 23, 0xdead}));
424 ASSERT_ERROR((object12[1] == o2::test::TriviallyCopyable{10, 20, 0xacdc}));
425
426 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
427 };
428
429 return DataProcessorSpec{"spectator-sink", // name of the processor
430 {InputSpec{"inputMP", ConcreteDataTypeMatcher{"TST", "MULTIPARTS"}, Lifetime::Timeframe},
431 InputSpec{"input12", ConcreteDataTypeMatcher{"TST", "MSGABLVECTORCPY"}, Lifetime::Timeframe}},
432 Outputs{},
433 AlgorithmSpec(processingFct)};
434}
435
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint32_t c
Definition RawData.h:2
Type wrappers for enfording a specific serialization method.
TBranch * ptr
Class for time synchronization of RawReader instances.
unsigned get() const
Definition TestClasses.h:67
const GLdouble * v
Definition glcorearb.h:832
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLboolean * data
Definition glcorearb.h:298
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLint ref
Definition glcorearb.h:291
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< OutputSpec > Outputs
void hexDump(const char *desc, const void *voidaddr, size_t len, size_t max=0)
helper function to print a hex/ASCII dump of some memory
std::vector< T, o2::pmr::polymorphic_allocator< T > > vector
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
FIXME: do not use data model tables.
the base header struct Every header type must begin (i.e. derive) with this. Don't use this struct di...
Definition DataHeader.h:351
the main header struct
Definition DataHeader.h:618
static const uint32_t sVersion
static const o2::header::HeaderType sHeaderType
#define ASSERT_ERROR(condition)
DataProcessorSpec getSpectatorSinkSpec()
void doTypeChecks()
DataProcessorSpec getSinkSpec()
DataProcessorSpec getSourceSpec()
WorkflowSpec defineDataProcessing(ConfigContext const &)
This function hooks up the the workflow specifications into the DPL driver.
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"