Project
Loading...
Searching...
No Matches
ShmManager.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
#include <fairmq/shmem/Common.h>
#include <fairmq/shmem/UnmanagedRegion.h>
#include <fairmq/shmem/Segment.h>
#include <fairmq/shmem/Monitor.h>

#include <fairmq/tools/Unique.h>

#include <fairlogger/Logger.h>

#include <boost/algorithm/string.hpp>
#include <boost/program_options.hpp>

#include <csignal>

#include <atomic>
#include <chrono>
#include <map>
#include <mutex>
#include <string>
#include <thread>
31
32#if !defined(__MACH__) && !defined(__APPLE__)
33#include <syscall.h>
34#define MPOL_DEFAULT 0
35#define MPOL_PREFERRED 1
36#define MPOL_BIND 2
37#define MPOL_INTERLEAVE 3
38#endif
39
40using namespace std;
41using namespace boost::program_options;
42
namespace
{
// Flags written by the signal handlers below and polled by the main thread
// and the reset-content thread started in main().
//
// They are std::atomic rather than `volatile sig_atomic_t` because they are
// shared between threads: volatile gives no inter-thread synchronization, so
// concurrent access to a plain sig_atomic_t from two threads is a data race.
std::atomic<int> gStopping{0};
std::atomic<int> gResetContent{0};

// A signal handler may only touch atomics that are lock-free.
static_assert(std::atomic<int>::is_always_lock_free,
              "signal handlers require lock-free atomic flags");
} // namespace

/// Requests shutdown by raising gStopping.
/// main() installs this handler for SIGINT and SIGTERM.
void signalHandler(int /* signal */)
{
  gStopping = 1;
}

/// Requests a content reset by raising gResetContent.
/// main() installs this handler for SIGUSR1; the flag is consumed (and
/// cleared) by the reset-content thread in main().
void resetContentHandler(int /* signal */)
{
  gResetContent = 1;
}
58
59struct ShmManager {
60 ShmManager(uint64_t _shmId, const vector<string>& _segments, const vector<string>& _regions, uint64_t _refcount_segment_size, bool zero = true)
61 : shmId(fair::mq::shmem::makeShmIdStr(_shmId))
62 {
63 LOG(info) << "Starting ShmManager for shmId: " << shmId;
64 LOG(info) << "Performing full reset...";
65 FullReset();
66 LOG(info) << "Done.";
67 LOG(info) << "Adding managed segments...";
68 AddSegments(_segments, zero);
69 LOG(info) << "Done.";
70 LOG(info) << "Adding unmanaged regions...";
71 AddRegions(_regions, _refcount_segment_size, zero);
72 LOG(info) << "Done.";
73 LOG(info) << "Shared memory is ready for use.";
74 }
75
76 void AddSegments(const vector<string>& _segments, bool zero)
77 {
78 for (const auto& s : _segments) {
79 vector<string> conf;
80 boost::algorithm::split(conf, s, boost::algorithm::is_any_of(","));
81 if (conf.size() != 3) {
82 LOG(error) << "incorrect format for --segments. Expecting pairs of <id>,<size><numaid>.";
83 fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
84 throw runtime_error("incorrect format for --segments. Expecting pairs of <id>,<size>,<numaid>.");
85 }
86 uint16_t id = stoi(conf.at(0));
87 uint64_t size = stoull(conf.at(1));
88 segmentCfgs.emplace_back(fair::mq::shmem::SegmentConfig{id, size, "rbtree_best_fit"});
89
90#if !defined(__MACH__) && !defined(__APPLE__)
91 int numaid = stoi(conf.at(2));
92 if (numaid == -2) {
93 LOG(info) << "Setting memory allocation to process default";
94 syscall(SYS_set_mempolicy, MPOL_DEFAULT, nullptr, 0);
95 } else if (numaid == -1) {
96 LOG(info) << "Setting memory allocation to NUMA interleaving";
97 unsigned long nodemask = 0xffffff;
98 syscall(SYS_set_mempolicy, MPOL_INTERLEAVE, &nodemask, sizeof(nodemask) * 8);
99 } else {
100 LOG(info) << "Setting memory allocation to NUMA id " << numaid;
101 unsigned long nodemask = 1 << numaid;
102 syscall(SYS_set_mempolicy, MPOL_BIND, &nodemask, sizeof(nodemask) * 8);
103 }
104#endif
105
106 auto ret = segments.emplace(id, fair::mq::shmem::Segment(shmId, id, size, fair::mq::shmem::rbTreeBestFit));
107 fair::mq::shmem::Segment& segment = ret.first->second;
108 LOG(info) << "Created segment " << id << " of size " << segment.GetSize() << ", starting at " << segment.GetData() << ". Locking...";
109 segment.Lock();
110 LOG(info) << "Done.";
111 if (zero) {
112 LOG(info) << "Zeroing...";
113 segment.Zero();
114 LOG(info) << "Done.";
115 }
116 }
117 }
118
119 void AddRegions(const vector<string>& _regions, uint64_t _refcount_segment_size, bool zero)
120 {
121 for (const auto& r : _regions) {
122 vector<string> conf;
123 boost::algorithm::split(conf, r, boost::algorithm::is_any_of(","));
124 if (conf.size() != 3) {
125 LOG(error) << "incorrect format for --regions. Expecting pairs of <id>,<size>,<numaid>.";
126 fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
127 throw runtime_error("incorrect format for --regions. Expecting pairs of <id>,<size>,<numaid>.");
128 }
129 uint16_t id = stoi(conf.at(0));
130 uint64_t size = stoull(conf.at(1));
131 fair::mq::RegionConfig cfg;
132 cfg.id = id;
133 cfg.size = size;
134 if (_refcount_segment_size != 1) {
135 cfg.rcSegmentSize = _refcount_segment_size;
136 }
137 regionCfgs.push_back(cfg);
138
139#if !defined(__MACH__) && !defined(__APPLE__)
140 int numaid = stoi(conf.at(2));
141 if (numaid == -2) {
142 LOG(info) << "Setting memory allocation to process default";
143 syscall(SYS_set_mempolicy, MPOL_DEFAULT, nullptr, 0);
144 } else if (numaid == -1) {
145 LOG(info) << "Setting memory allocation to NUMA interleaving";
146 unsigned long nodemask = 0xffffff;
147 syscall(SYS_set_mempolicy, MPOL_INTERLEAVE, &nodemask, sizeof(nodemask) * 8);
148 } else {
149 LOG(info) << "Setting memory allocation to NUMA id " << numaid;
150 unsigned long nodemask = 1 << numaid;
151 syscall(SYS_set_mempolicy, MPOL_BIND, &nodemask, sizeof(nodemask) * 8);
152 }
153#endif
154
155 auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, cfg));
156 fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second);
157 LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize() << ", starting at " << region.GetData() << ". Locking...";
158 region.Lock();
159 LOG(info) << "Done.";
160 if (zero) {
161 LOG(info) << "Zeroing...";
162 region.Zero();
163 LOG(info) << "Done.";
164 }
165 }
166 }
167
169 {
170 std::lock_guard<std::mutex> lock(localMtx);
171 for (const auto& sc : segmentCfgs) {
172 if (!(fair::mq::shmem::Monitor::SegmentIsPresent(fair::mq::shmem::ShmId{shmId}, sc.id))) {
173 return false;
174 }
175 }
176 for (const auto& rc : regionCfgs) {
177 if (!(fair::mq::shmem::Monitor::RegionIsPresent(fair::mq::shmem::ShmId{shmId}, rc.id.value()))) {
178 return false;
179 }
180 }
181 return true;
182 }
183
185 {
186 std::lock_guard<std::mutex> lock(localMtx);
187 fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId}, segmentCfgs, regionCfgs);
188 }
189
191 {
192 segments.clear();
193 regions.clear();
194 fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
195 }
196
198 {
199 // clean all segments, regions and any other shmem objects belonging to this shmId
200 fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
201 }
202
203 std::string shmId;
204 std::mutex localMtx;
205 map<uint16_t, fair::mq::shmem::Segment> segments;
206 map<uint16_t, unique_ptr<fair::mq::shmem::UnmanagedRegion>> regions;
207 std::vector<fair::mq::shmem::SegmentConfig> segmentCfgs;
208 std::vector<fair::mq::RegionConfig> regionCfgs;
209};
210
211int main(int argc, char** argv)
212{
213 fair::Logger::SetConsoleColor(true);
214
215 signal(SIGINT, signalHandler);
216 signal(SIGTERM, signalHandler);
217 signal(SIGUSR1, resetContentHandler);
218
219 try {
220 bool nozero = false;
221 bool checkPresence = true;
222 uint64_t shmId = 0;
223 uint64_t refcount_segment_size = 0;
224 vector<string> segments;
225 vector<string> regions;
226
227 options_description desc("Options");
228 desc.add_options()(
229 "shmid", value<uint64_t>(&shmId)->required(), "Shm id")(
230 "segments", value<vector<string>>(&segments)->multitoken()->composing(), "Segments, as <id>,<size>,<numaid> <id>,<size>,<numaid> <id>,<size>,<numaid> ... (numaid: -2 disabled, -1 interleave, >=0 node)")(
231 "regions", value<vector<string>>(&regions)->multitoken()->composing(), "Regions, as <id>,<size> <id>,<size>,<numaid> <id>,<size>,<numaid> ...")(
232 "nozero", value<bool>(&nozero)->default_value(false)->implicit_value(true), "Do not zero segments after initialization")(
233 "check-presence", value<bool>(&checkPresence)->default_value(true)->implicit_value(true), "Check periodically if configured segments/regions are still present, and cleanup and leave if they are not")(
234 "refcount-segment-size", value<uint64_t>(&refcount_segment_size)->default_value(1), "Size in bytes of refCount segment (global setting affecting all unmanaged regions, 1 = use default, 0 = disable rc segment)")(
235 "help,h", "Print help");
236
237 variables_map vm;
238 store(parse_command_line(argc, argv, desc), vm);
239
240 if (vm.count("help")) {
241 LOG(info) << "ShmManager"
242 << "\n"
243 << desc;
244 return 0;
245 }
246
247 notify(vm);
248
249 ShmManager shmManager(shmId, segments, regions, refcount_segment_size, !nozero);
250
251 std::thread resetContentThread([&shmManager]() {
252 while (!gStopping) {
253 std::this_thread::sleep_for(std::chrono::milliseconds(50));
254 if (gResetContent == 1) {
255 LOG(info) << "Resetting content for shmId " << shmManager.shmId;
256 shmManager.ResetContent();
257 gResetContent = 0;
258 LOG(info) << "Done resetting content for shmId " << shmManager.shmId;
259 }
260 }
261 });
262
263 if (checkPresence) {
264 while (!gStopping) {
265 if (shmManager.CheckPresence() == false) {
266 LOG(error) << "Failed to find segments, exiting.";
267 gStopping = true;
268 }
269 std::this_thread::sleep_for(std::chrono::milliseconds(500));
270 }
271 }
272
273 if (resetContentThread.joinable()) {
274 resetContentThread.join();
275 }
276
277 LOG(info) << "stopping.";
278 } catch (exception& e) {
279 LOG(error) << "Exception reached the top of main: " << e.what() << ", exiting";
280 return 2;
281 }
282
283 return 0;
284}
void resetContentHandler(int)
#define MPOL_INTERLEAVE
#define MPOL_DEFAULT
void signalHandler(int)
#define MPOL_BIND
GLuint segment
Definition glcorearb.h:4945
GLsizeiptr size
Definition glcorearb.h:659
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLuint segments
Definition glcorearb.h:4946
GLboolean r
Definition glcorearb.h:1233
GLuint id
Definition glcorearb.h:650
Defining DataPointCompositeObject explicitly as copiable.
ShmManager(uint64_t _shmId, const vector< string > &_segments, const vector< string > &_regions, uint64_t _refcount_segment_size, bool zero=true)
void AddSegments(const vector< string > &_segments, bool zero)
map< uint16_t, fair::mq::shmem::Segment > segments
std::mutex localMtx
std::string shmId
bool CheckPresence()
map< uint16_t, unique_ptr< fair::mq::shmem::UnmanagedRegion > > regions
void AddRegions(const vector< string > &_regions, uint64_t _refcount_segment_size, bool zero)
void ResetContent()
std::vector< fair::mq::shmem::SegmentConfig > segmentCfgs
void FullReset()
std::vector< fair::mq::RegionConfig > regionCfgs
#define main
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"