Project
Loading...
Searching...
No Matches
clusters-sampler-workflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
17#include <iostream>
18#include <fstream>
19#include <stdexcept>
20#include <vector>
21
26#include "Framework/Lifetime.h"
27#include "Framework/Output.h"
28#include "Framework/Task.h"
29#include "Framework/Logger.h"
32
36
37using namespace o2::framework;
38
39//_________________________________________________________________________________________________
40void customize(std::vector<ConfigParamSpec>& workflowOptions)
41{
43 workflowOptions.emplace_back("global", VariantType::Bool, false,
44 ConfigParamSpec::HelpString{"assume the read clusters are in global reference frame"});
45 workflowOptions.emplace_back("no-digits", VariantType::Bool, false,
46 ConfigParamSpec::HelpString{"do not look for digits"});
47}
48
50
51using namespace o2::mch;
52
54{
55 public:
56 //_________________________________________________________________________________________________
57 ClusterSamplerTask(bool doDigits) : mDoDigits(doDigits) {}
58
59 //_________________________________________________________________________________________________
60 void init(InitContext& ic)
61 {
63 LOG(info) << "initializing cluster sampler";
64
65 auto inputFileName = ic.options().get<std::string>("infile");
66 mInputFile.open(inputFileName, std::ios::binary);
67 if (!mInputFile.is_open()) {
68 throw std::invalid_argument("cannot open input file" + inputFileName);
69 }
70 if (mInputFile.peek() == EOF) {
71 throw std::length_error("input file is empty");
72 }
73
74 mNEventsPerTF = ic.options().get<int>("nEventsPerTF");
75 if (mNEventsPerTF < 1) {
76 throw std::invalid_argument("number of events per time frame must be >= 1");
77 }
78
79 auto stop = [this]() {
81 LOG(info) << "stop cluster sampler";
82 this->mInputFile.close();
83 };
84 ic.services().get<CallbackService>().set<CallbackService::Id::Stop>(stop);
85 }
86
87 //_________________________________________________________________________________________________
89 {
91
92 static uint32_t event(0);
93
94 // reached eof
95 if (mInputFile.peek() == EOF) {
96 pc.services().get<ControlService>().endOfStream();
97 // pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
98 return;
99 }
100
101 // create the output messages
102 auto& rofs = pc.outputs().make<std::vector<ROFRecord>>(OutputRef{"rofs"});
103 auto& clusters = pc.outputs().make<std::vector<Cluster>>(OutputRef{"clusters"});
104 std::vector<Digit, o2::pmr::polymorphic_allocator<Digit>>* digits(nullptr);
105 if (mDoDigits) {
106 digits = &pc.outputs().make<std::vector<Digit>>(OutputRef{"digits"});
107 }
108
109 // loop over the requested number of events (or until eof) and fill the messages
110 for (int iEvt = 0; iEvt < mNEventsPerTF && mInputFile.peek() != EOF; ++iEvt) {
111 int nClusters = readOneEvent(clusters, digits);
112 rofs.emplace_back(o2::InteractionRecord{0, event++}, clusters.size() - nClusters, nClusters);
113 }
114 }
115
116 private:
117 //_________________________________________________________________________________________________
118 int readOneEvent(std::vector<Cluster, o2::pmr::polymorphic_allocator<Cluster>>& clusters,
119 std::vector<Digit, o2::pmr::polymorphic_allocator<Digit>>* digits)
120 {
122
123 // get the number of clusters
124 int nClusters(-1);
125 mInputFile.read(reinterpret_cast<char*>(&nClusters), sizeof(int));
126 if (mInputFile.fail()) {
127 throw std::length_error("invalid input");
128 }
129
130 // get the number of associated digits
131 int nDigits(-1);
132 mInputFile.read(reinterpret_cast<char*>(&nDigits), sizeof(int));
133 if (mInputFile.fail()) {
134 throw std::length_error("invalid input");
135 }
136
137 if (nClusters < 0 || nDigits < 0) {
138 throw std::length_error("invalid input");
139 }
140
141 // stop here in case of empty event
142 if (nClusters == 0) {
143 if (nDigits > 0) {
144 throw std::length_error("invalid input");
145 }
146 LOG(info) << "event is empty";
147 return 0;
148 }
149
150 // fill clusters in O2 format
151 int clusterOffset = clusters.size();
152 clusters.resize(clusterOffset + nClusters);
153 mInputFile.read(reinterpret_cast<char*>(&clusters[clusterOffset]), nClusters * sizeof(Cluster));
154 if (mInputFile.fail()) {
155 throw std::length_error("invalid input");
156 }
157
158 // read digits if requested or skip them if any
159 if (mDoDigits) {
160 if (nDigits == 0) {
161 throw std::length_error("missing digits");
162 }
163 int digitOffset = digits->size();
164 digits->resize(digitOffset + nDigits);
165 mInputFile.read(reinterpret_cast<char*>(&(*digits)[digitOffset]), nDigits * sizeof(Digit));
166 if (mInputFile.fail()) {
167 throw std::length_error("invalid input");
168 }
169 for (auto itCluster = clusters.begin() + clusterOffset; itCluster < clusters.end(); ++itCluster) {
170 itCluster->firstDigit += digitOffset;
171 }
172 } else if (nDigits > 0) {
173 mInputFile.seekg(nDigits * sizeof(Digit), std::ios::cur);
174 if (mInputFile.fail()) {
175 throw std::length_error("invalid input");
176 }
177 }
178
179 return nClusters;
180 }
181
182 std::ifstream mInputFile{};
183 bool mDoDigits = false;
184 int mNEventsPerTF = 1;
185};
186
187//_________________________________________________________________________________________________
188DataProcessorSpec getClusterSamplerSpec(const char* specName, bool global, bool doDigits)
189{
190 std::vector<OutputSpec> outputSpecs{};
191 outputSpecs.emplace_back(OutputSpec{{"rofs"}, "MCH", "CLUSTERROFS", 0, Lifetime::Timeframe});
192 auto clusterDesc = global ? o2::header::DataDescription{"GLOBALCLUSTERS"} : o2::header::DataDescription{"CLUSTERS"};
193 outputSpecs.emplace_back(OutputSpec{{"clusters"}, "MCH", clusterDesc, 0, Lifetime::Timeframe});
194 if (doDigits) {
195 outputSpecs.emplace_back(OutputSpec{{"digits"}, "MCH", "CLUSTERDIGITS", 0, Lifetime::Timeframe});
196 }
197
198 return DataProcessorSpec{
199 specName,
200 Inputs{},
201 outputSpecs,
202 AlgorithmSpec{adaptFromTask<ClusterSamplerTask>(doDigits)},
203 Options{{"infile", VariantType::String, "", {"input filename"}},
204 {"nEventsPerTF", VariantType::Int, 1, {"number of events per time frame"}}}};
205}
206
207//_________________________________________________________________________________________________
209{
210 bool global = cc.options().get<bool>("global");
211 bool doDigits = !cc.options().get<bool>("no-digits");
212 return WorkflowSpec{getClusterSamplerSpec("mch-cluster-sampler", global, doDigits)};
213}
Definition of the MCH cluster minimal structure.
Definition of the MCH ROFrame record.
const char * specName
int nClusters
void run(ProcessingContext &pc)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
Definition InitContext.h:34
ConfigParamRegistry const & options()
Definition InitContext.h:33
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
MCH digit implementation.
Definition Digit.h:31
WorkflowSpec defineDataProcessing(const ConfigContext &cc)
void customize(std::vector< ConfigParamSpec > &workflowOptions)
DataProcessorSpec getClusterSamplerSpec(const char *specName, bool global, bool doDigits)
struct _cl_event * event
Definition glcorearb.h:2982
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
cluster minimal structure
Definition Cluster.h:31
std::vector< o2::mch::ChannelCode > cc
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< Cluster > clusters
std::vector< Digit > digits