Project
Loading...
Searching...
No Matches
AnalysisSupportHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
21#include "WorkflowHelpers.h"
22
23template class std::vector<o2::framework::OutputObjectInfo>;
24template class std::vector<o2::framework::OutputTaskInfo>;
25
26namespace o2::framework
27{
28
29std::shared_ptr<DataOutputDirector> AnalysisSupportHelpers::getDataOutputDirector(ConfigContext const& ctx)
30{
31 auto const& options = ctx.options();
32 auto const& OutputsInputs = ctx.services().get<AnalysisContext>().outputsInputs;
33 auto const& isDangling = ctx.services().get<AnalysisContext>().isDangling;
34
35 std::shared_ptr<DataOutputDirector> dod = std::make_shared<DataOutputDirector>();
36
37 // analyze options and take actions accordingly
38 // default values
39 std::string rdn, resdir("./");
40 std::string fnb, fnbase("AnalysisResults_trees");
41 float mfs, maxfilesize(-1.);
42 std::string fmo, filemode("RECREATE");
43 int ntfm, ntfmerge = 1;
44
45 // values from json
46 if (options.isSet("aod-writer-json")) {
47 auto fnjson = options.get<std::string>("aod-writer-json");
48 if (!fnjson.empty()) {
49 std::tie(rdn, fnb, fmo, mfs, ntfm) = dod->readJson(fnjson);
50 if (!rdn.empty()) {
51 resdir = rdn;
52 }
53 if (!fnb.empty()) {
54 fnbase = fnb;
55 }
56 if (!fmo.empty()) {
57 filemode = fmo;
58 }
59 if (mfs > 0.) {
60 maxfilesize = mfs;
61 }
62 if (ntfm > 0) {
63 ntfmerge = ntfm;
64 }
65 }
66 }
67
68 // values from command line options, information from json is overwritten
69 if (options.isSet("aod-writer-resdir")) {
70 rdn = options.get<std::string>("aod-writer-resdir");
71 if (!rdn.empty()) {
72 resdir = rdn;
73 }
74 }
75 if (options.isSet("aod-writer-resfile")) {
76 fnb = options.get<std::string>("aod-writer-resfile");
77 if (!fnb.empty()) {
78 fnbase = fnb;
79 }
80 }
81 if (options.isSet("aod-writer-resmode")) {
82 fmo = options.get<std::string>("aod-writer-resmode");
83 if (!fmo.empty()) {
84 filemode = fmo;
85 }
86 }
87 if (options.isSet("aod-writer-maxfilesize")) {
88 mfs = options.get<float>("aod-writer-maxfilesize");
89 if (mfs > 0) {
90 maxfilesize = mfs;
91 }
92 }
93 if (options.isSet("aod-writer-ntfmerge")) {
94 ntfm = options.get<int>("aod-writer-ntfmerge");
95 if (ntfm > 0) {
96 ntfmerge = ntfm;
97 }
98 }
99 // parse the keepString
100 if (options.isSet("aod-writer-keep")) {
101 auto keepString = options.get<std::string>("aod-writer-keep");
102 if (!keepString.empty()) {
103 dod->reset();
104 std::string d("dangling");
105 if (d.find(keepString) == 0) {
106 // use the dangling outputs
107 std::vector<InputSpec> danglingOutputs;
108 for (auto ii = 0u; ii < OutputsInputs.size(); ii++) {
109 if (DataSpecUtils::partialMatch(OutputsInputs[ii], writableAODOrigins) && isDangling[ii]) {
110 danglingOutputs.emplace_back(OutputsInputs[ii]);
111 }
112 }
113 dod->readSpecs(danglingOutputs);
114 } else {
115 // use the keep string
116 dod->readString(keepString);
117 }
118 }
119 }
120 dod->setResultDir(resdir);
121 dod->setFilenameBase(fnbase);
122 dod->setFileMode(filemode);
123 dod->setMaximumFileSize(maxfilesize);
124 dod->setNumberTimeFramesToMerge(ntfmerge);
125
126 return dod;
127}
128
129void AnalysisSupportHelpers::addMissingOutputsToReader(std::vector<OutputSpec> const& providedOutputs,
130 std::vector<InputSpec> const& requestedInputs,
131 DataProcessorSpec& publisher)
132{
133 auto matchingOutputFor = [](InputSpec const& requested) {
134 return [&requested](OutputSpec const& provided) {
135 return DataSpecUtils::match(requested, provided);
136 };
137 };
138 for (InputSpec const& requested : requestedInputs) {
139 auto provided = std::find_if(providedOutputs.begin(),
140 providedOutputs.end(),
141 matchingOutputFor(requested));
142
143 if (provided != providedOutputs.end()) {
144 continue;
145 }
146
147 auto inList = std::find_if(publisher.outputs.begin(),
148 publisher.outputs.end(),
149 matchingOutputFor(requested));
150 if (inList != publisher.outputs.end()) {
151 continue;
152 }
153
154 auto concrete = DataSpecUtils::asConcreteDataMatcher(requested);
155 publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, requested.lifetime, requested.metadata);
156 }
157}
158
159void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector<OutputSpec> const& providedSpecials,
160 std::vector<InputSpec> const& requestedSpecials,
161 std::vector<InputSpec>& requestedAODs,
162 DataProcessorSpec& publisher)
163{
164 for (auto& input : requestedSpecials) {
165 if (std::any_of(providedSpecials.begin(), providedSpecials.end(), [&input](auto const& x) {
166 return DataSpecUtils::match(input, x);
167 })) {
168 continue;
169 }
170 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
171 publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec);
172 for (auto& i : input.metadata) {
173 if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) {
174 auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get<std::string>());
175 auto j = std::find(publisher.inputs.begin(), publisher.inputs.end(), spec);
176 if (j == publisher.inputs.end()) {
177 publisher.inputs.push_back(spec);
178 }
179 DataSpecUtils::updateInputList(requestedAODs, std::move(spec));
180 }
181 }
182 }
183}
184
185void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector<InputSpec> const& requestedSpecials,
186 std::vector<InputSpec>& requestedAODs,
187 std::vector<InputSpec>& requestedDYNs,
188 DataProcessorSpec& publisher)
189{
190 for (auto& input : requestedSpecials) {
191 auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
192 publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec);
193 for (auto& i : input.metadata) {
194 if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) {
195 auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get<std::string>());
196 auto j = std::find_if(publisher.inputs.begin(), publisher.inputs.end(), [&](auto x) { return x.binding == spec.binding; });
197 if (j == publisher.inputs.end()) {
198 publisher.inputs.push_back(spec);
199 }
200 if (DataSpecUtils::partialMatch(spec, AODOrigins)) {
201 DataSpecUtils::updateInputList(requestedAODs, std::move(spec));
202 } else if (DataSpecUtils::partialMatch(spec, header::DataOrigin{"DYN"})) {
203 DataSpecUtils::updateInputList(requestedDYNs, std::move(spec));
204 }
205 }
206 }
207 }
208}
209
210// =============================================================================
211DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(ConfigContext const& ctx)
212{
213 // Lifetime is sporadic because we do not ask each analysis task to send its
214 // results every timeframe.
216 .name = "internal-dpl-aod-global-analysis-file-sink",
217 .inputs = {InputSpec("x", DataSpecUtils::dataDescriptorMatcherFrom(header::DataOrigin{"ATSK"}), Lifetime::Sporadic)},
218 .outputs = {},
219 .algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTObjWriter", ctx),
220 };
221
222 return spec;
223}
224
225// add sink for the AODs
227 AnalysisSupportHelpers::getGlobalAODSink(ConfigContext const& ctx)
228{
229 auto& ac = ctx.services().get<AnalysisContext>();
230
231 // the command line options relevant for the writer are global
232 // see runDataProcessing.h
234 .name = "internal-dpl-aod-writer",
235 .inputs = ac.outputsInputsAOD,
236 .outputs = {},
237 .algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTTTreeWriter", ctx),
238 };
239
240 return spec;
241}
242} // namespace o2::framework
int32_t i
uint32_t j
Definition RawData.h:0
ServiceRegistryRef services() const
ConfigParamRegistry & options() const
GLint GLenum GLint x
Definition glcorearb.h:403
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
o2::framework::InputSpec InputSpec