test_StaggeringWorkflow.cxx
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

// NOTE: several include lines are not visible in the rendered listing; the set
// below is assumed to cover the Framework types used in this file.
#include "Framework/CompletionPolicy.h"
#include "Framework/DispatchPolicy.h"
#include "Framework/DeviceSpec.h"
#include "Framework/ConfigContext.h"
#include "Framework/WorkflowSpec.h"
#include "Framework/DataSpecUtils.h"
#include "Framework/ControlService.h"
#include "Framework/CallbackService.h"
#include "Framework/RawDeviceService.h"
#include "Framework/InputSpec.h"
#include "Framework/Logger.h"
#include "Framework/Output.h"
#include "Headers/DataHeader.h"
#include <algorithm>
#include <cstring>
#include <memory>
#include <regex>

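// This test builds a staggered workflow: one producer publishes a data (CHANNEL)
// and a trigger (TRIGGER) message per subspec with delays in between, a set of
// parallel processors consumes them pairwise, and a sink collects the processor
// outputs. Dispatch and completion policies are customized so that messages are
// sent and consumed as soon as the configured conditions are met.
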
void customize(std::vector<o2::framework::DispatchPolicy>& policies)
{
  // we customize all devices to dispatch data immediately
  auto producerMatcher = [](auto const& spec) {
    return std::regex_match(spec.name.begin(), spec.name.end(), std::regex("producer.*"));
  };
  auto processorMatcher = [](auto const& spec) {
    return std::regex_match(spec.name.begin(), spec.name.end(), std::regex("processor.*"));
  };
  auto triggerMatcher = [](auto const& query) {
    o2::framework::Output reference{"PROD", "TRIGGER"};
    return reference.origin == query.origin && reference.description == query.description;
  };
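  // producer outputs are held back until a PROD/TRIGGER output has been created,
  // so the data and trigger messages leave the device together; processor outputs
  // are dispatched as soon as they are ready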
  policies.push_back({"producer-policy", producerMatcher, o2::framework::DispatchPolicy::DispatchOp::WhenReady, triggerMatcher});
  policies.push_back({"processor-policy", processorMatcher, o2::framework::DispatchPolicy::DispatchOp::WhenReady});
}

void customize(std::vector<o2::framework::CompletionPolicy>& policies)
{
  // we customize the processors to consume data as it comes
  policies.push_back({"processor-consume",
                      [](o2::framework::DeviceSpec const& spec) {
                        // search for spec names starting with "processor"
                        return spec.name.find("processor") == 0;
                      },
                      [](auto const&, auto const&, auto&) { return o2::framework::CompletionPolicy::CompletionOp::Consume; }});
}

// the driver entry point; it must be included after the customize() overloads
// above so that they are picked up by the framework
#include "Framework/runDataProcessing.h"

using namespace o2::framework;

#define ASSERT_ERROR(condition)                                   \
  if ((condition) == false) {                                     \
    LOG(fatal) << R"(Test condition ")" #condition R"(" failed)"; \
  }

constexpr size_t nPipelines = 3;
constexpr size_t nChannels = 10;

std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const&)
{
  // first fill the subspecifications
  // the payload exchanged in this test is simply the subspec value itself
  using MyDataType = o2::header::DataHeader::SubSpecificationType;
  std::vector<o2::header::DataHeader::SubSpecificationType> subspecs(nChannels);
  std::generate(subspecs.begin(), subspecs.end(), [counter = std::make_shared<int>(0)]() { return (*counter)++; });
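  // for every subspec the producer provides a data (CHANNEL) and a trigger (TRIGGER) output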
  std::vector<OutputSpec> producerOutputs;
  for (auto const& subspec : subspecs) {
    producerOutputs.emplace_back(OutputSpec{"PROD", "CHANNEL", subspec, Lifetime::Timeframe});
    producerOutputs.emplace_back(OutputSpec{"PROD", "TRIGGER", subspec, Lifetime::Timeframe});
  }

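  // the producer publishes the CHANNEL and TRIGGER snapshots for each subspec with
  // a delay between the two messages, then signals end-of-stream and quits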
  auto producerFct = adaptStateless([subspecs](DataAllocator& outputs, RawDeviceService& device, ControlService& control) {
    for (auto const& subspec : subspecs) {
      // since the snapshot copy is ready for sending, it is scheduled but held back
      // by the trigger matcher of the DispatchPolicy. This message will be
      // sent together with the second (TRIGGER) message.
      outputs.snapshot(Output{"PROD", "CHANNEL", subspec}, subspec);
      device.waitFor(100);
      outputs.snapshot(Output{"PROD", "TRIGGER", subspec}, subspec);
      device.waitFor(100);
    }
    control.endOfStream();
    control.readyToQuit(QuitRequest::Me);
  });
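  // each processor invocation must see exactly two inputs, the CHANNEL/TRIGGER pair
  // of one subspec; the trigger input is forwarded as the processor output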
  auto processorFct = [](ProcessingContext& pc) {
    int nActiveInputs = 0;
    LOG(info) << "processing ...";
    for (auto const& input : pc.inputs()) {
      if (pc.inputs().isValid(input.spec->binding) == false) {
        // this input slot is empty
        continue;
      }
      auto& data = pc.inputs().get<MyDataType>(input.spec->binding.c_str());
      LOG(info) << "processing " << input.spec->binding << " " << data;
      // check if the channel binding starts with 'trigger'
      if (input.spec->binding.find("trigger") == 0) {
        pc.outputs().make<MyDataType>(Output{"PROC", "CHANNEL", data}) = data;
      }
      nActiveInputs++;
    }
    LOG(info) << "processed " << nActiveInputs << " inputs";
    // since we publish with delays, and two channels are always sent together
    ASSERT_ERROR(nActiveInputs == 2);
  };
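  // give every sink input a unique binding (input0 ... input9) and match it to the
  // corresponding subspec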
  auto amendSinkInput = [subspecs](InputSpec& input, size_t index) {
    input.binding += std::to_string(subspecs[index]);
    // adjust the input matcher to this channel's subspec
    DataSpecUtils::updateMatchingSubspec(input, subspecs[index]);
  };
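  // the sink only logs what it receives and stops the whole workflow once the
  // end-of-stream callback fires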
  auto sinkFct = adaptStateful([](CallbackService& callbacks) {
    callbacks.set<CallbackService::Id::EndOfStream>([](EndOfStreamContext& context) {
      context.services().get<ControlService>().readyToQuit(QuitRequest::All);
    });
    return adaptStateless([](InputRecord& inputs) {
      for (auto const& input : inputs) {
        auto& data = inputs.get<MyDataType>(input.spec->binding.c_str());
        LOG(info) << "received channel " << data;
      }
    });
  });
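  // parallelPipeline fans the template "processor" spec out over nPipelines devices
  // and distributes the nChannels subspecs among them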
  std::vector<DataProcessorSpec> workflow = parallelPipeline(
    std::vector<DataProcessorSpec>{DataProcessorSpec{
      "processor",
      {InputSpec{"input", "PROD", "CHANNEL", 0, Lifetime::Timeframe},
       InputSpec{"trigger", "PROD", "TRIGGER", 0, Lifetime::Timeframe}},
      {OutputSpec{"PROC", "CHANNEL", 0, Lifetime::Timeframe}},
      AlgorithmSpec{processorFct}}},
    nPipelines,
    [&subspecs]() { return subspecs.size(); },
    [&subspecs](size_t index) { return subspecs[index]; });

  workflow.emplace_back(DataProcessorSpec{
    "producer",
    Inputs{},
    producerOutputs,
    AlgorithmSpec{producerFct}});

  workflow.emplace_back(DataProcessorSpec{
    "sink",
    mergeInputs(InputSpec{"input", "PROC", "CHANNEL", 0, Lifetime::Timeframe}, nChannels, amendSinkInput),
    {},
    AlgorithmSpec{sinkFct}});
  return workflow;
}