Project
Loading...
Searching...
No Matches
o2DummyPopulatorWorkflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
21#include <fairmq/Device.h>
22
23#include <iostream>
24#include <chrono>
25#include <thread>
26#include <vector>
27
28using namespace o2::framework;
29
30struct WorkflowOptions {
31 Configurable<int> anInt{"anInt", 1, ""};
32 Configurable<float> aFloat{"aFloat", 2.0f, {"a float option"}};
33 Configurable<double> aDouble{"aDouble", 3., {"a double option"}};
34 Configurable<std::string> aString{"aString", "foobar", {"a string option"}};
35 Configurable<bool> aBool{"aBool", true, {"a boolean option"}};
36};
37
// Replaces the completion policies passed in with a single policy `a` whose
// completion order is set to Timeslice.
// NOTE(review): listing line 40, where `a` is declared, is missing from this
// extract; presumably it is created via consumeWhenAll -- confirm against the
// original file before relying on this.
38void customize(std::vector<CompletionPolicy>& policies)
39{
41 a.order = CompletionPolicy::CompletionOrder::Timeslice;
// Discard every policy that was passed in so that `a` ends up as the only entry.
42 policies.clear();
43 policies.push_back(a);
44}
45
47
// NOTE(review): this listing is incomplete. The function signature (listing
// line 49), the opening lines of both spec initializers (lines 51 and 67) and
// the return statement (line 84) are missing from this extract; the notes
// below only describe what is visible.
48// This is how you can define your processing in a declarative way
50{
// First spec, named "A": its algorithm snapshots the loop counter i to the
// outputs TS1/A1 and TS2/A2 for i = 0..9, passing i as the third Output field
// as well (presumably the sub-specification -- TODO confirm).
52 .name = "A",
53 .algorithm = AlgorithmSpec{adaptStateless(
// NOTE(review): `device`, `context` and `pcx` are not used in the lambda body.
54 [](DataAllocator& outputs, RawDeviceService& device, DataTakingContext& context, ProcessingContext& pcx) {
55 for (unsigned int i = 0; i < 10; i++) {
56 outputs.snapshot(Output{"TS1", "A1", i}, i);
57 outputs.snapshot(Output{"TS2", "A2", i}, i);
58 }
59 })},
60 .options = {
61 ConfigParamSpec{"some-device-param", VariantType::Int, 1, {"Some device parameter"}},
62 }};
63
// Outputs of `a` are appended after construction, using type matchers without
// a sub-specification and a Sporadic lifetime.
64 a.outputs.emplace_back(ConcreteDataTypeMatcher{"TS1", "A1"}, Lifetime::Sporadic);
65 a.outputs.emplace_back(ConcreteDataTypeMatcher{"TS2", "A2"}, Lifetime::Sporadic);
66
// Second spec, named "D": declares inputs "a" (TS1) and "b" (TS2), both
// Sporadic; only the input bound to "b" is read by the algorithm.
68 .name = "D",
69 .inputs = {InputSpec{"a", "TS1", Lifetime::Sporadic}, InputSpec{"b", "TS2", Lifetime::Sporadic}},
70 .algorithm = AlgorithmSpec{adaptStateless(
71 [](InputRecord& inputs) {
72 auto ref = inputs.get("b");
// Nothing to report when the "b" input carries no header.
73 if (!ref.header) {
74 LOG(info) << "Header is not there";
75 return;
76 }
// NOTE(review): dph and dh are dereferenced below without a null check;
// verify that both headers are always present when ref.header is set.
77 auto dph = o2::header::get<const DataProcessingHeader*>(ref.header);
78 auto dh = o2::header::get<const o2::header::DataHeader*>(ref.header);
79 LOG(info) << "Start time: " << dph->startTime;
80 LOG(info) << "Subspec: " << dh->subSpecification;
81 })},
82 };
83
85}
int32_t i
void snapshot(const Output &spec, T const &object)
The input API of the Data Processing Layer. This class holds the inputs which are valid for processing...
decltype(auto) get(R binding, int part=0) const
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
WorkflowSpec concat(T &&t, ARGS &&... args)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
WorkflowSpec defineDataProcessing(ConfigContext const &specs)
This function hooks up the workflow specifications into the DPL driver.
void customize(std::vector< CompletionPolicy > &policies)
Configurable< int > anInt
Configurable< std::string > aString
Configurable< bool > aBool
Configurable< double > aDouble
Configurable< float > aFloat
static CompletionPolicy consumeWhenAll(const char *name, CompletionPolicy::Matcher matcher)
Default Completion policy. When all the parts of a record have arrived, consume them.
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"