Project
Loading...
Searching...
No Matches
DPLBroadcasterMerger.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13
14#include <fstream>
16#include "DPLUtils/Utils.h"
20#include "random"
21#include "Framework/Logger.h"
22#include <thread>
23
24namespace o2f = o2::framework;
25
26namespace o2::workflows
27{
28
31
// Body of the generator device definition. The declaration index of this
// listing gives its signature as
//   o2f::DataProcessorSpec defineGenerator(o2f::OutputSpec usrOutput)
// The returned spec names the device "Generator", uses `noInputs` as its input
// list and exposes `usrOutput` as its only output.
// NOTE(review): the signature line and listing line 38 are absent from this
// extract; line 38 presumably opens the o2f::AlgorithmSpec init lambda (the
// pipeline definition further down shows that pattern) -- confirm against the
// original file before editing the code.
33{
34 return {"Generator", // Device name
35 noInputs, // No inputs for a generator
36 o2f::Outputs{usrOutput}, // Single output, described by usrOutput
37
   // The message counter and the resolved output are held in shared_ptrs so
   // the processing lambda below can capture them by value and still share
   // the same counter between invocations.
39 int msgCounter = 0;
40 auto msgCounter_shptr = std::make_shared<int>(msgCounter);
41 auto usrOutput_shptr = std::make_shared<o2f::Output>(getOutput(usrOutput));
42
43 LOG(info) << ">>>>>>>>>>>>>> Generator initialised";
44
45 // The lambda returned here is the processing callback; it receives the ProcessingContext on each call
46 return [usrOutput_shptr, msgCounter_shptr](o2f::ProcessingContext& ctx) {
47 int msgIndex = (*msgCounter_shptr)++; // post-increment: msgIndex is the value before this call
48 if (msgIndex > 10) {
   // Signals end of stream through the ControlService. There is no early
   // return, so the rest of this callback still runs in the same invocation.
   // NOTE(review): confirm that emitting one more chunk after endOfStream() is intended.
49 ctx.services().get<framework::ControlService>().endOfStream();
50 }
51 LOG(info) << ">>> MSG:" << msgIndex;
52 std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // one-second pause per invocation
53
54 LOG(info) << ">>> Preparing MSG:" << msgIndex;
55
   // Request (msgIndex + 1) * sizeof(uint32_t) bytes: one leading word plus
   // msgIndex data words. The division by sizeof(char) is a division by 1.
56 auto& outputMsg =
57 ctx.outputs().newChunk(*usrOutput_shptr, (msgIndex + 1) * sizeof(uint32_t) / sizeof(char));
58
59 LOG(info) << ">>> Preparing1 MSG:" << msgIndex;
60
   // View the chunk as an array of 32-bit words.
61 auto payload = reinterpret_cast<uint32_t*>(outputMsg.data());
62
   // Word 0 carries msgIndex, which is also the number of data words that follow.
63 payload[0] = msgIndex;
64
65 LOG(info) << ">>> Preparing2 MSG:" << msgIndex;
66
   // Fill words 1..msgIndex with the constant 32 and log each one.
67 for (int k = 0; k < msgIndex; ++k) {
68 payload[k + 1] = (uint32_t)32; // NOTE(review): C-style cast; static_cast<uint32_t> would be the named form
69 LOG(info) << ">>>>\t" << payload[k + 1];
70 }
71
72 return; // redundant: the lambda returns void and ends here anyway
73 };
74 }}};
75}
76
78{
79 return {devName, // Device name
80 o2f::Inputs{usrInput}, // No inputs, for the moment
81 o2f::Outputs{usrOutput}, o2f::AlgorithmSpec{[usrOutput](o2f::InitContext&) {
82 auto output_sharedptr = std::make_shared<o2f::Output>(getOutput(usrOutput));
83
84 // Processing context in captured from return on InitCallback
85 return [output_sharedptr](o2f::ProcessingContext& ctx) {
86 auto inputMsg = ctx.inputs().getByPos(0);
87 auto msgSize = o2::framework::DataRefUtils::getPayloadSize(inputMsg);
88
89 auto& fwdMsg = ctx.outputs().newChunk((*output_sharedptr), msgSize);
90 std::memcpy(fwdMsg.data(), inputMsg.payload, msgSize);
91 };
92 }}};
93}
94
// Body of the sink device definition. The declaration index of this listing
// gives its signature as
//   o2f::DataProcessorSpec defineSink(o2f::InputSpec usrInput)
// The processing callback reads the input at position 0 as an array of 32-bit
// words and logs it: word 0 is used as the element count, words 1..count are
// logged one per LOG statement.
// NOTE(review): the signature line and listing lines 99 and 101 are absent
// from this extract; they presumably hold the outputs entry and the opening of
// the o2f::AlgorithmSpec init lambda -- confirm against the original file.
96{
97 return {"Sink", // Device name
98 o2f::Inputs{usrInput}, // Single input, described by usrInput
100
102 // The lambda returned here is the processing callback; it receives the ProcessingContext on each call
103 return [](o2f::ProcessingContext& ctx) {
104 LOG(info) << "Received message ";
105
106 auto inputMsg = ctx.inputs().getByPos(0);
    // View the payload as 32-bit words; payload[0] is used as the element count below.
107 auto payload = reinterpret_cast<const uint32_t*>(inputMsg.payload);
108
    // NOTE(review): the literals have no separating spaces, so the count is
    // printed glued to the surrounding words.
109 LOG(info) << "Received message containing" << payload[0] << "elements";
110
    // NOTE(review): `j` is int while payload[0] is uint32_t, so this is a
    // signed/unsigned comparison.
111 for (int j = 0; j < payload[0]; ++j) {
112 LOG(info) << payload[j + 1] << "\t";
113 }
    // LOG statement with nothing streamed into it.
114 LOG(info);
115 };
116 }}};
117}
118
120{
121 auto lspec = o2f::WorkflowSpec();
122
123 // A generator of data
124 lspec.emplace_back(defineGenerator(o2f::OutputSpec{"TST", "ToBC", 0, o2f::Lifetime::Timeframe}));
125
126 // A two-way broadcaster
127 lspec.emplace_back(defineBroadcaster("Broadcaster",
128 o2f::InputSpec{"input", "TST", "ToBC", 0, o2f::Lifetime::Timeframe},
129 o2f::Outputs{{"TST", "BCAST0", 0, o2f::Lifetime::Timeframe},
130 {"TST", "BCAST1", 0, o2f::Lifetime::Timeframe}}));
131
132 // Two pipeline devices
133 lspec.emplace_back(definePipeline("pip0", o2f::InputSpec{"bc", "TST", "BCAST0", 0, o2f::Lifetime::Timeframe},
134 o2f::OutputSpec{"TST", "PIP0", 0, o2f::Lifetime::Timeframe}));
135 lspec.emplace_back(definePipeline("pip1", o2f::InputSpec{"bc", "TST", "BCAST1", 0, o2f::Lifetime::Timeframe},
136 o2f::OutputSpec{"TST", "PIP1", 0, o2f::Lifetime::Timeframe}));
137
138 // A gatherer
139 lspec.emplace_back(defineMerger("Merger", o2f::Inputs{{"input1", "TST", "PIP0", 0, o2f::Lifetime::Timeframe}, {"input2", "TST", "PIP1", 0, o2f::Lifetime::Timeframe}},
140 o2f::OutputSpec{"TST", "ToSink", 0, o2f::Lifetime::Timeframe}));
141
142 // A sink which dumps messages
143 lspec.emplace_back(defineSink(o2f::InputSpec{"input", "TST", "ToSink", 0, o2f::Lifetime::Timeframe}));
144 return std::move(lspec);
145}
146
147} // namespace o2::workflows
A collection of , v0.1.
uint32_t j
Definition RawData.h:0
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
o2f::DataProcessorSpec definePipeline(std::string devName, o2f::InputSpec usrInput, o2f::OutputSpec usrOutput)
o2f::DataProcessorSpec defineGenerator(o2f::OutputSpec usrOutput)
o2f::Output getOutput(const o2f::OutputSpec outputSpec)
Definition Utils.cxx:28
o2f::DataProcessorSpec defineMerger(std::string devName, o2f::Inputs usrInputs, o2f::OutputSpec usrOutput, std::function< void(OutputBuffer, const o2f::DataRef)> const mergerFunc)
o2::framework::WorkflowSpec DPLBroadcasterMergerWorkflow()
o2f::DataProcessorSpec defineBroadcaster(std::string devName, o2f::InputSpec usrInput, o2f::Outputs usrOutputs, std::function< size_t(o2f::DataRef)> const func)
o2f::DataProcessorSpec defineSink(o2f::InputSpec usrInput)
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"