Project
Loading...
Searching...
No Matches
o2DummyWorkflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
14#include "Framework/Logger.h"
15
16#include <chrono>
17
18using Monitoring = o2::monitoring::Monitoring;
19using namespace o2::framework;
20
/// Placeholder cluster record used as the element type of the "CLUSTERS"
/// messages created in defineDataProcessing.
///
/// Plain aggregate of four floats; no constructors or default member
/// initializers are declared, so the fields are uninitialized unless the
/// object is value- or aggregate-initialized.
struct FakeCluster {
  float x; ///< First value; the "reader" processor writes the element index here.
  float y; ///< Second value; written with the same index by the reader.
  float z; ///< Third value; written with the same index by the reader.
  float q; ///< Fourth value; written with the same index (presumably a charge -- TODO confirm).
};
27
/// Per-processor summary record, emitted as the single element of the
/// "SUMMARY" messages created in defineDataProcessing.
///
/// Plain aggregate of two ints; no constructors or default member
/// initializers are declared.
struct Summary {
  int inputCount;    ///< Set by the summary processors to ctx.inputs().size().
  int clustersCount; ///< Not assigned anywhere in the code visible in this file.
};
32
35
36// This is how you can define your processing in a declarative way
// Builds and returns four DataProcessorSpec entries, in this order:
//   - "reader":              no inputs; outputs TPC/CLUSTERS ("tpc") and ITS/CLUSTERS ("its").
//   - "tpc-cluster-summary": input TPC/CLUSTERS; output TPC/SUMMARY.
//   - "its-cluster-summary": input ITS/CLUSTERS; output ITS/SUMMARY.
//   - "merger":              inputs TPC/CLUSTERS, TPC/SUMMARY and ITS/SUMMARY; no outputs.
// The ConfigContext argument is unnamed and is not used.
// NOTE(review): the embedded listing numbers jump over lines 44, 80, 93 and 106,
// so those source lines are not visible here. They presumably open the algorithm /
// callback of each processor -- confirm against the original file before editing.
37std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const&)
38{
39 DataProcessorSpec timeframeReader{
40 "reader",
41 Inputs{},
42 {OutputSpec{{"tpc"}, "TPC", "CLUSTERS"},
43 OutputSpec{{"its"}, "ITS", "CLUSTERS"}},
45 [](ProcessingContext& ctx) {
// Blocks for one second before any output is created.
46 std::this_thread::sleep_for(std::chrono::seconds(1));
47 // Creates a new message of size 1000 which
48 // has "TPC" as data origin and "CLUSTERS" as data description.
49 auto& tpcClusters = ctx.outputs().make<FakeCluster>(OutputRef{"tpc"}, 1000);
50 int i = 0;
51
// Every field of the i-th cluster is set to i.
52 for (auto& cluster : tpcClusters) {
53 // The assert is here simply because at some point we were allocating the
54 // wrong number of items.
55 assert(i < 1000);
56 cluster.x = i;
57 cluster.y = i;
58 cluster.z = i;
59 cluster.q = i;
60 i++;
61 }
62
// Same fill pattern for the 1000 elements of the "its" output.
63 auto& itsClusters = ctx.outputs().make<FakeCluster>(OutputRef{"its"}, 1000);
64 i = 0;
65 for (auto& cluster : itsClusters) {
66 assert(i < 1000);
67 cluster.x = i;
68 cluster.y = i;
69 cluster.z = i;
70 cluster.q = i;
71 i++;
72 }
73 // LOG(info) << "Invoked" << std::endl;
74 }}};
75
76 DataProcessorSpec tpcClusterSummary{
77 "tpc-cluster-summary",
78 {InputSpec{"clusters", "TPC", "CLUSTERS"}},
79 {OutputSpec{{"summary"}, "TPC", "SUMMARY"}},
// Creates a one-element Summary on the "summary" output and stores the number of
// inputs in it; clustersCount is not assigned here.
81 auto& tpcSummary = ctx.outputs().make<Summary>(OutputRef{"summary"}, 1);
82 tpcSummary[0].inputCount = ctx.inputs().size();
83 }},
// Declares a Float option "some-cut" with default 1.0f; it is not read in the visible code.
84 {ConfigParamSpec{"some-cut", VariantType::Float, 1.0f, {"some cut"}}},
85 };
86
87 DataProcessorSpec itsClusterSummary{
88 "its-cluster-summary",
89 {InputSpec{"clusters", "ITS", "CLUSTERS"}},
90 {
91 OutputSpec{{"summary"}, "ITS", "SUMMARY"},
92 },
// Same as the TPC summary above, for the ITS clusters input.
94 auto& itsSummary = ctx.outputs().make<Summary>(OutputRef{"summary"}, 1);
95 itsSummary[0].inputCount = ctx.inputs().size();
96 }},
97 {ConfigParamSpec{"some-cut", VariantType::Float, 1.0f, {"some cut"}}},
98 };
99
100 DataProcessorSpec merger{
101 "merger",
102 {InputSpec{"clusters", "TPC", "CLUSTERS"},
103 InputSpec{"summary", "TPC", "SUMMARY"},
104 InputSpec{"other_summary", "ITS", "SUMMARY"}},
105 Outputs{},
107 [](ProcessingContext& ctx) {
108 // We verify we got inputs in the correct order
109 auto h0 = DataRefUtils::getHeader<DataHeader*>(ctx.inputs().get("clusters"));
110 auto h1 = DataRefUtils::getHeader<DataHeader*>(ctx.inputs().get("summary"));
111 auto h2 = DataRefUtils::getHeader<DataHeader*>(ctx.inputs().get("other_summary"));
112 // This should always be the case, since the
113 // test for an actual DataHeader should happen in the device itself.
114 assert(h0 && h1 && h2);
// Each input must carry the origin declared in its InputSpec; otherwise a
// std::runtime_error is thrown.
// NOTE(review): the message has no separator between "origin" and the appended
// text, and dataOrigin.str is handed to std::string as a C string -- confirm it
// is always NUL-terminated.
115 if (h0->dataOrigin != o2::header::DataOrigin("TPC")) {
116 throw std::runtime_error("Unexpected data origin" + std::string(h0->dataOrigin.str));
117 }
118
119 if (h1->dataOrigin != o2::header::DataOrigin("TPC")) {
120 throw std::runtime_error("Unexpected data origin" + std::string(h1->dataOrigin.str));
121 }
122
123 if (h2->dataOrigin != o2::header::DataOrigin("ITS")) {
124 throw std::runtime_error("Unexpected data origin" + std::string(h2->dataOrigin.str));
125 }
126
// Sends two metrics through the Monitoring service: the constant 1 as
// "merger/invoked" and the number of inputs as "merger/inputs".
127 auto& metrics = ctx.services().get<Monitoring>();
128 metrics.send({1, "merger/invoked"});
129 metrics.send({(int)ctx.inputs().size(), "merger/inputs"});
130 },
131 }};
// The specs are returned by value in declaration order.
132 return {
133 timeframeReader,
134 tpcClusterSummary,
135 itsClusterSummary,
136 merger};
137}
int32_t i
o2::monitoring::Monitoring Monitoring
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
Definition glcorearb.h:5500
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
std::vector< DataProcessorSpec > defineDataProcessing(ConfigContext const &)
This function hooks up the workflow specifications into the DPL driver.
the main header struct
Definition DataHeader.h:618