Project
Loading...
Searching...
No Matches
dataSamplingPodAndRoot.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14#include <thread>
15
16using namespace o2::framework;
17using namespace o2::utilities;
18
// Hook called by the DPL driver to customize completion policies for this workflow.
// NOTE(review): the Doxygen rendering elides source line 21 (the function body).
// Per the cross-reference in this page it presumably calls
// DataSampling::CustomizeInfrastructure(policies) — confirm against the original source.
19void customize(std::vector<CompletionPolicy>& policies)
20{
22}
// Hook called by the DPL driver to customize channel configuration policies.
// NOTE(review): the Doxygen rendering elides source line 25 (the function body) —
// confirm the actual body against the original source.
23void customize(std::vector<ChannelConfigurationPolicy>& policies)
24{
26}
27
28#include "Framework/InputSpec.h"
33#include "Framework/Logger.h"
35#include <TClonesArray.h>
36#include <TObjString.h>
37#include <TH1F.h>
38#include <TString.h>
39
40#include <boost/algorithm/string.hpp>
41
42#include <chrono>
43#include <iostream>
44
// Example payload element: a plain-float POD "cluster".
// Consumers read messages by reinterpret_cast-ing the raw payload pointer to
// const FakeCluster* (see the QC task and processing stage below), so the
// layout of these four floats is load-bearing — do not reorder or pad.
45struct FakeCluster {
46 float x; // producers fill this with the loop index
47 float y; // producers fill this with the loop index
48 float z; // producers fill this with the loop index
49 float q; // producers fill this with rand() % N (charge-like value)
50};
52
// Number of FakeCluster elements allocated per output message
// (passed to DataAllocator::make<FakeCluster>(...) in the producer and processing stage).
// NOTE(review): mutable global with external linkage — presumably intentional for
// this example; a constexpr would change linkage, so left as-is.
53size_t collectionChunkSize = 1000;
57
// defineDataProcessing(ConfigContext const&): builds the example workflow — a POD
// chain (producer -> processing stage -> sink) and a ROOT-object chain (TH1F/TObjString
// producer -> sink), plus two QC tasks fed by Data Sampling — then lets
// DataSampling::GenerateInfrastructure append the dispatcher devices before returning.
// NOTE(review): the Doxygen rendering elides several source lines (58, 65-66, 76-77,
// 85-86, 96-97, 140, 164, 189): the function signature and the AlgorithmSpec wrappers
// around the lambdas/algorithm callbacks — confirm against the original source.
59{
// POD chain: produces raw TPC/ITS FakeCluster messages (see someDataProducerAlgorithm).
 60 DataProcessorSpec podDataProducer{
 61 "podDataProducer",
 62 Inputs{},
 63 {OutputSpec{{"TPC", "CLUSTERS"}},
 64 OutputSpec{{"ITS", "CLUSTERS"}}},
 67
// Transforms the clusters into CLUSTERS_P messages (see someProcessingStageAlgorithm).
 68 DataProcessorSpec processingStage{
 69 "processingStage",
 70 Inputs{
 71 {"dataTPC", {"TPC", "CLUSTERS"}},
 72 {"dataITS", {"ITS", "CLUSTERS"}}},
 73 Outputs{
 74 {{"TPC", "CLUSTERS_P"}},
 75 {{"ITS", "CLUSTERS_P"}}},
 78
// Terminal consumer of the processed POD messages (see someSinkAlgorithm).
 79 DataProcessorSpec podSink{
 80 "podSink",
 81 Inputs{
 82 {"dataTPC-proc", {"TPC", "CLUSTERS_P"}},
 83 {"dataITS-proc", {"ITS", "CLUSTERS_P"}}},
 84 Outputs{},
 87
 88 // clang-format off
// QC task fed by the Data Sampling dispatcher ("DS" origin): cross-checks sampled raw
// vs. processed TPC clusters against the transformation applied in the processing stage.
 89 DataProcessorSpec qcTaskTpc{
 90 "qcTaskTpc",
 91 Inputs{
 92 { "TPC_CLUSTERS_S", {"DS", "simpleQcTask0"}},
 93 { "TPC_CLUSTERS_P_S", {"DS", "simpleQcTask1"}}
 94 },
 95 Outputs{},
 98 auto inputDataTpc = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("TPC_CLUSTERS_S").payload);
 99 auto inputDataTpcProcessed = reinterpret_cast<const FakeCluster*>(ctx.inputs().get(
 100 "TPC_CLUSTERS_P_S").payload);
 101
 102 auto ref = ctx.inputs().get("TPC_CLUSTERS_S");
 103 const auto* dataHeader = DataRefUtils::getHeader<DataHeader*>(ref);
 104
 105 bool dataGood = true;
// NOTE(review): signed 'j' is compared against an unsigned payload-size quotient —
// consider size_t for the loop variable.
 106 for (int j = 0; j < DataRefUtils::getPayloadSize(ref) / sizeof(FakeCluster); ++j) {
// The expected relation mirrors the processing stage: x -> -x, y -> 2y, z -> z*q, q -> q.
 107 float diff = std::abs(-inputDataTpc[j].x - inputDataTpcProcessed[j].x) +
 108 std::abs(2 * inputDataTpc[j].y - inputDataTpcProcessed[j].y) +
 109 std::abs(inputDataTpc[j].z * inputDataTpc[j].q - inputDataTpcProcessed[j].z) +
 110 std::abs(inputDataTpc[j].q - inputDataTpcProcessed[j].q);
 111 if (diff > 1) {
 112 dataGood = false;
 113 break;
 114 }
 115 }
 116
 117 LOG(info) << "qcTaskTPC - received data is " << (dataGood ? "correct" : "wrong");
 118
// The dispatcher stacks a DataSamplingHeader onto sampled messages; report its counters.
 119 const auto* dsHeader = DataRefUtils::getHeader<DataSamplingHeader*>(ref);
 120 if (dsHeader) {
 121 LOG(info) << "Matching messages seen by Dispatcher: " << dsHeader->totalEvaluatedMessages
 122 << ", accepted: " << dsHeader->totalAcceptedMessages
 123 << ", sample time: " << dsHeader->sampleTimeUs
 124 << ", device ID: " << dsHeader->deviceID.str;
 125 } else {
 126 LOG(error) << "DataSamplingHeader missing!";
 127 }
 128
 129 }
 130 }
 131 };
 132
// ROOT chain: produces one TH1F and one TObjString per invocation (~1 Hz).
 133 DataProcessorSpec rootDataProducer{
 134 "rootDataProducer",
 135 {},
 136 {
 137 OutputSpec{ "TST", "HISTOS", 0, Lifetime::Timeframe },
 138 OutputSpec{ "TST", "STRING", 0, Lifetime::Timeframe }
 139 },
 141 [](ProcessingContext& ctx) {
 142 std::this_thread::sleep_for(std::chrono::seconds(1));
 143 // Create a histogram
 144 auto& singleHisto = ctx.outputs().make<TH1F>(Output{ "TST", "HISTOS", 0 }, "h1", "test", 100, -10., 10.);
 145 auto& aString = ctx.outputs().make<TObjString>(Output{ "TST", "STRING", 0 }, "foo");
 146 singleHisto.FillRandom("gaus", 1000);
 147 Double_t stats[4];
 148 singleHisto.GetStats(stats);
 149 LOG(info) << "sumw" << stats[0] << "\n"
 150 << "sumw2" << stats[1] << "\n"
 151 << "sumwx" << stats[2] << "\n"
 152 << "sumwx2" << stats[3] << "\n";
 153 }
 154 }
 155 };
 156
// Consumes the ROOT objects and prints the histogram stats and the string content.
 157 DataProcessorSpec rootSink{
 158 "rootSink",
 159 {
 160 InputSpec{ "histos", "TST", "HISTOS", 0, Lifetime::Timeframe },
 161 InputSpec{ "string", "TST", "STRING", 0, Lifetime::Timeframe },
 162 },
 163 {},
 165 [](ProcessingContext& ctx) {
 166 auto h = ctx.inputs().get<TH1F*>("histos");
 167 if (h.get() == nullptr) {
 168 throw std::runtime_error("Missing output");
 169 }
 170 Double_t stats[4];
 171 h->GetStats(stats);
 172 LOG(info) << "sumw" << stats[0] << "\n"
 173 << "sumw2" << stats[1] << "\n"
 174 << "sumwx" << stats[2] << "\n"
 175 << "sumwx2" << stats[3] << "\n";
 176 auto s = ctx.inputs().get<TObjString*>("string");
 177
 178 LOG(info) << "String is " << s->GetString().Data();
 179 } }
 180 };
 181
// QC task for sampled ROOT-object data; verifies the sampled TObjString still reads "foo".
 182 DataProcessorSpec rootQcTask{
 183 "rootQcTask",
 184 {
 185 InputSpec{ "TST_HISTOS_S", {"DS", "rootQcTask0"}},
 186 InputSpec{ "TST_STRING_S", {"DS", "rootQcTask1"}},
 187 },
 188 Outputs{},
 190 [](ProcessingContext& ctx) {
 191 auto h = ctx.inputs().get<TH1F*>("TST_HISTOS_S");
 192 if (h.get() == nullptr) {
 193 throw std::runtime_error("Missing TST_HISTOS_S");
 194 }
 195 Double_t stats[4];
 196 h->GetStats(stats);
 197 LOG(info) << "sumw" << stats[0] << "\n"
 198 << "sumw2" << stats[1] << "\n"
 199 << "sumwx" << stats[2] << "\n"
 200 << "sumwx2" << stats[3] << "\n";
 201 auto s = ctx.inputs().get<TObjString*>("TST_STRING_S");
 202
 203 LOG(info) << "qcTaskTst: TObjString is " << (std::string("foo") == s->GetString().Data() ? "correct" : "wrong");
 204 }
 205 }
 206 };
 207
// Assemble the user workflow; GenerateInfrastructure below appends the sampling dispatchers.
 208 WorkflowSpec specs{
 209 podDataProducer,
 210 processingStage,
 211 podSink,
 212 qcTaskTpc,
 213
 214 rootDataProducer,
 215 rootSink,
 216 rootQcTask
 217 };
 218
 219 const char* o2Root = getenv("O2_ROOT");
 220 if (o2Root == nullptr) {
 221 throw std::runtime_error("The O2_ROOT environment variable is not set, probably the O2 environment has not been loaded.");
 222 }
// NOTE(review): "json:/" + o2Root only yields a "json://..." URI because O2_ROOT is
// expected to be an absolute path starting with '/' — verify against the original source.
 223 std::string configurationSource = std::string("json:/") + o2Root + "/share/etc/exampleDataSamplingConfig.json";
 224 LOG(info) << "Using config source: " << configurationSource;
 225 DataSampling::GenerateInfrastructure(specs, configurationSource, 1);
 226 return specs;
 227}
228// clang-format on
229
// someDataProducerAlgorithm(ProcessingContext& ctx): fills one TPC/CLUSTERS and one
// ITS/CLUSTERS message with collectionChunkSize FakeClusters per invocation.
// NOTE(review): the Doxygen rendering elides the signature line (230); the page's
// cross-reference lists it as void someDataProducerAlgorithm(ProcessingContext&).
231{
 232 std::this_thread::sleep_for(std::chrono::seconds(1)); // throttle production to ~1 Hz
 233 // Creates a new message of size collectionChunkSize which
 234 // has "TPC" as data origin and "CLUSTERS" as data description.
 235 auto& tpcClusters = ctx.outputs().make<FakeCluster>(Output{"TPC", "CLUSTERS", 0}, collectionChunkSize);
 236 int i = 0;
 237
 238 for (auto& cluster : tpcClusters) {
 239 assert(i < collectionChunkSize);
 240 cluster.x = i;
 241 cluster.y = i;
 242 cluster.z = i;
// NOTE(review): rand() is not seeded anywhere in this view — the sequence is
// deterministic per process unless srand() is called elsewhere.
 243 cluster.q = rand() % 1000;
 244 i++;
 245 }
 246
// Same pattern for the ITS message, with a smaller charge range.
 247 auto& itsClusters = ctx.outputs().make<FakeCluster>(Output{"ITS", "CLUSTERS", 0}, collectionChunkSize);
 248 i = 0;
 249 for (auto& cluster : itsClusters) {
 250 assert(i < collectionChunkSize);
 251 cluster.x = i;
 252 cluster.y = i;
 253 cluster.z = i;
 254 cluster.q = rand() % 10;
 255 i++;
 256 }
 257}
258
// someProcessingStageAlgorithm(ProcessingContext& ctx): reads the raw TPC/ITS cluster
// messages and emits transformed CLUSTERS_P copies (x -> -x, y -> 2y, z -> z*q,
// q unchanged). qcTaskTpc above checks exactly this relation on sampled data.
// NOTE(review): the Doxygen rendering elides the signature line (259); the page's
// cross-reference lists it as void someProcessingStageAlgorithm(ProcessingContext&).
260{
 261 const FakeCluster* inputDataTpc = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("dataTPC").payload);
 262 const FakeCluster* inputDataIts = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("dataITS").payload);
 263
 264 auto processedTpcClusters =
 265 ctx.outputs().make<FakeCluster>(Output{"TPC", "CLUSTERS_P", 0}, collectionChunkSize);
 266 auto processedItsClusters =
 267 ctx.outputs().make<FakeCluster>(Output{"ITS", "CLUSTERS_P", 0}, collectionChunkSize);
 268
 269 int i = 0;
 270 for (auto& cluster : processedTpcClusters) {
 271 assert(i < collectionChunkSize);
 272 cluster.x = -inputDataTpc[i].x;
 273 cluster.y = 2 * inputDataTpc[i].y;
 274 cluster.z = inputDataTpc[i].z * inputDataTpc[i].q;
 275 cluster.q = inputDataTpc[i].q;
 276 i++;
 277 }
 278
 279 i = 0;
 280 for (auto& cluster : processedItsClusters) {
 281 assert(i < collectionChunkSize);
 282 cluster.x = -inputDataIts[i].x;
 283 cluster.y = 2 * inputDataIts[i].y;
 284 cluster.z = inputDataIts[i].z * inputDataIts[i].q;
 285 cluster.q = inputDataIts[i].q;
 286 i++;
 287 }
// NOTE(review): stray ';' after the function's closing brace — harmless but should be removed.
 288};
289
// someSinkAlgorithm(ProcessingContext& ctx): terminal consumer of the processed POD
// messages. It only fetches the payload pointers and discards them — this example sink
// intentionally does no work with the data (both locals are unused).
// NOTE(review): the Doxygen rendering elides the signature line (290); the page's
// cross-reference lists it as void someSinkAlgorithm(ProcessingContext&).
291{
 292 const FakeCluster* inputDataTpc = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("dataTPC-proc").payload);
 293 const FakeCluster* inputDataIts = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("dataITS-proc").payload);
 294}
A declaration of O2 Data Sampling Header.
Definition of O2 Data Sampling, v1.0.
int32_t i
uint32_t j
Definition RawData.h:0
Class for time synchronization of RawReader instances.
decltype(auto) make(const Output &spec, Args... args)
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
static void CustomizeInfrastructure(std::vector< framework::CompletionPolicy > &)
Configures dispatcher to consume any data immediately.
static void GenerateInfrastructure(framework::WorkflowSpec &workflow, const std::string &policiesSource, size_t threads=1, const std::string &host="")
Generates data sampling infrastructure.
size_t collectionChunkSize
void someProcessingStageAlgorithm(ProcessingContext &ctx)
void customize(std::vector< CompletionPolicy > &policies)
void someDataProducerAlgorithm(ProcessingContext &ctx)
void someSinkAlgorithm(ProcessingContext &ctx)
WorkflowSpec defineDataProcessing(ConfigContext const &)
This function hooks the workflow specifications up into the DPL driver.
GLint GLenum GLint x
Definition glcorearb.h:403
GLdouble GLdouble GLdouble z
Definition glcorearb.h:843
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
A header which contains some meta-data generated by Data Sampling.
std::function< void(ProcessingContext &)> ProcessCallback
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
the main header struct
Definition DataHeader.h:618
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"