Project
Loading...
Searching...
No Matches
test_TimePipeline.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
#include "Framework/InputSpec.h"
// NOTE(review): the doxygen listing this file was extracted from dropped the
// hyperlinked #include lines (original lines 12-14) — most likely the
// Framework headers (runDataProcessing.h etc.); restore from the original
// test_TimePipeline.cxx.

#include <boost/algorithm/string.hpp>

#include <cassert>
#include <chrono>
#include <cstdlib>
#include <thread>
20using namespace o2::framework;
21
/// A minimal stand-in for a TPC cluster, used only to push recognizable
/// payloads through the time pipeline under test: a space point plus a
/// charge value.
struct FakeCluster {
  float x; ///< fake x coordinate (the producer stores the pipeline index here)
  float y; ///< fake y coordinate (the producer stores the element index here)
  float z; ///< fake z coordinate
  float q; ///< fake charge
};
29
// How many parallel time-pipeline lanes the workflow is replicated over.
size_t parallelSize = 4;
// How many FakeCluster entries each produced message contains.
size_t collectionChunkSize = 1000;
34
36{
37 return WorkflowSpec{
38 {"dataProducer",
39 Inputs{},
40 {
41 OutputSpec{"TPC", "CLUSTERS"},
42 },
47 "processingStage",
48 Inputs{
49 {"dataTPC", "TPC", "CLUSTERS"}},
50 Outputs{
51 {"TPC", "CLUSTERS_P"}},
56 "dataSampler",
57 Inputs{
58 {"dataTPC-sampled", "TPC", "CLUSTERS", 0, Lifetime::Timeframe},
59 },
60 Outputs{},
63}
64
66{
67 uint32_t index = ctx.services().get<ParallelContext>().index1D();
68 std::this_thread::sleep_for(std::chrono::seconds(1));
69 // Creates a new message of size collectionChunkSize which
70 // has "TPC" as data origin and "CLUSTERS" as data description.
71 auto& tpcClusters = ctx.outputs().make<FakeCluster>(Output{"TPC", "CLUSTERS", index}, collectionChunkSize);
72 int i = 0;
73
74 for (auto& cluster : tpcClusters) {
75 assert(i < collectionChunkSize);
76 cluster.x = index;
77 cluster.y = i;
78 cluster.z = i;
79 cluster.q = rand() % 1000;
80 i++;
81 }
82}
83
85{
86 uint32_t index = ctx.services().get<ParallelContext>().index1D();
87
88 const FakeCluster* inputDataTpc = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("dataTPC").payload);
89
90 auto processedTpcClusters =
91 ctx.outputs().make<FakeCluster>(Output{"TPC", "CLUSTERS_P", index}, collectionChunkSize);
92
93 int i = 0;
94 for (auto& cluster : processedTpcClusters) {
95 assert(i < collectionChunkSize);
96 cluster.x = -inputDataTpc[i].x;
97 cluster.y = 2 * inputDataTpc[i].y;
98 cluster.z = inputDataTpc[i].z * inputDataTpc[i].q;
99 cluster.q = inputDataTpc[i].q;
100 i++;
101 }
102};
int32_t i
decltype(auto) make(const Output &spec, Args... args)
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
GLuint index
Definition glcorearb.h:781
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
std::function< void(ProcessingContext &)> ProcessCallback
the main header struct
Definition DataHeader.h:618
size_t collectionChunkSize
void someProcessingStageAlgorithm(ProcessingContext &ctx)
size_t parallelSize
void someDataProducerAlgorithm(ProcessingContext &ctx)
WorkflowSpec defineDataProcessing(ConfigContext const &)
This function hooks up the the workflow specifications into the DPL driver.