Project
Loading...
Searching...
No Matches
o2SimpleFairMQSink.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
16
17#include <chrono>
18#include <thread>
19#include <vector>
20#include <fairmq/Device.h>
22
23using namespace o2::framework;
24
25AlgorithmSpec simplePipe(std::string const& what, int minDelay)
26{
27 return AlgorithmSpec{adaptStateful([what, minDelay]() {
28 srand(getpid());
29 return adaptStateless([what, minDelay](RawDeviceService& device) {
30 std::unique_ptr<fair::mq::Message> msg;
31 device.device()->Receive(msg, "upstream", 0);
32 LOGP(info, "Callback invoked. Size of the message {}", msg->GetSize());
33
34 device.device()->WaitFor(std::chrono::seconds(minDelay));
35 });
36 })};
37}
38
39// This is how you can define your processing in a declarative way
41{
42 return WorkflowSpec{
43 {.name = "B",
44 .inputs = {{{"external"}, "TST", "EXT", 0, Lifetime::OutOfBand, channelParamSpec("upstream")}},
45 .algorithm = simplePipe("b1", 0)},
46 };
47}
virtual fair::mq::Device * device()=0
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > channelParamSpec(std::string const &name)
std::vector< DataProcessorSpec > WorkflowSpec
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
AlgorithmSpec simplePipe(std::string const &what, int minDelay)
WorkflowSpec defineDataProcessing(ConfigContext const &)
This function hooks up the workflow specifications into the DPL driver.
uint64_t const void const *restrict const msg
Definition x9.h:153