Project
Loading...
Searching...
No Matches
test_ConsumeWhenAllOrdered.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
21#include <fairmq/Device.h>
22
// Completion-policy customization hook for this workflow.
//
// Appends to `policies` the policy returned by
// CompletionPolicyHelpers::consumeWhenAllOrdered("fake-output-proxy"). The
// string passed in is the same as the `.name` of the `outputProxy`
// DataProcessorSpec declared further down in this file.
//
// Per the helper's brief, consumeWhenAllOrdered behaves "as consumeWhenAll,
// but ensures that records are processed with incremental timeSlice".
//
// @param policies list of completion policies to extend; modified in place.
23void customize(std::vector<o2::framework::CompletionPolicy>& policies)
24{
25 policies.push_back(o2::framework::CompletionPolicyHelpers::consumeWhenAllOrdered("fake-output-proxy"));
26}
27
28#include <iostream>
29#include <vector>
30
31using namespace o2::framework;
32
34
35// This is how you can define your processing in a declarative way
// NOTE(review): the line that should carry the function signature (listing
// line 36) is absent from this listing. The body below builds three
// DataProcessorSpec objects and returns them as a WorkflowSpec.
37{
  // "producer": writes one int per invocation to the output labelled
  // "counter" (TST/A1). The value is a running counter that starts at 0.
38 DataProcessorSpec producer{
39 .name = "producer",
40 .outputs = {OutputSpec{{"counter"}, "TST", "A1"}},
41 .algorithm = AlgorithmSpec{adaptStateless(
42 [](DataAllocator& outputs, ProcessingContext& pcx) {
43 static int counter = 0;
44 auto& aData = outputs.make<int>(OutputRef{"counter"});
    // Post-increment: the value written is the counter before the bump.
45 aData = counter++;
    // Reached right after the value 99 has been written, i.e. after 100
    // values (0..99) in total.
46 if (counter == 100) {
47 pcx.services().get<ControlService>().endOfStream();
48 }
49 })},
50 };
51
  // "producerSkipping": like "producer" but on TST/A2, and it deliberately
  // creates no output on some invocations.
52 DataProcessorSpec producerSkipping{
53 .name = "producerSkipping",
54 .outputs = {OutputSpec{{"counter"}, "TST", "A2"}},
55 .algorithm = AlgorithmSpec{adaptStateless(
56 [](DataAllocator& outputs, ProcessingContext& pcx) {
    // Starts at -1 because it is incremented before use, so the first
    // value considered is 0.
57 static int counter = -1;
58 counter++;
    // Invocations whose counter ends in 4 or 5 return before any output
    // is created (two out of every ten).
59 if (((counter % 10) == 4) || ((counter % 10) == 5)) {
60 return;
61 }
62 auto& aData = outputs.make<int>(OutputRef{"counter"});
63 aData = counter;
    // Checked after writing, so the value 100 itself is still emitted
    // before end of stream is requested.
64 if (counter == 100) {
65 pcx.services().get<ControlService>().endOfStream();
66 }
67 })},
68 };
69
  // "fake-output-proxy": consumes TST/A1 as "x" and TST/A2 as "y". Its name
  // is the string passed to consumeWhenAllOrdered in customize().
70 DataProcessorSpec outputProxy{
71 .name = "fake-output-proxy",
72 .inputs = {
73 InputSpec{"x", "TST", "A1", Lifetime::Timeframe},
74 InputSpec{"y", "TST", "A2", Lifetime::Timeframe}},
75 .algorithm = adaptStateful([](CallbackService& callbacks) {
    // Function-local static: the two capture-less lambdas below can both
    // refer to it without capturing anything.
76 static int count = 0;
    // End-of-stream check: logs at fatal level unless the processing
    // callback ran exactly 80 times. `ctx` is not used.
    // NOTE(review): presumably 80 = the 100 values 0..99 minus the 20 that
    // producerSkipping omits in that range, i.e. the invocations for which
    // both inputs exist. Confirm against consumeWhenAllOrdered semantics.
77 auto eosCallback = [](EndOfStreamContext &ctx) {
78 if (count != 80) {
79 LOGP(fatal, "Wrong number of timeframes seen: {} != 80", count);
80 }
81 };
82 callbacks.set<CallbackService::Id::EndOfStream>(eosCallback);
    // Processing callback: prints the running count (then increments it)
    // together with the int carried by input "x". Input "y" is declared in
    // .inputs but is not read here.
83 return adaptStateless([](Input<"x", int> const& x)
84 {
85 std::cout << "See: " << count++ << " with contents " << (int)x << std::endl;
86 }); })};
87
88 return WorkflowSpec{producer, producerSkipping, outputProxy};
89}
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
The services registry associated with this processing context.
GLint GLenum GLint x
Definition glcorearb.h:403
GLint GLsizei count
Definition glcorearb.h:399
GLuint counter
Definition glcorearb.h:3987
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
static CompletionPolicy consumeWhenAllOrdered(const char *name, CompletionPolicy::Matcher matcher)
as consumeWhenAll, but ensures that records are processed with incremental timeSlice (DataHeader::sta...
WorkflowSpec defineDataProcessing(ConfigContext const &specs)
This function hooks up the workflow specifications into the DPL driver.
void customize(std::vector< o2::framework::CompletionPolicy > &policies)