Project
Loading...
Searching...
No Matches
o2SimpleFairMQSource.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
16#include <fairmq/Device.h>
17
18#include <chrono>
19#include <thread>
20#include <vector>
21
23
24using namespace o2::framework;
25
26void sendEndOfStream(fair::mq::Device& device, std::string channel)
27{
28 fair::mq::Parts parts;
29 fair::mq::MessagePtr payload(device.NewMessage());
31 sih.state = InputChannelState::Completed;
33 dh.dataOrigin = "TST";
34 dh.dataDescription = "A";
35 dh.subSpecification = 0;
36 dh.payloadSize = 1000;
37
38 DataProcessingHeader dph{1, 0};
39 auto channelAlloc = o2::pmr::getTransportAllocator(device.GetChannel(channel, 0).Transport());
40 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph, sih});
41 // sigh... See if we can avoid having it const by not
42 // exposing it to the user in the first place.
43 parts.AddPart(std::move(header));
44 parts.AddPart(std::move(payload));
45 device.Send(parts, channel, 0);
46}
47
48// This is how you can define your processing in a declarative way
50{
51 return WorkflowSpec{
52 {.name = "A",
53 .outputs = {OutputSpec{"TST", "A", 0, Lifetime::OutOfBand}},
54 .algorithm = AlgorithmSpec{adaptStateless(
55 [](RawDeviceService& service) {
56 for (auto& channel : service.device()->GetChannels()) {
57 LOG(info) << channel.first;
58 }
59 std::this_thread::sleep_for(std::chrono::seconds(rand() % 2));
60 auto msg = service.device()->NewMessageFor("downstream", 0, 1000);
61
63 dh.dataOrigin = "TST";
64 dh.dataDescription = "A";
65 dh.subSpecification = 0;
66 dh.payloadSize = 1000;
67
68 DataProcessingHeader dph{1, 0};
69 // we have to move the incoming data
70 o2::header::Stack headerStack{dh, dph};
71
72 auto channelAlloc = o2::pmr::getTransportAllocator(service.device()->GetChannels()["downstream"][0].Transport());
73 fair::mq::MessagePtr headerMessage = o2::pmr::getMessage(std::move(headerStack), channelAlloc);
74
75 fair::mq::Parts out;
76 out.AddPart(std::move(headerMessage));
77 out.AddPart(std::move(msg));
78 o2::header::hexDump("header", out.At(0)->GetData(), out.At(0)->GetSize(), 100);
79
80 service.device()->Send(out, "downstream", 0);
81 sendEndOfStream(*service.device(), "downstream");
82 exit(1);
83 })}}};
84}
virtual fair::mq::Device * device()=0
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
void hexDump(const char *desc, const void *voidaddr, size_t len, size_t max=0)
helper function to print a hex/ASCII dump of some memory
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
void sendEndOfStream(fair::mq::Device &device, std::string channel)
WorkflowSpec defineDataProcessing(ConfigContext const &)
This function hooks up the the workflow specifications into the DPL driver.
a BaseHeader with state information from the source
the main header struct
Definition DataHeader.h:618
DataDescription dataDescription
Definition DataHeader.h:636
SubSpecificationType subSpecification
Definition DataHeader.h:656
PayloadSizeType payloadSize
Definition DataHeader.h:666
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg
Definition x9.h:153