QualityControl  1.5.1
O2 Data Quality Control Framework
TaskRunner.h
Go to the documentation of this file.
1 // Copyright CERN and copyright holders of ALICE O2. This software is
2 // distributed under the terms of the GNU General Public License v3 (GPL
3 // Version 3), copied verbatim in the file "COPYING".
4 //
5 // See http://alice-o2.web.cern.ch/license for full licensing information.
6 //
7 // In applying this license CERN does not waive the privileges and immunities
8 // granted to it by virtue of its status as an Intergovernmental Organization
9 // or submit itself to any jurisdiction.
10 
16 
17 #ifndef QC_CORE_TASKRUNNER_H
18 #define QC_CORE_TASKRUNNER_H
19 
20 #include <boost/property_tree/ptree.hpp>
21 
22 // O2
23 #include <Common/Timer.h>
24 #include <Framework/Task.h>
25 #include <Framework/DataProcessorSpec.h>
26 #include <Framework/CompletionPolicy.h>
27 #include <Framework/EndOfStreamContext.h>
28 #include <Headers/DataHeader.h>
29 #include <Framework/InitContext.h>
30 // QC
33 
34 namespace o2::configuration
35 {
36 class ConfigurationInterface;
37 }
38 
39 namespace o2::monitoring
40 {
41 class Monitoring;
42 }
43 
45 {
46 
68 class TaskRunner : public framework::Task
69 {
70  public:
76  TaskRunner(const std::string& taskName, const std::string& configurationSource, size_t id = 0);
77  ~TaskRunner() override = default;
78 
80  void init(framework::InitContext& iCtx) override;
82  void run(framework::ProcessingContext& pCtx) override;
83 
85  static framework::CompletionPolicy::CompletionOp completionPolicyCallback(o2::framework::CompletionPolicy::InputSet inputs);
86 
87  std::string getDeviceName() { return mDeviceName; };
88  const framework::Inputs& getInputsSpecs() { return mInputSpecs; };
89  const framework::OutputSpec getOutputSpec() { return mMonitorObjectsSpec; };
90  const framework::Options getOptions() { return mOptions; };
91 
92  void setResetAfterPublish(bool);
93 
95  static std::string createTaskRunnerIdString();
97  static header::DataOrigin createTaskDataOrigin();
99  static header::DataDescription createTaskDataDescription(const std::string& taskName);
100 
102  void endOfStream(framework::EndOfStreamContext& eosContext) override;
103 
104  private:
106  void start(const framework::ConfigParamRegistry& options);
108  void stop();
110  void reset();
111 
112  std::tuple<bool /*data ready*/, bool /*timer ready*/> validateInputs(const framework::InputRecord&);
113  void loadTaskConfig();
114  void loadTopologyConfig();
115  void startOfActivity();
116  void endOfActivity();
117  void startCycle();
118  void finishCycle(framework::DataAllocator& outputs);
119  int publish(framework::DataAllocator& outputs);
120  void publishCycleStats();
121  void saveToFile();
122 
123  private:
124  std::string mDeviceName;
125  TaskConfig mTaskConfig;
126  std::shared_ptr<configuration::ConfigurationInterface> mConfigFile; // used in init only
127  std::shared_ptr<monitoring::Monitoring> mCollector;
128  std::shared_ptr<TaskInterface> mTask;
129  bool mResetAfterPublish = false;
130  std::shared_ptr<ObjectsManager> mObjectsManager;
131  int mRunNumber;
132 
133  std::string validateDetectorName(std::string name) const;
134  boost::property_tree::ptree getTaskConfigTree() const;
135  void updateMonitoringStats(framework::ProcessingContext& pCtx);
136 
137  // consider moving these to TaskConfig
138  framework::Inputs mInputSpecs;
139  framework::OutputSpec mMonitorObjectsSpec;
140  framework::Options mOptions;
141 
142  bool mCycleOn = false;
143  bool mNoMoreCycles = false;
144  int mCycleNumber = 0;
145 
146  // stats
147  int mNumberMessagesReceivedInCycle = 0;
148  int mNumberObjectsPublishedInCycle = 0;
149  int mTotalNumberObjectsPublished = 0; // over a run
150  double mLastPublicationDuration = 0;
151  int mDataReceivedInCycle = 0;
152  AliceO2::Common::Timer mTimerTotalDurationActivity;
153  AliceO2::Common::Timer mTimerDurationCycle;
154 };
155 
156 } // namespace o2::quality_control::core
157 
158 #endif // QC_CORE_TASKRUNNER_H
Definition: AggregatorRunner.h:34
Container for the configuration of a Task.
Definition: TaskConfig.h:26
Definition: Aggregator.h:27
A class driving the execution of a QC task inside DPL.
Definition: TaskRunner.h:68
These methods can be used to build a complex processing topology. It spawns 3 separate dummy processi...
Definition: Activity.h:19