Project
Loading...
Searching...
No Matches
DataProcessingDevice.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef O2_FRAMEWORK_DATAPROCESSINGDEVICE_H_
12#define O2_FRAMEWORK_DATAPROCESSINGDEVICE_H_
13
27#include "Framework/Tracing.h"
30
31#include <fairmq/Device.h>
32#include <fairmq/Parts.h>
33
34#include <memory>
35#include <mutex>
36#include <uv.h>
37
38namespace o2::framework
39{
40
41struct InputChannelInfo;
42struct DeviceState;
43struct ComputingQuotaEvaluator;
44
52class DataProcessingDevice;
53struct DataProcessorContext;
54struct DeviceContext;
55
57 int index = -1;
58};
59
70
72 static std::unique_ptr<ConfigParamStore> getConfiguration(ServiceRegistryRef registry, const char* name, std::vector<ConfigParamSpec> const& options);
73};
74
78{
79 public:
81 void Init() final;
82 void InitTask() final;
83 void PreRun() final;
84 void PostRun() final;
85 void Reset() final;
86 void ResetTask() final;
87 void Run() final;
88
89 // Processing functions are now renetrant
90 static void doRun(ServiceRegistryRef);
91 static void doPrepare(ServiceRegistryRef);
93 static bool tryDispatchComputation(ServiceRegistryRef ref, std::vector<DataRelayer::RecordAction>& completed);
94
95 protected:
96 void error(const char* msg);
97 void fillContext(DataProcessorContext& context, DeviceContext& deviceContext);
98
99 private:
101 void initPollers();
102 void startPollers();
103 void stopPollers();
105 RunningDeviceRef mRunningDevice;
106
107 std::unique_ptr<ConfigParamRegistry> mConfigRegistry;
108 ServiceRegistry& mServiceRegistry;
109
110 uint64_t mLastSlowMetricSentTimestamp = 0;
111 uint64_t mLastMetricFlushedTimestamp = 0;
112 uint64_t mBeginIterationTimestamp = 0;
113 std::vector<fair::mq::RegionInfo> mPendingRegionInfos;
114 std::mutex mRegionInfoMutex;
115 ProcessingPolicies mProcessingPolicies;
116 std::vector<uv_work_t> mHandles;
117 std::vector<TaskStreamInfo> mStreams;
120 uv_async_t* mAwakeHandle = nullptr;
121};
122
123} // namespace o2::framework
124#endif // O2_FRAMEWORK_DATAPROCESSINGDEVICE_H_
struct uv_async_s uv_async_t
static void doRun(ServiceRegistryRef)
void fillContext(DataProcessorContext &context, DeviceContext &deviceContext)
static void doPrepare(ServiceRegistryRef)
static bool tryDispatchComputation(ServiceRegistryRef ref, std::vector< DataRelayer::RecordAction > &completed)
static void handleData(ServiceRegistryRef, InputChannelInfo &)
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
static std::unique_ptr< ConfigParamStore > getConfiguration(ServiceRegistryRef registry, const char *name, std::vector< ConfigParamSpec > const &options)
uv_work_t * task
The libuv task handle.
ServiceRegistry * registry
The registry associated to the task being run.
bool running
Wether or not this task is running.
TaskStreamRef id
The id of this stream.
uint64_t const void const *restrict const msg
Definition x9.h:153