Project
Loading...
Searching...
No Matches
PublisherSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
20#include "Headers/DataHeader.h"
21#include "TPCBase/Sector.h"
24#include <memory> // for make_shared, make_unique, unique_ptr
25#include <array>
26#include <vector>
27#include <utility> // std::move
28#include <stdexcept> //std::invalid_argument
29#include <TFile.h>
30#include <TTree.h>
31#include <TBranch.h>
32
33using namespace o2::framework;
34using namespace o2::header;
35
36namespace o2
37{
38namespace tpc
39{
40
46{
47 if (config.tpcSectors.size() == 0 || config.outputIds.size() == 0) {
48 throw std::invalid_argument("need TPC sector and output id configuration");
49 }
50 constexpr static size_t NSectors = o2::tpc::Sector::MAXSECTOR;
51 enum struct SectorMode {
52 Sector, // stored in sector branches
53 Full, // full TPC stored in one branch
54 };
55 struct ProcessAttributes {
56 std::vector<int> sectors;
57 std::vector<int> outputIds;
58 std::vector<o2::header::DataHeader::SubSpecificationType> zeroLengthOutputs;
59 uint64_t activeSectors = 0;
60 std::array<std::shared_ptr<RootTreeReader>, NSectors> readers;
61 bool terminateOnEod = false;
62 bool finished = false;
63 SectorMode sectorMode = SectorMode::Sector;
64 };
65
66 auto initFunction = [config, propagateMC, creator](InitContext& ic) {
67 // get the option from the init context
68 auto filename = o2::utils::Str::concat_string(o2::utils::Str::rectifyDirectory(ic.options().get<std::string>("input-dir")),
69 ic.options().get<std::string>("infile"));
70 auto treename = ic.options().get<std::string>("treename");
71 auto clbrName = ic.options().get<std::string>(config.databranch.option.c_str());
72 auto mcbrName = ic.options().get<std::string>(config.mcbranch.option.c_str());
73 auto nofEvents = ic.options().get<int>("nevents");
75
76 // do a runtime check if the branch name without sector number suffix is found in the file
77 // if found the publisher will publish the single data set at one output route and empty
78 // messages at all the others
79 auto checkSectorMode = [&filename, &treename, &clbrName]() -> SectorMode {
80 std::unique_ptr<TFile> file(TFile::Open(filename.c_str()));
81 if (file) {
82 TTree* tree = reinterpret_cast<TTree*>(file->GetObjectChecked(treename.c_str(), "TTree"));
83 if (tree) {
84 const auto brlist = tree->GetListOfBranches();
85 for (TObject const* entry : *brlist) {
86 if (clbrName == entry->GetName()) {
87 return SectorMode::Full;
88 }
89 }
90 }
91 file->Close();
92 }
93 return SectorMode::Sector;
94 };
95
96 auto processAttributes = std::make_shared<ProcessAttributes>();
97 {
98 processAttributes->terminateOnEod = ic.options().get<bool>("terminate-on-eod");
99 processAttributes->sectorMode = checkSectorMode();
100 auto& sectors = processAttributes->sectors;
101 auto& activeSectors = processAttributes->activeSectors;
102 auto& readers = processAttributes->readers;
103 auto& outputIds = processAttributes->outputIds;
104 auto& sectorMode = processAttributes->sectorMode;
105
106 sectors = config.tpcSectors;
107 outputIds = config.outputIds;
108 for (auto const& s : sectors) {
109 // set the mask of active sectors
110 if (s >= NSectors) {
111 std::string message = std::string("invalid sector range specified, allowed 0-") + std::to_string(NSectors - 1);
112 // FIXME should probably be FATAL, but this doesn't seem to be handled in the DPL control flow
113 // at least the process is not marked dead in the DebugGUI
114 LOG(error) << message;
115 throw std::invalid_argument(message);
116 }
117 activeSectors |= (uint64_t)0x1 << s;
118 }
119
120 // set up the tree interface
121 // TODO: parallelism on sectors needs to be implemented as selector in the reader
122 // the data is now in parallel branches, as first attempt use an array of readers
123 auto outputId = outputIds.begin();
124 for (auto const& sector : sectors) {
126 std::string sectorfile = filename;
127 if (filename.find('%') != std::string::npos) {
128 std::vector<char> formattedname(filename.length() + 10, 0);
129 snprintf(formattedname.data(), formattedname.size() - 1, filename.c_str(), sector);
130 sectorfile = formattedname.data();
131 }
132 std::string clusterbranchname = clbrName;
133 std::string mcbranchname = mcbrName;
134 if (sectorMode == SectorMode::Sector) {
135 clusterbranchname += "_" + std::to_string(sector);
136 mcbranchname += "_" + std::to_string(sector);
137 }
138 readers[sector] = creator(treename.c_str(), // tree name
139 sectorfile.c_str(), // input file name
140 nofEvents, // number of entries to publish
141 publishingMode,
142 subSpec,
143 clusterbranchname.c_str(), // name of data branch
144 mcbranchname.c_str(), // name of mc label branch
145 config.hook);
146 if (sectorMode == SectorMode::Full) {
147 break;
148 }
149 if (++outputId == outputIds.end()) {
150 outputId = outputIds.begin();
151 }
152 }
153 if (sectorMode == SectorMode::Full) {
154 // the slot of the first configured sector is used to publish the full set, all others removed
155 sectors.resize(1);
156 // the data will be published at first configured output id, zero-length data on all other output ids
157 processAttributes->zeroLengthOutputs.assign(++outputId, outputIds.end());
158 }
159 }
160
161 // set up the processing function
162 // using by-copy capture of the worker instance shared pointer
163 // the shared pointer makes sure to clean up the instance when the processing
164 // function gets out of scope
165 // FIXME: wanted to use it = sectors.begin() in the variable capture but the iterator
166 // is const and can not be incremented
167 auto processingFct = [processAttributes, config](ProcessingContext& pc) {
168 if (processAttributes->finished) {
169 return;
170 }
171
172 bool eos = false;
173 auto const& sectors = processAttributes->sectors;
174 for (auto const& sector : sectors) {
175 auto& activeSectors = processAttributes->activeSectors;
176 auto& readers = processAttributes->readers;
177 o2::tpc::TPCSectorHeader header{sector};
178 if (processAttributes->sectorMode == SectorMode::Full) {
179 header.sectorBits = activeSectors;
180 }
181 header.activeSectors = activeSectors;
182 auto& r = *(readers[sector].get());
183
184 // increment the reader and invoke it for the processing context
185 if (r.next()) {
186 // there is data, run the reader
187 r(pc, header);
188 } else {
189 // no more data, delete the reader
190 readers[sector].reset();
191 eos = true;
192 }
193 }
194
195 if (eos) {
196 processAttributes->finished = true;
197 pc.services().get<ControlService>().endOfStream();
198 pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
199 } else {
200 // publish empty events
203 o2::tpc::TPCSectorHeader header{0};
204 header.sectorBits = 0;
205 header.activeSectors = processAttributes->activeSectors;
206 for (auto const& subSpec : processAttributes->zeroLengthOutputs) {
207 pc.outputs().make<char>({dto.origin, dto.description, subSpec, {header}});
208 if (pc.outputs().isAllowed({mco.origin, mco.description, subSpec})) {
209 pc.outputs().make<char>({mco.origin, mco.description, subSpec, {header}});
210 }
211 }
212 }
213 };
214
215 // return the actual processing function as a lambda function using variables
216 // of the init function
217 return processingFct;
218 };
219
220 auto createOutputSpecs = [&config, propagateMC]() {
221 std::vector<OutputSpec> outputSpecs;
222 for (size_t n = 0; n < config.outputIds.size(); ++n) {
226 outputSpecs.emplace_back(OutputSpec{{"output"}, dto.origin, dto.description, subSpec, Lifetime::Timeframe});
227 if (propagateMC) {
228 outputSpecs.emplace_back(OutputSpec{{"outputMC"}, mco.origin, mco.description, subSpec, Lifetime::Timeframe});
229 }
230 }
231 return std::move(outputSpecs);
232 };
233
234 auto& dtb = config.databranch;
235 auto& mcb = config.mcbranch;
236 return DataProcessorSpec{config.processName.c_str(),
237 Inputs{}, // no inputs
238 {createOutputSpecs()},
239 AlgorithmSpec(initFunction),
240 Options{
241 {"infile", VariantType::String, config.defaultFileName.c_str(), {"Name of the input file"}},
242 {"input-dir", VariantType::String, "none", {"Input directory"}},
243 {"treename", VariantType::String, config.defaultTreeName.c_str(), {"Name of input tree"}},
244 {dtb.option.c_str(), VariantType::String, dtb.defval.c_str(), {dtb.help.c_str()}},
245 {mcb.option.c_str(), VariantType::String, mcb.defval.c_str(), {mcb.help.c_str()}},
246 {"nevents", VariantType::Int, -1, {"number of events to run"}},
247 {"terminate-on-eod", VariantType::Bool, true, {"terminate on end-of-data"}},
248 }};
249}
250} // end namespace tpc
251} // end namespace o2
Definition of the Names Generator class.
static constexpr int MAXSECTOR
Definition Sector.h:44
GLdouble n
Definition glcorearb.h:1982
GLuint entry
Definition glcorearb.h:5735
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLboolean r
Definition glcorearb.h:1233
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > Inputs
O2 data header classes and API, v0.1.
Definition DetID.h:49
std::function< std::shared_ptr< Reader >(const char *, const char *, int, Reader::PublishingMode, o2::header::DataHeader::SubSpecificationType, const char *, const char *, Reader::SpecialPublishHook *)> Creator
@ Full
Full weighting.
framework::DataProcessorSpec createPublisherSpec(PublisherConf const &config, bool propagateMC, workflow_reader::Creator creator)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::string filename()
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
uint32_t SubSpecificationType
Definition DataHeader.h:620
BranchOptionConfig mcbranch
std::string defaultTreeName
BranchOptionConfig databranch
std::string defaultFileName
Reader::SpecialPublishHook * hook
std::vector< int > outputIds
std::vector< int > tpcSectors
static std::string rectifyDirectory(const std::string_view p)
static std::string concat_string(Ts const &... ts)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))