Project
Loading...
Searching...
No Matches
o2DiamondWorkflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
21#include <fairmq/Device.h>
22
23#include <iostream>
24#include <chrono>
25#include <thread>
26#include <vector>
27
28using namespace o2::framework;
29
30struct WorkflowOptions {
31 Configurable<int> anInt{"anInt", 1, ""};
32 Configurable<float> aFloat{"aFloat", 2.0f, {"a float option"}};
33 Configurable<double> aDouble{"aDouble", 3., {"a double option"}};
34 Configurable<std::string> aString{"aString", "foobar", {"a string option"}};
35 Configurable<bool> aBool{"aBool", true, {"a boolean option"}};
36};
37
// Appends one CallbacksPolicy to `policies`. The policy's `.policy` callable
// receives the CallbackService and registers a handler for
// CallbackService::Id::Start which logs "invoked at start".
38void customize(std::vector<CallbacksPolicy>& policies)
39{
40 policies.push_back(CallbacksPolicy{
// NOTE(review): the embedded numbering jumps from 40 to 42, so one line of this
// aggregate initializer is not visible in this listing; confirm against the
// upstream file before editing.
// The InitContext argument is left unnamed because the policy does not use it.
42 .policy = [](CallbackService& service, InitContext&) {
43 service.set<CallbackService::Id::Start>([]() { LOG(info) << "invoked at start"; });
44 }});
45}
46
47// void customize(std::vector<SendingPolicy>& policies)
48//{
49// policies.push_back(SendingPolicy{
50// .matcher = DeviceMatchers::matchByName("A"),
51// .send = [](FairMQDeviceProxy& proxy, fair::mq::Parts& parts, ChannelIndex channelIndex) {
52// LOG(info) << "A custom policy for sending invoked!";
53// auto* channel = proxy.getOutputChannel(channelIndex);
54// channel->Send(parts, 0);
55// }});
56// }
57
59
60AlgorithmSpec simplePipe(std::string const& what, int minDelay)
61{
62 return AlgorithmSpec{adaptStateful([what, minDelay](RunningWorkflowInfo const& runningWorkflow) {
63 srand(getpid());
64 LOG(info) << "There are " << runningWorkflow.devices.size() << " devices in the workflow";
65 return adaptStateless([what, minDelay](DataAllocator& outputs, RawDeviceService& device) {
66 LOGP(info, "Invoked {}", what);
67 device.device()->WaitFor(std::chrono::milliseconds(minDelay));
68 auto& bData = outputs.make<int>(OutputRef{what}, 1);
69 });
70 })};
71}
72
73// This is how you can define your processing in a declarative way
// NOTE(review): the embedded numbers 74, 76, 89, 94 and 110 are absent from this
// listing, so the function signature, the opening lines of the first three
// specs and the start of the final expression are not visible here. Confirm
// against the upstream file before changing any code in this block.
75{
// Spec named "A": the visible lines list no inputs. It declares two outputs,
// TST/A1 (binding "a1") and TST/A2 (binding "a2"), and its algorithm creates
// one int on each of them.
77 .name = "A",
78 .outputs = {OutputSpec{{"a1"}, "TST", "A1"},
79 OutputSpec{{"a2"}, "TST", "A2"}},
80 .algorithm = AlgorithmSpec{adaptStateless(
81 [](DataAllocator& outputs, RawDeviceService& device, DataTakingContext& context, ProcessingContext& pcx) {
82 // static RateLimiter limiter;
83 // limiter.check(pcx, std::stoi(device.device()->fConfig->GetValue<std::string>("timeframes-rate-limit")), 2000);
// With the two lines above commented out, `device`, `context` and `pcx` are
// unused, and so are the references `aData` and `bData` below.
84 auto& aData = outputs.make<int>(OutputRef{"a1"}, 1);
85 auto& bData = outputs.make<int>(OutputRef{"a2"}, 1);
86 })},
87 .options = {ConfigParamSpec{"some-device-param", VariantType::Int, 1, {"Some device parameter"}},
88 }};
// Spec named "B": takes TST/A1 as input "x" (Lifetime::Timeframe, carrying a
// "somestring" parameter), declares TST/B1, and uses simplePipe("b1", 1000),
// i.e. a 1000 ms wait per invocation.
90 .name = "B",
91 .inputs = {InputSpec{"x", "TST", "A1", Lifetime::Timeframe, {ConfigParamSpec{"somestring", VariantType::String, "", {"Some input param"}}}}},
92 .outputs = {OutputSpec{{"b1"}, "TST", "B1"}},
93 .algorithm = simplePipe("b1", 1000)};
// Next spec (its name line is not visible; presumably the `c` used at the end):
// takes TST/A2 as input "x", declares TST/C1, and uses simplePipe("c1", 2000).
95 .inputs = {InputSpec{"x", "TST", "A2"}},
96 .outputs = {OutputSpec{{"c1"}, "TST", "C1"}},
97 .algorithm = simplePipe("c1", 2000)};
// Spec "D": takes TST/A1, TST/B1 and TST/C1 as inputs "a", "b" and "c", declares
// no outputs, and carries the label "expendable".
98 DataProcessorSpec d{.name = "D",
99 .inputs = {InputSpec{"a", "TST", "A1"},
100 InputSpec{"b", "TST", "B1"},
101 InputSpec{"c", "TST", "C1"}},
102 .algorithm = AlgorithmSpec{adaptStateless(
103 [](InputRecord& inputs) {
// Only input "b" is read: its DataProcessingHeader is looked up and the
// startTime field is logged.
// NOTE(review): `header` is dereferenced without a null check; confirm that
// o2::header::get cannot return nullptr for this input.
104 auto ref = inputs.get("b");
105 auto header = o2::header::get<const DataProcessingHeader*>(ref.header);
106 LOG(info) << "Start time: " << header->startTime;
107 })},
108 .labels = {{"expendable"}}};
109
// Trailing arguments of a call whose beginning is not visible in this listing:
// one WorkflowSpec holding b and c, and one holding d.
111 WorkflowSpec{b, c},
112 WorkflowSpec{d});
113}
uint32_t c
Definition RawData.h:2
decltype(auto) make(const Output &spec, Args... args)
The input API of the Data Processing Layer. This class holds the inputs which are valid for processing...
decltype(auto) get(R binding, int part=0) const
virtual fair::mq::Device * device()=0
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
WorkflowSpec concat(T &&t, ARGS &&... args)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
WorkflowSpec defineDataProcessing(ConfigContext const &specs)
This function hooks up the workflow specifications into the DPL driver.
void customize(std::vector< CallbacksPolicy > &policies)
AlgorithmSpec simplePipe(std::string const &what, int minDelay)
Configurable< int > anInt
Configurable< std::string > aString
Configurable< bool > aBool
Configurable< double > aDouble
Configurable< float > aFloat
static DeviceMatcher matchByName(const char *name)
Information about the running workflow.
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"