Project
Loading...
Searching...
No Matches
o2DiamondWorkflowLeaky.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
20#include <fairmq/Device.h>
21
22#include <iostream>
23#include <chrono>
24#include <thread>
25#include <vector>
26
27using namespace o2::framework;
28
29struct WorkflowOptions {
30 Configurable<int> anInt{"anInt", 1, ""};
31 Configurable<float> aFloat{"aFloat", 2.0f, {"a float option"}};
32 Configurable<double> aDouble{"aDouble", 3., {"a double option"}};
33 Configurable<std::string> aString{"aString", "foobar", {"a string option"}};
34 Configurable<bool> aBool{"aBool", true, {"a boolean option"}};
35};
36
37void customize(std::vector<CallbacksPolicy>& policies)
38{
39 policies.push_back(CallbacksPolicy{
41 .policy = [](CallbackService& service, InitContext&) {
42 service.set<CallbackService::Id::Start>([]() { LOG(info) << "invoked at start"; });
43 }});
44}
45
46void customize(std::vector<SendingPolicy>& policies)
47{
48 policies.push_back(SendingPolicy{
50 .send = [](fair::mq::Parts& parts, ChannelIndex channelIndex, ServiceRegistryRef registry) {
51 auto& proxy = registry.get<FairMQDeviceProxy>();
52 LOG(info) << "A custom policy for sending invoked!";
53 auto* channel = proxy.getOutputChannel(channelIndex);
54 channel->Send(parts, 0);
55 }});
56}
57
59
60AlgorithmSpec simplePipe(std::string const& what, int minDelay)
61{
62 return AlgorithmSpec{adaptStateful([what, minDelay](RunningWorkflowInfo const& runningWorkflow) {
63 srand(getpid());
64 LOG(info) << "There are " << runningWorkflow.devices.size() << " devices in the workflow";
65 return adaptStateless([what, minDelay](DataAllocator& outputs, RawDeviceService& device) {
66 device.device()->WaitFor(std::chrono::milliseconds(minDelay));
67 auto& bData = outputs.make<int>(OutputRef{what}, 1);
68 });
69 })};
70}
71
72// This is how you can define your processing in a declarative way
// Returns a WorkflowSpec made of four entries named A, B, C and D.
// NOTE(review): the extracted listing skips source line 73, which should hold
// the function signature; the cross-reference text further down this page shows
// WorkflowSpec defineDataProcessing(ConfigContext const &specs) - confirm.
74{
75 return WorkflowSpec{
// A: no inputs, two outputs (a1 = TST/A1, a2 = TST/A2).
76 {"A",
77 Inputs{},
78 {OutputSpec{{"a1"}, "TST", "A1"},
79 OutputSpec{{"a2"}, "TST", "A2"}},
// NOTE(review): source line 80 is missing from the listing; it presumably opens
// the AlgorithmSpec / adaptStateless wrapper closed on line 89 - confirm.
// The callback draws r in {0, 1}, waits r seconds and then creates one int on
// a1 when r is 0, otherwise on a2, so each invocation fills only one output.
81 [](DataAllocator& outputs, RawDeviceService& device) {
82 auto r = rand() % 2;
83 device.device()->WaitFor(std::chrono::seconds(r));
84 if (r == 0) {
85 outputs.make<int>(OutputRef{"a1"}, 1);
86 } else {
87 outputs.make<int>(OutputRef{"a2"}, 1);
88 }
89 })},
// Integer option declared for A, default value 1.
90 {ConfigParamSpec{"some-device-param", VariantType::Int, 1, {"Some device parameter"}}}},
// B: consumes TST/A1 (Timeframe lifetime, with a string input parameter) and
// produces b1 = TST/B1 through simplePipe with a delay argument of 5000.
91 {"B",
92 {InputSpec{"x", "TST", "A1", Lifetime::Timeframe, {ConfigParamSpec{"somestring", VariantType::String, "", {"Some input param"}}}}},
93 {OutputSpec{{"b1"}, "TST", "B1"}},
94 simplePipe("b1", 5000)},
// C: consumes TST/A2 and produces c1 = TST/C1 through simplePipe, same delay.
95 {"C",
96 Inputs{InputSpec{"x", "TST", "A2"}},
97 Outputs{OutputSpec{{"c1"}, "TST", "C1"}},
98 simplePipe("c1", 5000)},
// D: declares inputs TST/A1, TST/B1 and TST/C1 and no outputs.
// NOTE(review): B1 derives from A1 and C1 from A2, while A fills only one of
// A1/A2 per invocation; presumably this mismatch is what the Leaky file name
// refers to - confirm.
99 {"D",
100 Inputs{
101 InputSpec{"a", "TST", "A1"},
102 InputSpec{"b", "TST", "B1"},
103 InputSpec{"c", "TST", "C1"},
104 },
105 Outputs{},
// NOTE(review): source line 106 is missing from the listing; it presumably
// opens the callback that receives the inputs object used below - confirm.
// The callback reads input b, extracts its DataProcessingHeader and logs the
// start time at debug level.
// NOTE(review): header is dereferenced without a null check; whether
// o2::header::get can return a null pointer is not visible here - confirm.
107 auto ref = inputs.get("b");
108 auto header = o2::header::get<const DataProcessingHeader*>(ref.header);
109 LOG(debug) << "Start time: " << header->startTime;
110 })}}};
111}
std::ostringstream debug
decltype(auto) make(const Output &spec, Args... args)
fair::mq::Channel * getOutputChannel(ChannelIndex channelIndex) const
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
decltype(auto) get(R binding, int part=0) const
virtual fair::mq::Device * device()=0
GLboolean r
Definition glcorearb.h:1233
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
WorkflowSpec defineDataProcessing(ConfigContext const &specs)
This function hooks up the workflow specifications into the DPL driver.
void customize(std::vector< CallbacksPolicy > &policies)
AlgorithmSpec simplePipe(std::string const &what, int minDelay)
Configurable< int > anInt
Configurable< std::string > aString
Configurable< bool > aBool
Configurable< double > aDouble
Configurable< float > aFloat
static DeviceMatcher matchByName(const char *name)
static EdgeMatcher matchSourceByName(const char *name)
Information about the running workflow.
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"