Project
Loading...
Searching...
No Matches
test_SimpleStatefulProcessing01.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
15#include <Monitoring/Monitoring.h>
18#include "Framework/Logger.h"
19
20using namespace o2::framework;
22
23// This is a simple consumer / producer workflow where both are
24// stateful, i.e. they have context which comes from their initialization.
26{
// Returns a WorkflowSpec with two entries, "producer" and "consumer".
// The producer publishes a single int on TES/STATEFUL/0; the consumer reads
// the "test" input and compares it against a running counter. Both keep
// their state in function-local statics set up in their init lambdas.
27 return WorkflowSpec{
28 //
30 "producer", //
31 Inputs{}, //
32 {OutputSpec{"TES", "STATEFUL", 0, Lifetime::Timeframe}}, //
33 // The producer is stateful, we use a static for the state in this
34 // particular case, but a Singleton or a captured new object would
35 // work as well.
38 [](CallbackService& callbacks) {
39 static int foo = 0;
40 static int step = 0; // incremented in registered callbacks
// Each of the three callbacks below bumps the shared `step` counter and
// logs its new value together with the name of the transition.
41 auto startcb = []() {
42 ++step;
43 LOG(info) << "start " << step;
44 };
45 auto stopcb = []() {
46 ++step;
47 LOG(info) << "stop " << step;
48 };
49 auto resetcb = []() {
50 ++step;
51 LOG(info) << "reset " << step;
52 };
53 callbacks.set<CallbackService::Id::Start>(startcb);
54 callbacks.set<CallbackService::Id::Stop>(stopcb);
55 callbacks.set<CallbackService::Id::Reset>(resetcb);
// Processing callback: allocate a chunk sized for one int, store the
// current value of `foo` in it (then increment `foo`), and afterwards
// signal end of stream and request to quit for this producer.
56 return adaptStateless([](DataAllocator& outputs, ControlService& control) {
57 auto& out = outputs.newChunk({"TES", "STATEFUL", 0}, sizeof(int));
58 auto outI = reinterpret_cast<int*>(out.data());
59 LOG(info) << "foo " << foo;
60 outI[0] = foo++;
61 control.endOfStream();
62 control.readyToQuit(QuitRequest::Me);
63 });
64 }) //
65 } //
66 }, //
68 "consumer", //
69 {InputSpec{"test", "TES", "STATEFUL", 0, Lifetime::Timeframe}}, //
70 Outputs{}, //
73 []() {
// Value the next received payload is compared against.
74 static int expected = 0;
75 return adaptStateless([](InputRecord& inputs, ControlService& control) {
76 const int* in = reinterpret_cast<const int*>(inputs.get("test").payload);
77
// `expected` is post-incremented inside the condition, so by the time
// either branch logs it is already one past the value that was compared.
// Both branches therefore report `expected - 1`.
78 if (*in != expected++) {
79 LOG(error) << "Expecting " << (expected - 1) << " found " << *in;
80 } else {
81 LOG(info) << "Everything OK for " << (expected - 1);
82 }
83 });
84 }) //
85 } //
86 } //
87 };
88}
void readyToQuit(bool all)
Compatibility with old API.
void endOfStream()
Signal that we are done with the current stream.
DataChunk & newChunk(const Output &, size_t)
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
decltype(auto) get(R binding, int part=0) const
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
the main header struct
Definition DataHeader.h:618
std::map< std::string, ID > expected
HistogramRegistry foo()
WorkflowSpec defineDataProcessing(ConfigContext const &)
This function hooks up the workflow specifications into the DPL driver.
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"