Project
Loading...
Searching...
No Matches
test_TimePipeline.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
// NOTE(review): the HTML extraction dropped this file's O2 Framework
// includes (the Doxygen numbering jumps over lines 12-14); restore them
// (e.g. Framework/runDataProcessing.h) from the original repository.
#include <cassert> // assert() in the producer/processing loops
#include <chrono>
#include <cstddef> // size_t for the file-level configuration globals
#include <cstdlib> // rand() used to fake cluster charges
#include <thread>
17
18using namespace o2::framework;
19
/// A dummy cluster payload used to exercise the time-pipeline workflow:
/// a position (x, y, z) plus a charge q, all single precision. Kept a
/// plain aggregate so it is trivially copyable into DPL messages.
struct FakeCluster {
  float x;
  float y;
  float z;
  float q;
};
27
// How many parallel copies of the processing stage the workflow requests
// (presumably passed as the `count` argument of timePipeline() — the call
// site was dropped by the extraction; confirm against the original file).
size_t parallelSize = 4;
// Number of FakeCluster elements in each produced message.
size_t collectionChunkSize = 1000;
32
34{
35 return WorkflowSpec{
36 {"dataProducer",
37 Inputs{},
38 {
39 OutputSpec{"TPC", "CLUSTERS"},
40 },
45 "processingStage",
46 Inputs{
47 {"dataTPC", "TPC", "CLUSTERS"}},
48 Outputs{
49 {"TPC", "CLUSTERS_P"}},
54 "dataSampler",
55 Inputs{
56 {"dataTPC-sampled", "TPC", "CLUSTERS", 0, Lifetime::Timeframe},
57 },
58 Outputs{},
61}
62
64{
65 uint32_t index = ctx.services().get<ParallelContext>().index1D();
66 std::this_thread::sleep_for(std::chrono::seconds(1));
67 // Creates a new message of size collectionChunkSize which
68 // has "TPC" as data origin and "CLUSTERS" as data description.
69 auto& tpcClusters = ctx.outputs().make<FakeCluster>(Output{"TPC", "CLUSTERS", index}, collectionChunkSize);
70 int i = 0;
71
72 for (auto& cluster : tpcClusters) {
73 assert(i < collectionChunkSize);
74 cluster.x = index;
75 cluster.y = i;
76 cluster.z = i;
77 cluster.q = rand() % 1000;
78 i++;
79 }
80}
81
83{
84 uint32_t index = ctx.services().get<ParallelContext>().index1D();
85
86 const FakeCluster* inputDataTpc = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("dataTPC").payload);
87
88 auto processedTpcClusters =
89 ctx.outputs().make<FakeCluster>(Output{"TPC", "CLUSTERS_P", index}, collectionChunkSize);
90
91 int i = 0;
92 for (auto& cluster : processedTpcClusters) {
93 assert(i < collectionChunkSize);
94 cluster.x = -inputDataTpc[i].x;
95 cluster.y = 2 * inputDataTpc[i].y;
96 cluster.z = inputDataTpc[i].z * inputDataTpc[i].q;
97 cluster.q = inputDataTpc[i].q;
98 i++;
99 }
100};
int32_t i
decltype(auto) make(const Output &spec, Args... args)
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
GLuint index
Definition glcorearb.h:781
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
std::function< void(ProcessingContext &)> ProcessCallback
the main header struct
Definition DataHeader.h:618
size_t collectionChunkSize
void someProcessingStageAlgorithm(ProcessingContext &ctx)
size_t parallelSize
void someDataProducerAlgorithm(ProcessingContext &ctx)
WorkflowSpec defineDataProcessing(ConfigContext const &)
This function hooks up the workflow specifications into the DPL driver.