Project
Loading...
Searching...
No Matches
test_ConsumeWhenAllOrdered.cxx
Go to the documentation of this file.
1
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3
// All rights not expressly granted are reserved.
4
//
5
// This software is distributed under the terms of the GNU General Public
6
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7
//
8
// In applying this license CERN does not waive the privileges and immunities
9
// granted to it by virtue of its status as an Intergovernmental Organization
10
// or submit itself to any jurisdiction.
11
#include "
Framework/ConfigParamSpec.h
"
12
#include "
Framework/DataTakingContext.h
"
13
#include "
Framework/CompletionPolicyHelpers.h
"
14
#include "
Framework/DeviceSpec.h
"
15
#include "
Framework/ControlService.h
"
16
#include "
Framework/Configurable.h
"
17
#include "
Framework/RunningWorkflowInfo.h
"
18
#include "
Framework/CallbackService.h
"
19
#include "
Framework/EndOfStreamContext.h
"
20
#include "
Framework/CompletionPolicyHelpers.h
"
21
#include <fairmq/Device.h>
22
23
void
customize
(std::vector<o2::framework::CompletionPolicy>& policies)
24
{
25
policies.push_back(
o2::framework::CompletionPolicyHelpers::consumeWhenAllOrdered
(
"fake-output-proxy"
));
26
}
27
28
#include <iostream>
29
#include <vector>
30
31
using namespace
o2::framework
;
32
33
#include "
Framework/runDataProcessing.h
"
34
35
// This is how you can define your processing in a declarative way
36
WorkflowSpec
defineDataProcessing
(
ConfigContext
const
& specs)
37
{
38
DataProcessorSpec
producer{
39
.
name
=
"producer"
,
40
.outputs = {
OutputSpec
{{
"counter"
},
"TST"
,
"A1"
}},
41
.algorithm =
AlgorithmSpec
{
adaptStateless
(
42
[](
DataAllocator
& outputs,
ProcessingContext
& pcx) {
43
static
int
counter
= 0;
44
auto
& aData = outputs.
make
<
int
>(
OutputRef
{
"counter"
});
45
aData =
counter
++;
46
if
(
counter
== 100) {
47
pcx.
services
().
get
<
ControlService
>().endOfStream();
48
}
49
})},
50
};
51
52
DataProcessorSpec
producerSkipping{
53
.
name
=
"producerSkipping"
,
54
.outputs = {
OutputSpec
{{
"counter"
},
"TST"
,
"A2"
}},
55
.algorithm =
AlgorithmSpec
{
adaptStateless
(
56
[](
DataAllocator
& outputs,
ProcessingContext
& pcx) {
57
static
int
counter
= -1;
58
counter
++;
59
if
(((
counter
% 10) == 4) || ((
counter
% 10) == 5)) {
60
return
;
61
}
62
auto
& aData = outputs.
make
<
int
>(
OutputRef
{
"counter"
});
63
aData =
counter
;
64
if
(
counter
== 100) {
65
pcx.
services
().
get
<
ControlService
>().endOfStream();
66
}
67
})},
68
};
69
70
DataProcessorSpec
outputProxy{
71
.
name
=
"fake-output-proxy"
,
72
.inputs = {
73
InputSpec
{
"x"
,
"TST"
,
"A1"
, Lifetime::Timeframe},
74
InputSpec
{
"y"
,
"TST"
,
"A2"
, Lifetime::Timeframe}},
75
.algorithm =
adaptStateful
([](
CallbackService
& callbacks) {
76
static
int
count
= 0;
77
auto
eosCallback = [](
EndOfStreamContext
&ctx) {
78
if
(
count
!= 80) {
79
LOGP(fatal,
"Wrong number of timeframes seen: {} != 80"
,
count
);
80
}
81
};
82
callbacks.
set
<CallbackService::Id::EndOfStream>(eosCallback);
83
return
adaptStateless
([](
Input<"x", int>
const
&
x
)
84
{
85
std::cout <<
"See: "
<<
count
++ <<
" with contents "
<< (
int
)
x
<< std::endl;
86
}); })};
87
88
return
WorkflowSpec
{producer, producerSkipping, outputProxy};
89
}
CallbackService.h
CompletionPolicyHelpers.h
ConfigParamSpec.h
Configurable.h
ControlService.h
DataTakingContext.h
DeviceSpec.h
EndOfStreamContext.h
RunningWorkflowInfo.h
int
o2::framework::CallbackService
Definition
CallbackService.h:35
o2::framework::CallbackService::set
void set(U &&cb)
Definition
CallbackService.h:127
o2::framework::ConfigContext
Definition
ConfigContext.h:24
o2::framework::ControlService
Definition
ControlService.h:40
o2::framework::DataAllocator
Definition
DataAllocator.h:135
o2::framework::DataAllocator::make
decltype(auto) make(const Output &spec, Args... args)
Definition
DataAllocator.h:166
o2::framework::EndOfStreamContext
Definition
EndOfStreamContext.h:22
o2::framework::ProcessingContext
Definition
ProcessingContext.h:27
o2::framework::ProcessingContext::services
ServiceRegistryRef services()
The services registry associated with this processing context.
Definition
ProcessingContext.h:39
o2::framework::ServiceRegistryRef::get
T & get() const
Definition
ServiceRegistryRef.h:85
x
GLint GLenum GLint x
Definition
glcorearb.h:403
count
GLint GLsizei count
Definition
glcorearb.h:399
counter
GLuint counter
Definition
glcorearb.h:3987
o2::framework
Defining PrimaryVertex explicitly as messageable.
Definition
TFIDInfo.h:20
o2::framework::WorkflowSpec
std::vector< DataProcessorSpec > WorkflowSpec
Definition
HBFUtilsInitializer.h:39
o2::framework::adaptStateless
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
Definition
AlgorithmSpec.h:229
o2::framework::adaptStateful
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
Definition
AlgorithmSpec.h:236
runDataProcessing.h
o2::framework::AlgorithmSpec
Definition
AlgorithmSpec.h:43
o2::framework::CompletionPolicyHelpers::consumeWhenAllOrdered
static CompletionPolicy consumeWhenAllOrdered(const char *name, CompletionPolicy::Matcher matcher)
as consumeWhenAll, but ensures that records are processed with incremental timeSlice (DataHeader::sta...
Definition
CompletionPolicyHelpers.cxx:174
o2::framework::DataProcessorSpec
Definition
DataProcessorSpec.h:41
o2::framework::DataProcessorSpec::name
std::string name
Definition
DataProcessorSpec.h:42
o2::framework::InputSpec
Definition
InputSpec.h:31
o2::framework::Input
Definition
AlgorithmSpec.h:88
o2::framework::OutputRef
Definition
OutputRef.h:29
o2::framework::OutputSpec
Definition
OutputSpec.h:33
defineDataProcessing
WorkflowSpec defineDataProcessing(ConfigContext const &specs)
This function hooks up the the workflow specifications into the DPL driver.
Definition
test_ConsumeWhenAllOrdered.cxx:36
customize
void customize(std::vector< o2::framework::CompletionPolicy > &policies)
Definition
test_ConsumeWhenAllOrdered.cxx:23
Framework
Core
test
test_ConsumeWhenAllOrdered.cxx
Generated on Tue Feb 25 2025 23:16:40 for Project by
1.9.8