Project
Loading...
Searching...
No Matches
test_SlowProducerWithWildCard.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
16#include <fairmq/Device.h>
17
18#include <chrono>
19#include <thread>
20#include <vector>
21
22using namespace o2::framework;
23void customize(std::vector<o2::framework::CompletionPolicy>& policies)
24{
25 policies.push_back(CompletionPolicyHelpers::defineByName("Publisher.*", CompletionPolicy::CompletionOp::Consume));
26}
27
29using namespace o2::framework;
30
31// This is how you can define your processing in a declarative way
33{
34 return WorkflowSpec{
35 {.name = "A1",
36 .outputs = {OutputSpec{{"a"}, "CLP", "D0", 0, Lifetime::Timeframe},
37 OutputSpec{{"b"}, "CLW", "D0", 0, Lifetime::Timeframe}},
38 .algorithm = AlgorithmSpec{adaptStateful([]() { return adaptStateless(
39 [](DataAllocator& outputs, RawDeviceService& device, ControlService& control) {
40 static int count = 0;
41 auto& aData = outputs.make<int>(OutputRef{"a"});
42 auto& bData = outputs.make<int>(OutputRef{"b"});
43 aData = 1;
44 (void)bData;
45 LOG(info) << count++;
46 device.device()->WaitFor(std::chrono::milliseconds(100));
47 if (count > 100) {
48 LOGP(info, "Done sending 100 messages on the fast path");
49 control.endOfStream();
50 control.readyToQuit(QuitRequest::Me);
51 }
52 }); })}},
53 {.name = "A2",
54 .outputs = {OutputSpec{{"a"}, "CLP", "D1", 1, Lifetime::Timeframe},
55 OutputSpec{{"b"}, "CLW", "D1", 1, Lifetime::Timeframe}},
56 .algorithm = AlgorithmSpec{adaptStateful([]() { return adaptStateless(
57 [](DataAllocator& outputs, RawDeviceService& device, ControlService& control) {
58 static int count = 0;
59 auto& aData = outputs.make<int>(OutputRef{"a", 1});
60 auto& bData = outputs.make<int>(OutputRef{"b", 1});
61 aData = 2;
62 (void)bData;
63 LOG(info) << count++;
64 device.device()->WaitFor(std::chrono::milliseconds(1000));
65 if (count > 10) {
66 LOGP(info, "Done sending 10 messages on the slow path");
67 control.endOfStream();
68 control.readyToQuit(QuitRequest::Me);
69 }
70 }); })}},
71 {.name = "Publisher",
72 .inputs = {{"x", "CLP", Lifetime::Sporadic},
73 {"y", "CLW", Lifetime::Sporadic}},
74 .algorithm = AlgorithmSpec{adaptStateful([]() { return adaptStateless(
75 [](InputRecord& inputs, RawDeviceService& device, ControlService& control) {
76 static int a1Count = 0;
77 static int a2Count = 0;
78
79 auto& x = inputs.get<int>("x");
80 if (x == 1) {
81 LOGP(info, "Received from A1 {}", a1Count++);
82 } else if (x == 2) {
83 LOGP(info, "Received from A2 {}", a2Count++);
84 } else {
85 LOGP(fatal, "Unexpected value {}", x);
86 }
87 LOGP(info, "Count is {} {}", a1Count, a2Count);
88 if (a1Count == 101 && a2Count == 11) {
89 LOGP(info, "Done receiving all messages");
90 control.endOfStream();
91 control.readyToQuit(QuitRequest::Me);
92 }
93 }); })}}};
94}
void readyToQuit(bool all)
Compatibility with old API.
void endOfStream()
Signal that we are done with the current stream.
decltype(auto) make(const Output &spec, Args... args)
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
decltype(auto) get(R binding, int part=0) const
virtual fair::mq::Device * device()=0
GLint GLenum GLint x
Definition glcorearb.h:403
GLint GLsizei count
Definition glcorearb.h:399
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
WorkflowSpec defineDataProcessing(ConfigContext const &specs)
This function hooks up the the workflow specifications into the DPL driver.
void customize(std::vector< o2::framework::CompletionPolicy > &policies)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"