Project
Loading...
Searching...
No Matches
test_DataDescriptorMatcherWorkflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14#include "Framework/InputSpec.h"
22
23// we need to add workflow options before including Framework/runDataProcessing
24//{
25//}
26
28
29// A test workflow for DataDescriptorNegator
30// Create a processor which subscribes to input spec TST/SAMPLE/!0
31// meaning TST/SAMPLE with all but subspec 0
32// Subscribing processor to TST/SAMPLE/0
33
34// Two tasks:
35// - InputSpec matching to OutputSpec: OutputSpec has two options,
36// ConcreteDataMatcher and ConcreteDataTypeMatcher, DataSpecUtils has two
37// methods matching the InputSpec matcher to either of these
38// -> this is sufficient for the use case if the negator is implemented
39// in DataDescriptorMatcher
40// - matching of data packets to InputRoutes of the DataRelayer, also this
41// is based on DataDescriptorMatcher
42//
43// DataDescriptorMatcher extension
44// - define Negator
45//
46// InputSpec definition:
47// - refactor to have one constructor with templated parameters
48
// Test assertion helper: when `condition` compares equal to false, streams a
// message naming the failed expression to LOG(fatal).
//
// The body is wrapped in do { ... } while (false) so the macro expands to a
// single statement. `ASSERT_ERROR(x);` can therefore be used as the unbraced
// branch of an if/else without detaching or capturing the `else`.
#define ASSERT_ERROR(condition)                                     \
  do {                                                              \
    if ((condition) == false) {                                     \
      LOG(fatal) << R"(Test condition ")" #condition R"(" failed)"; \
    }                                                               \
  } while (false)
53
54using namespace o2::framework;
56
// Builds the test workflow consisting of three data processors:
//  - "producer":  publishes TST/SAMPLE/1 and TST/SAMPLE/2 once, then signals end of stream
//  - "processor": subscribes through a DataDescriptorMatcher and forwards each
//                 received value to its "out" output (TST/SAMPLE/0)
//  - "sink":      subscribes to TST/SAMPLE/0 and checks the forwarded values
// The `config` argument is not used in the visible body.
// Returns the list of DataProcessorSpec objects describing the workflow.
57std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
58{
59 std::vector<DataProcessorSpec> workflow;
60
 // The shared counter makes the producer publish only on its first invocation.
61 auto producerCallback = [counter = std::make_shared<size_t>()](DataAllocator& outputs, ControlService& control) {
62 if (*counter > 0) {
63 // don't know if we enter the processing callback after the EOS was sent
64 return;
65 }
 // The payload value equals the subspecification; the processor checks this.
66 outputs.make<unsigned int>(Output{"TST", "SAMPLE", 1}) = 1;
67 outputs.make<unsigned int>(Output{"TST", "SAMPLE", 2}) = 2;
68 ++(*counter);
69 control.endOfStream();
70 };
71
 // Producer: driven by a timer input, declares the two TST/SAMPLE outputs.
72 workflow.emplace_back(DataProcessorSpec{"producer",
73 {InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}},
74 {OutputSpec{"TST", "SAMPLE", 1, Lifetime::Timeframe},
75 OutputSpec{"TST", "SAMPLE", 2, Lifetime::Timeframe}},
76 AlgorithmSpec{adaptStateless(producerCallback)},
77 {ConfigParamSpec{"period-timer", VariantType::Int, 100000, {"timer"}}}});
78
 // Processor callback: walks all input parts, verifies each payload against
 // the subspecification in its header and forwards it to the "out" output.
79 auto processorCallback = [counter = std::make_shared<size_t>()](InputRecord& inputs, DataAllocator& outputs) {
80 // should not be called more than one time
81 ASSERT_ERROR(*counter == 0);
82 ++(*counter);
83 int nBlocks = 0;
84 for (auto ref : InputRecordWalker(inputs)) {
85 auto const* dh = DataRefUtils::getHeader<DataHeader*>(ref);
86 ASSERT_ERROR(dh != nullptr);
87 auto const& data = inputs.get<unsigned int>(ref);
88 ASSERT_ERROR(data == dh->subSpecification);
89 outputs.make<unsigned int>(OutputRef{"out", 0}) = data;
90 LOG(info) << fmt::format("forwarded {}/{}/{} with data {}",
91 dh->dataOrigin.as<std::string>(),
92 dh->dataDescription.as<std::string>(),
93 dh->subSpecification,
94 data);
95 ++nBlocks;
96 }
 // Both blocks published by the producer are expected in this one invocation.
97 ASSERT_ERROR(nBlocks == 2);
98 };
99
 // Input matcher of the processor, built from nested And/And/And/Not/Just nodes.
 // NOTE(review): this listing omits the matcher operands and the closing
 // brackets (embedded line numbers 103, 106, 111 and 114 are missing).
 // Presumably it matches TST/SAMPLE with every subspec except 0, as described
 // in the comment at the top of the file - confirm against the original source.
101 DataDescriptorMatcher processorInputMatcher = {
102 DataDescriptorMatcher::Op::And,
104 std::make_unique<DataDescriptorMatcher>(
105 DataDescriptorMatcher::Op::And,
107 std::make_unique<DataDescriptorMatcher>(
108 DataDescriptorMatcher::Op::And,
109 std::make_unique<DataDescriptorMatcher>(
110 DataDescriptorMatcher::Op::Not,
112 std::make_unique<DataDescriptorMatcher>(
113 DataDescriptorMatcher::Op::Just,
115
 // Processor: the matcher is moved into the InputSpec; output "out" is TST/SAMPLE/0.
116 workflow.emplace_back(DataProcessorSpec{"processor",
117 {InputSpec{"in", std::move(processorInputMatcher), Lifetime::Timeframe}},
118 {OutputSpec{{"out"}, "TST", "SAMPLE", 0, Lifetime::Timeframe}},
119 AlgorithmSpec{adaptStateless(processorCallback)}});
120
 // Sink callback: checks that exactly two parts arrive and that each payload
 // is 1 or 2, i.e. one of the values published by the producer.
121 auto sinkCallback = [counter = std::make_shared<size_t>()](InputRecord& inputs) {
122 // should not be called more than one time
123 ASSERT_ERROR(*counter == 0);
124 ++(*counter);
125 int nBlocks = 0;
126 for (auto ref : InputRecordWalker(inputs)) {
127 auto const* dh = DataRefUtils::getHeader<DataHeader*>(ref);
128 ASSERT_ERROR(dh != nullptr);
129 auto const& data = inputs.get<unsigned int>(ref);
130 ASSERT_ERROR(data > 0 && data < 3);
131 LOG(info) << fmt::format("received {}/{}/{} with data {}",
132 dh->dataOrigin.as<std::string>(),
133 dh->dataDescription.as<std::string>(),
134 dh->subSpecification,
135 data);
136 ++nBlocks;
137 }
138 ASSERT_ERROR(nBlocks == 2);
139 };
140
 // Sink: consumes the processor output TST/SAMPLE/0 and declares no outputs.
141 workflow.emplace_back(DataProcessorSpec{"sink",
142 {InputSpec{"in", "TST", "SAMPLE", 0, Lifetime::Timeframe}},
143 {},
144 AlgorithmSpec{adaptStateless(sinkCallback)}});
145
146 return workflow;
147}
A helper class to iterate over all parts of all input routes.
A helper class to iterate over all parts of all input routes.
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
Something which can be matched against a header::DataDescription.
Something which can be matched against a header::DataOrigin.
Matcher on actual time, as reported in the DataProcessingHeader.
Something which can be matched against a header::SubSpecificationType.
GLboolean * data
Definition glcorearb.h:298
GLint ref
Definition glcorearb.h:291
GLuint counter
Definition glcorearb.h:3987
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
A typesafe reference to an element of the context.
the main header struct
Definition DataHeader.h:618
#define ASSERT_ERROR(condition)
std::vector< DataProcessorSpec > defineDataProcessing(ConfigContext const &config)
This function hooks up the workflow specifications into the DPL driver.
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"