Project
Loading...
Searching...
No Matches
ArrowContext.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef O2_FRAMEWORK_ARROWCONTEXT_H_
12#define O2_FRAMEWORK_ARROWCONTEXT_H_
13
16#include <cassert>
17#include <functional>
18#include <memory>
19#include <string>
20#include <vector>
21
22#include <fairmq/FwdDecls.h>
23
24namespace o2::framework
25{
26
27class FairMQResizableBuffer;
28
33{
34 public:
36
38 : mProxy{proxy}
39 {
40 }
41
/// One entry stored by the context: a header, its buffer and a finalize callback.
/// NOTE(review): addBuffer() brace-initialises MessageRef with a fourth value
/// (routeIndex) for which no member is visible in this listing — the member
/// line appears to have been dropped; confirm against the real header.
42 struct MessageRef {
/// The header to be associated with the message.
44 std::unique_ptr<fair::mq::Message> header;
/// The actual buffer holding the ArrowData.
46 std::shared_ptr<FairMQResizableBuffer> buffer;
/// The function to call to finalise the builder into the message.
48 std::function<void(std::shared_ptr<FairMQResizableBuffer>)> finalize;
50 };
51
52 using Messages = std::vector<MessageRef>;
53
/// Record one message entry in this context.
///
/// Builds a MessageRef from the four arguments and appends it to mMessages.
/// @param header     message header; ownership is moved into the stored entry
/// @param buffer     shared buffer; moved into the stored entry
/// @param finalize   callback taking the buffer; it is only stored here, not invoked
/// @param routeIndex stored as the last field of the entry
54 void addBuffer(std::unique_ptr<fair::mq::Message> header,
55 std::shared_ptr<FairMQResizableBuffer> buffer,
56 std::function<void(std::shared_ptr<FairMQResizableBuffer>)> finalize,
57 RouteIndex routeIndex)
58 {
59 mMessages.push_back(MessageRef{std::move(header),
60 std::move(buffer),
61 std::move(finalize),
62 routeIndex});
63 }
64
/// @return iterator to the first stored MessageRef (mMessages.begin()).
65 Messages::iterator begin()
66 {
67 return mMessages.begin();
68 }
69
/// @return past-the-end iterator of the stored MessageRef entries (mMessages.end()).
70 Messages::iterator end()
71 {
72 return mMessages.end();
73 }
74
/// @return number of MessageRef entries currently held in mMessages.
/// NOTE(review): only reads state but is not const-qualified — consider `const`.
75 size_t size()
76 {
77 return mMessages.size();
78 }
79
/// Remove every stored MessageRef from the context.
80 void clear()
81 {
82 // On send the header is moved out, but the payload stays behind
83 // because what is actually sent is a copy of the string payload;
84 // the leftover payload is cleared by the mMessages.clear() below.
// NOTE(review): "string payload" looks carried over from a string-based
// context — entries here hold a FairMQResizableBuffer; confirm the wording.
85 mMessages.clear();
86 }
87
89 {
90 return mProxy;
91 }
92
94 {
95 mBytesSent += value;
96 }
97
99 {
100 mBytesDestroyed += value;
101 }
102
104 {
105 mMessagesCreated += value;
106 }
107
109 {
110 mMessagesDestroyed += value;
111 }
112
/// @return current value of the mBytesSent counter (starts at 0).
/// NOTE(review): only reads state but is not const-qualified — consider `const`.
113 size_t bytesSent()
114 {
115 return mBytesSent;
116 }
117
119 {
120 return mBytesDestroyed;
121 }
122
124 {
125 return mMessagesCreated;
126 }
127
129 {
130 return mMessagesDestroyed;
131 }
132
133 private:
134 FairMQDeviceProxy& mProxy;
135 Messages mMessages;
136 size_t mBytesSent = 0;
137 size_t mBytesDestroyed = 0;
138 size_t mMessagesCreated = 0;
139 size_t mMessagesDestroyed = 0;
140 size_t mRateLimit = 0;
141};
142
143} // namespace o2::framework
144#endif // O2_FRAMEWORK_ARROWCONTEXT_H_
std::vector< MessageRef > Messages
Messages::iterator end()
Messages::iterator begin()
void updateBytesDestroyed(size_t value)
void addBuffer(std::unique_ptr< fair::mq::Message > header, std::shared_ptr< FairMQResizableBuffer > buffer, std::function< void(std::shared_ptr< FairMQResizableBuffer >)> finalize, RouteIndex routeIndex)
void updateMessagesSent(size_t value)
void updateMessagesDestroyed(size_t value)
static constexpr ServiceKind service_kind
void updateBytesSent(size_t value)
FairMQDeviceProxy & proxy()
ArrowContext(FairMQDeviceProxy &proxy)
GLuint buffer
Definition glcorearb.h:655
GLsizei const GLfloat * value
Definition glcorearb.h:819
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
ServiceKind
The kind of service we are asking for.
std::unique_ptr< fair::mq::Message > header
The header to be associated with the message.
std::shared_ptr< FairMQResizableBuffer > buffer
The actual buffer holding the ArrowData.
std::function< void(std::shared_ptr< FairMQResizableBuffer >)> finalize
The function to call to finalise the builder into the message.