Project
Loading...
Searching...
No Matches
AnalysisSupportHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
20#include "WorkflowHelpers.h"
21
22template class std::vector<o2::framework::OutputObjectInfo>;
23template class std::vector<o2::framework::OutputTaskInfo>;
24
25namespace o2::framework
26{
27
28std::shared_ptr<DataOutputDirector> AnalysisSupportHelpers::getDataOutputDirector(ConfigContext const& ctx)
29{
30 auto const& options = ctx.options();
31 auto const& OutputsInputs = ctx.services().get<AnalysisContext>().outputsInputs;
32 auto const& isDangling = ctx.services().get<AnalysisContext>().isDangling;
33
34 std::shared_ptr<DataOutputDirector> dod = std::make_shared<DataOutputDirector>();
35
36 // analyze options and take actions accordingly
37 // default values
38 std::string rdn, resdir("./");
39 std::string fnb, fnbase("AnalysisResults_trees");
40 float mfs, maxfilesize(-1.);
41 std::string fmo, filemode("RECREATE");
42 int ntfm, ntfmerge = 1;
43
44 // values from json
45 if (options.isSet("aod-writer-json")) {
46 auto fnjson = options.get<std::string>("aod-writer-json");
47 if (!fnjson.empty()) {
48 std::tie(rdn, fnb, fmo, mfs, ntfm) = dod->readJson(fnjson);
49 if (!rdn.empty()) {
50 resdir = rdn;
51 }
52 if (!fnb.empty()) {
53 fnbase = fnb;
54 }
55 if (!fmo.empty()) {
56 filemode = fmo;
57 }
58 if (mfs > 0.) {
59 maxfilesize = mfs;
60 }
61 if (ntfm > 0) {
62 ntfmerge = ntfm;
63 }
64 }
65 }
66
67 // values from command line options, information from json is overwritten
68 if (options.isSet("aod-writer-resdir")) {
69 rdn = options.get<std::string>("aod-writer-resdir");
70 if (!rdn.empty()) {
71 resdir = rdn;
72 }
73 }
74 if (options.isSet("aod-writer-resfile")) {
75 fnb = options.get<std::string>("aod-writer-resfile");
76 if (!fnb.empty()) {
77 fnbase = fnb;
78 }
79 }
80 if (options.isSet("aod-writer-resmode")) {
81 fmo = options.get<std::string>("aod-writer-resmode");
82 if (!fmo.empty()) {
83 filemode = fmo;
84 }
85 }
86 if (options.isSet("aod-writer-maxfilesize")) {
87 mfs = options.get<float>("aod-writer-maxfilesize");
88 if (mfs > 0) {
89 maxfilesize = mfs;
90 }
91 }
92 if (options.isSet("aod-writer-ntfmerge")) {
93 ntfm = options.get<int>("aod-writer-ntfmerge");
94 if (ntfm > 0) {
95 ntfmerge = ntfm;
96 }
97 }
98 // parse the keepString
99 if (options.isSet("aod-writer-keep")) {
100 auto keepString = options.get<std::string>("aod-writer-keep");
101 if (!keepString.empty()) {
102 dod->reset();
103 std::string d("dangling");
104 if (d.find(keepString) == 0) {
105 // use the dangling outputs
106 std::vector<InputSpec> danglingOutputs;
107 for (auto ii = 0u; ii < OutputsInputs.size(); ii++) {
108 if (DataSpecUtils::partialMatch(OutputsInputs[ii], writableAODOrigins) && isDangling[ii]) {
109 danglingOutputs.emplace_back(OutputsInputs[ii]);
110 }
111 }
112 dod->readSpecs(danglingOutputs);
113 } else {
114 // use the keep string
115 dod->readString(keepString);
116 }
117 }
118 }
119 dod->setResultDir(resdir);
120 dod->setFilenameBase(fnbase);
121 dod->setFileMode(filemode);
122 dod->setMaximumFileSize(maxfilesize);
123 dod->setNumberTimeFramesToMerge(ntfmerge);
124
125 return dod;
126}
127
128void AnalysisSupportHelpers::addMissingOutputsToReader(std::vector<OutputSpec> const& providedOutputs,
129 std::vector<InputSpec> const& requestedInputs,
130 DataProcessorSpec& publisher)
131{
132 auto matchingOutputFor = [](InputSpec const& requested) {
133 return [&requested](OutputSpec const& provided) {
134 return DataSpecUtils::match(requested, provided);
135 };
136 };
137 for (InputSpec const& requested : requestedInputs) {
138 auto provided = std::find_if(providedOutputs.begin(),
139 providedOutputs.end(),
140 matchingOutputFor(requested));
141
142 if (provided != providedOutputs.end()) {
143 continue;
144 }
145
146 auto inList = std::find_if(publisher.outputs.begin(),
147 publisher.outputs.end(),
148 matchingOutputFor(requested));
149 if (inList != publisher.outputs.end()) {
150 continue;
151 }
152
153 auto concrete = DataSpecUtils::asConcreteDataMatcher(requested);
154 publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, requested.lifetime, requested.metadata);
155 }
156}
157
158void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector<OutputSpec> const& providedSpecials,
159 std::vector<InputSpec> const& requestedSpecials,
160 std::vector<InputSpec>& requestedAODs,
161 DataProcessorSpec& publisher)
162{
163 for (auto& input : requestedSpecials) {
164 if (std::any_of(providedSpecials.begin(), providedSpecials.end(), [&input](auto const& x) {
165 return DataSpecUtils::match(input, x);
166 })) {
167 continue;
168 }
169 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
170 publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec);
171 for (auto& i : input.metadata) {
172 if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) {
173 auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get<std::string>());
174 auto j = std::find(publisher.inputs.begin(), publisher.inputs.end(), spec);
175 if (j == publisher.inputs.end()) {
176 publisher.inputs.push_back(spec);
177 }
178 DataSpecUtils::updateInputList(requestedAODs, std::move(spec));
179 }
180 }
181 }
182}
183
184void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector<InputSpec> const& requestedSpecials,
185 std::vector<InputSpec>& requestedAODs,
186 std::vector<InputSpec>& requestedDYNs,
187 DataProcessorSpec& publisher)
188{
189 for (auto& input : requestedSpecials) {
190 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
191 publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec);
192 for (auto& i : input.metadata) {
193 if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) {
194 auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get<std::string>());
195 auto j = std::find_if(publisher.inputs.begin(), publisher.inputs.end(), [&](auto x) { return x.binding == spec.binding; });
196 if (j == publisher.inputs.end()) {
197 publisher.inputs.push_back(spec);
198 }
199 if (DataSpecUtils::partialMatch(spec, AODOrigins)) {
200 DataSpecUtils::updateInputList(requestedAODs, std::move(spec));
201 } else if (DataSpecUtils::partialMatch(spec, header::DataOrigin{"DYN"})) {
202 DataSpecUtils::updateInputList(requestedDYNs, std::move(spec));
203 }
204 }
205 }
206 }
207}
208
209void AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher(
210 std::vector<OutputSpec> const& providedSpecials,
211 std::vector<InputSpec> const& requestedSpecials,
212 std::vector<InputSpec>& requestedAODs,
213 std::vector<InputSpec>& requestedDYNs,
214 DataProcessorSpec& publisher)
215{
216 for (auto& input : requestedSpecials) {
217 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
218 publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec);
219 // FIXME: good enough for now...
220 for (auto& i : input.metadata) {
221 if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) {
222 auto value = i.defaultValue.get<std::string>();
223 auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get<std::string>());
224 auto j = std::find_if(publisher.inputs.begin(), publisher.inputs.end(), [&](auto x) { return x.binding == spec.binding; });
225 if (j == publisher.inputs.end()) {
226 publisher.inputs.push_back(spec);
227 }
228 if (DataSpecUtils::partialMatch(spec, AODOrigins)) {
229 DataSpecUtils::updateInputList(requestedAODs, std::move(spec));
230 } else if (DataSpecUtils::partialMatch(spec, header::DataOrigin{"DYN"})) {
231 DataSpecUtils::updateInputList(requestedDYNs, std::move(spec));
232 }
233 }
234 }
235 }
236}
237
238// =============================================================================
239DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(ConfigContext const& ctx)
240{
241 // Lifetime is sporadic because we do not ask each analysis task to send its
242 // results every timeframe.
244 .name = "internal-dpl-aod-global-analysis-file-sink",
245 .inputs = {InputSpec("x", DataSpecUtils::dataDescriptorMatcherFrom(header::DataOrigin{"ATSK"}), Lifetime::Sporadic)},
246 .outputs = {},
247 .algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTObjWriter", ctx),
248 };
249
250 return spec;
251}
252
253// add sink for the AODs
255 AnalysisSupportHelpers::getGlobalAODSink(ConfigContext const& ctx)
256{
257 auto& ac = ctx.services().get<AnalysisContext>();
258
259 // the command line options relevant for the writer are global
260 // see runDataProcessing.h
262 .name = "internal-dpl-aod-writer",
263 .inputs = ac.outputsInputsAOD,
264 .outputs = {},
265 .algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTTTreeWriter", ctx),
266 };
267
268 return spec;
269}
270} // namespace o2::framework
int32_t i
uint32_t j
Definition RawData.h:0
ServiceRegistryRef services() const
ConfigParamRegistry & options() const
GLint GLenum GLint x
Definition glcorearb.h:403
GLsizei const GLfloat * value
Definition glcorearb.h:819
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
o2::framework::InputSpec InputSpec