Project
Loading...
Searching...
No Matches
AnalysisSupportHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
17#include "WorkflowHelpers.h"
18
19template class std::vector<o2::framework::OutputObjectInfo>;
20template class std::vector<o2::framework::OutputTaskInfo>;
21
22namespace o2::framework
23{
24
25std::shared_ptr<DataOutputDirector> AnalysisSupportHelpers::getDataOutputDirector(ConfigContext const& ctx)
26{
27 auto const& options = ctx.options();
28 auto const& OutputsInputs = ctx.services().get<AnalysisContext>().outputsInputs;
29 auto const& isDangling = ctx.services().get<AnalysisContext>().isDangling;
30
31 std::shared_ptr<DataOutputDirector> dod = std::make_shared<DataOutputDirector>();
32
33 // analyze options and take actions accordingly
34 // default values
35 std::string rdn, resdir("./");
36 std::string fnb, fnbase("AnalysisResults_trees");
37 float mfs, maxfilesize(-1.);
38 std::string fmo, filemode("RECREATE");
39 int ntfm, ntfmerge = 1;
40
41 // values from json
42 if (options.isSet("aod-writer-json")) {
43 auto fnjson = options.get<std::string>("aod-writer-json");
44 if (!fnjson.empty()) {
45 std::tie(rdn, fnb, fmo, mfs, ntfm) = dod->readJson(fnjson);
46 if (!rdn.empty()) {
47 resdir = rdn;
48 }
49 if (!fnb.empty()) {
50 fnbase = fnb;
51 }
52 if (!fmo.empty()) {
53 filemode = fmo;
54 }
55 if (mfs > 0.) {
56 maxfilesize = mfs;
57 }
58 if (ntfm > 0) {
59 ntfmerge = ntfm;
60 }
61 }
62 }
63
64 // values from command line options, information from json is overwritten
65 if (options.isSet("aod-writer-resdir")) {
66 rdn = options.get<std::string>("aod-writer-resdir");
67 if (!rdn.empty()) {
68 resdir = rdn;
69 }
70 }
71 if (options.isSet("aod-writer-resfile")) {
72 fnb = options.get<std::string>("aod-writer-resfile");
73 if (!fnb.empty()) {
74 fnbase = fnb;
75 }
76 }
77 if (options.isSet("aod-writer-resmode")) {
78 fmo = options.get<std::string>("aod-writer-resmode");
79 if (!fmo.empty()) {
80 filemode = fmo;
81 }
82 }
83 if (options.isSet("aod-writer-maxfilesize")) {
84 mfs = options.get<float>("aod-writer-maxfilesize");
85 if (mfs > 0) {
86 maxfilesize = mfs;
87 }
88 }
89 if (options.isSet("aod-writer-ntfmerge")) {
90 ntfm = options.get<int>("aod-writer-ntfmerge");
91 if (ntfm > 0) {
92 ntfmerge = ntfm;
93 }
94 }
95 // parse the keepString
96 if (options.isSet("aod-writer-keep")) {
97 auto keepString = options.get<std::string>("aod-writer-keep");
98 if (!keepString.empty()) {
99 dod->reset();
100 std::string d("dangling");
101 if (d.find(keepString) == 0) {
102 // use the dangling outputs
103 std::vector<InputSpec> danglingOutputs;
104 for (auto ii = 0u; ii < OutputsInputs.size(); ii++) {
105 if (DataSpecUtils::partialMatch(OutputsInputs[ii], writableAODOrigins) && isDangling[ii]) {
106 danglingOutputs.emplace_back(OutputsInputs[ii]);
107 }
108 }
109 dod->readSpecs(danglingOutputs);
110 } else {
111 // use the keep string
112 dod->readString(keepString);
113 }
114 }
115 }
116 dod->setResultDir(resdir);
117 dod->setFilenameBase(fnbase);
118 dod->setFileMode(filemode);
119 dod->setMaximumFileSize(maxfilesize);
120 dod->setNumberTimeFramesToMerge(ntfmerge);
121
122 return dod;
123}
124
125void AnalysisSupportHelpers::addMissingOutputsToReader(std::vector<OutputSpec> const& providedOutputs,
126 std::vector<InputSpec> const& requestedInputs,
127 DataProcessorSpec& publisher)
128{
129 requestedInputs |
130 views::filter_not_matching(providedOutputs) | // filter the inputs that are already provided
131 views::filter_not_matching(publisher.outputs) | // filter the inputs that are already covered
132 views::input_to_output_specs() |
133 sinks::append_to{publisher.outputs}; // append them to the publisher outputs
134}
135
136void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector<OutputSpec> const& providedSpecials,
137 std::vector<InputSpec> const& requestedSpecials,
138 std::vector<InputSpec>& requestedAODs,
139 DataProcessorSpec& publisher)
140{
141 requestedSpecials |
142 views::filter_not_matching(providedSpecials) | // filter the inputs that are already provided
143 views::input_to_output_specs() |
144 sinks::append_to{publisher.outputs}; // append them to the publisher outputs
145
146 std::vector<InputSpec> additionalInputs;
147 for (auto& input : requestedSpecials | views::filter_not_matching(providedSpecials)) {
148 input.metadata |
149 views::filter_string_params_with("input:") |
150 views::params_to_input_specs() |
151 sinks::update_input_list{additionalInputs}; // store into a temporary
152 }
153 additionalInputs | sinks::update_input_list{requestedAODs}; // update requestedAODs
154 additionalInputs | sinks::update_input_list{publisher.inputs}; // update publisher inputs
155}
156
157void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector<InputSpec> const& requestedSpecials,
158 std::vector<InputSpec>& requestedAODs,
159 std::vector<InputSpec>& requestedDYNs,
160 DataProcessorSpec& publisher)
161{
162 requestedSpecials |
163 views::input_to_output_specs() |
164 sinks::append_to{publisher.outputs}; // append them to the publisher outputs
165
166 std::vector<InputSpec> additionalInputs;
167 for (auto const& input : requestedSpecials) {
168 input.metadata |
169 views::filter_string_params_with("input:") |
170 views::params_to_input_specs() |
171 sinks::update_input_list{additionalInputs}; // store into a temporary
172 }
173
174 additionalInputs | sinks::update_input_list{publisher.inputs}; // update publisher inputs
175 // FIXME: until we have a single list of pairs
176 additionalInputs |
177 views::partial_match_filter(AODOrigins) |
178 sinks::update_input_list{requestedAODs}; // update requestedAODs
179 additionalInputs |
180 views::partial_match_filter(header::DataOrigin{"DYN"}) |
181 sinks::update_input_list{requestedDYNs}; // update requestedDYNs
182}
183
184// =============================================================================
185DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(ConfigContext const& ctx)
186{
187 // Lifetime is sporadic because we do not ask each analysis task to send its
188 // results every timeframe.
190 .name = "internal-dpl-aod-global-analysis-file-sink",
191 .inputs = {InputSpec("x", DataSpecUtils::dataDescriptorMatcherFrom(header::DataOrigin{"ATSK"}), Lifetime::Sporadic)},
192 .outputs = {},
193 .algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTObjWriter", ctx),
194 };
195
196 return spec;
197}
198
199// add sink for the AODs
201 AnalysisSupportHelpers::getGlobalAODSink(ConfigContext const& ctx)
202{
203 auto& ac = ctx.services().get<AnalysisContext>();
204
205 // the command line options relevant for the writer are global
206 // see runDataProcessing.h
208 .name = "internal-dpl-aod-writer",
209 .inputs = ac.outputsInputsAOD,
210 .outputs = {},
211 .algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTTTreeWriter", ctx),
212 };
213
214 return spec;
215}
216} // namespace o2::framework
ServiceRegistryRef services() const
ConfigParamRegistry & options() const
Defining PrimaryVertex explicitly as messageable.
o2::framework::InputSpec InputSpec