Project
Loading...
Searching...
No Matches
dataSamplingTimePipeline.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14
15#include <thread>
16
17using namespace o2::framework;
18using namespace o2::utilities;
19
20void customize(std::vector<CompletionPolicy>& policies)
21{
23}
24void customize(std::vector<ChannelConfigurationPolicy>& policies)
25{
27}
28
29#include "Framework/InputSpec.h"
34
35#include <boost/algorithm/string.hpp>
36
37#include <chrono>
38#include <iostream>
39
40using namespace o2::framework;
41
42struct FakeCluster {
43 float x;
44 float y;
45 float z;
46 float q;
47};
49
50size_t parallelSize = 4;
51size_t collectionChunkSize = 1000;
55
57{
58 DataProcessorSpec dataProducer{
59 "dataProducer",
60 Inputs{},
61 {
62 OutputSpec{"TPC", "CLUSTERS"},
63 },
66
67 auto processingStage = timePipeline(
69 "processingStage",
70 Inputs{
71 {"dataTPC", "TPC", "CLUSTERS"}},
72 Outputs{
73 {"TPC", "CLUSTERS_P"}},
77
79 "sink",
80 Inputs{
81 {"dataTPC-proc", "TPC", "CLUSTERS_P", 0}},
82 Outputs{},
85
86 // clang-format off
87 DataProcessorSpec simpleQcTask{
88 "simpleQcTask",
89 Inputs{
90 { "TPC_CLUSTERS_S", { "DS", "simpleQcTask0" } },
91 { "TPC_CLUSTERS_P_S", { "DS", "simpleQcTask1" } }
92 },
93 Outputs{},
96 auto inputDataTpc = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("TPC_CLUSTERS_S").payload);
97 auto inputDataTpcProcessed = reinterpret_cast<const FakeCluster*>(ctx.inputs().get(
98 "TPC_CLUSTERS_P_S").payload);
99
100 auto ref = ctx.inputs().get("TPC_CLUSTERS_S");
101 const auto* header = DataRefUtils::getHeader<DataHeader*>(ref);
102
103 bool dataGood = true;
104 auto payloadSize = DataRefUtils::getPayloadSize(ref);
105 for (int j = 0; j < payloadSize / sizeof(FakeCluster); ++j) {
106 float diff = std::abs(-inputDataTpc[j].x - inputDataTpcProcessed[j].x) +
107 std::abs(2 * inputDataTpc[j].y - inputDataTpcProcessed[j].y) +
108 std::abs(inputDataTpc[j].z * inputDataTpc[j].q - inputDataTpcProcessed[j].z) +
109 std::abs(inputDataTpc[j].q - inputDataTpcProcessed[j].q);
110 if (diff > 1) {
111 dataGood = false;
112 break;
113 }
114 }
115 LOG(info) << "qcTaskTPC - received data is " << (dataGood ? "correct" : "wrong");
116 }
117 }
118 };
119
120 DataProcessorSpec dummyProducer{
121 "dummy",
122 Inputs{},
123 Outputs{
124 { {"tsthistos"}, "TST", "HISTOS", 0 },
125 { {"tststring"}, "TST", "STRING", 0 }
126 },
128 };
129
130 WorkflowSpec specs = {
131 dataProducer,
132 processingStage,
133 sink,
134 simpleQcTask,
135 dummyProducer
136 };
137 const char* o2Root = getenv("O2_ROOT");
138 if (o2Root == nullptr) {
139 throw std::runtime_error("The O2_ROOT environment variable is not set, probably the O2 environment has not been loaded.");
140 }
141 std::string configurationSource = std::string("json:/") + o2Root + "/share/etc/exampleDataSamplingConfig.json";
142 DataSampling::GenerateInfrastructure(specs, configurationSource);
143
144 return specs;
145}
146// clang-format on
147
149{
150 size_t index = ctx.services().get<ParallelContext>().index1D();
151 std::this_thread::sleep_for(std::chrono::seconds(1));
152 // Creates a new message of size collectionChunkSize which
153 // has "TPC" as data origin and "CLUSTERS" as data description.
154 auto& tpcClusters = ctx.outputs().make<FakeCluster>(
156 int i = 0;
157
158 for (auto& cluster : tpcClusters) {
159 assert(i < collectionChunkSize);
160 cluster.x = index;
161 cluster.y = i;
162 cluster.z = i;
163 cluster.q = rand() % 1000;
164 i++;
165 }
166}
167
169{
170 size_t index = ctx.services().get<ParallelContext>().index1D();
171
172 const FakeCluster* inputDataTpc = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("dataTPC").payload);
173
174 auto& processedTpcClusters = ctx.outputs().make<FakeCluster>(
175 Output{"TPC", "CLUSTERS_P", static_cast<o2::header::DataHeader::SubSpecificationType>(index)},
177
178 int i = 0;
179 for (auto& cluster : processedTpcClusters) {
180 assert(i < collectionChunkSize);
181 cluster.x = -inputDataTpc[i].x;
182 cluster.y = 2 * inputDataTpc[i].y;
183 cluster.z = inputDataTpc[i].z * inputDataTpc[i].q;
184 cluster.q = inputDataTpc[i].q;
185 i++;
186 }
187};
188
190{
191 const FakeCluster* inputDataTpc = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("dataTPC-proc").payload);
192}
Definition of O2 Data Sampling, v1.0.
int32_t i
uint32_t j
Definition RawData.h:0
decltype(auto) make(const Output &spec, Args... args)
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
static void CustomizeInfrastructure(std::vector< framework::CompletionPolicy > &)
Configures dispatcher to consume any data immediately.
static void GenerateInfrastructure(framework::WorkflowSpec &workflow, const std::string &policiesSource, size_t threads=1, const std::string &host="")
Generates data sampling infrastructure.
size_t collectionChunkSize
void someProcessingStageAlgorithm(ProcessingContext &ctx)
void customize(std::vector< CompletionPolicy > &policies)
size_t parallelSize
void someDataProducerAlgorithm(ProcessingContext &ctx)
void someSinkAlgorithm(ProcessingContext &ctx)
WorkflowSpec defineDataProcessing(ConfigContext const &)
This function hooks up the the workflow specifications into the DPL driver.
GLint GLenum GLint x
Definition glcorearb.h:403
GLuint index
Definition glcorearb.h:781
GLdouble GLdouble GLdouble z
Definition glcorearb.h:843
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
A header which contains some meta-data generated by Data Sampling.
std::function< void(ProcessingContext &)> ProcessCallback
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
the main header struct
Definition DataHeader.h:618
uint32_t SubSpecificationType
Definition DataHeader.h:620
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"