Project
Loading...
Searching...
No Matches
threadserver.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14
15#ifndef THREADSERVER_H
16#define THREADSERVER_H
17
18#ifdef _WIN32
20#include "sched_affinity_win32_wrapper.h"
21#else
22#include <pthread.h>
23#include <sched.h>
24#endif
25#include "qsem.h"
26
30
31template <class S, class T>
32class qThreadCls;
33
35{
36 template <class S, class T>
37 friend class qThreadCls;
38
39 public:
41 {
42 for (int32_t i = 0; i < 2; i++) {
44 }
45 terminate = false;
46 pinCPU = -1;
47 }
48
50 {
51 for (int32_t i = 0; i < 2; i++) {
53 }
54 }
55
57 {
59 threadMutex[0].Lock();
60 return (!terminate);
61 }
62
63 int32_t threadNum;
64
65 protected:
66 int32_t pinCPU;
68 volatile bool terminate;
69};
70
71template <class S>
73{
74 template <class SS, class TT>
75 friend class qThreadCls;
76
77 private:
78 S* pCls;
79 void (S::*pFunc)(void*);
80};
81
82template <class S, class T>
83static void* qThreadWrapperCls(void* arg);
84
85template <class S, class T>
87{
88 public:
89 qThreadCls() { started = false; };
90 qThreadCls(S* pCls, void (S::*pFunc)(T*), int32_t threadNum = 0, int32_t pinCPU = -1) : threadParam()
91 {
92 started = false;
93 SpawnThread(pCls, pFunc, threadNum, pinCPU);
94 }
95
96 void SpawnThread(S* pCls, void (S::*pFunc)(T*), int32_t threadNum = 0, int32_t pinCPU = -1, bool wait = true)
97 {
98 qThreadParamCls<S>& XthreadParam = *((qThreadParamCls<S>*)&this->threadParam);
99
100 XthreadParam.pCls = pCls;
101 XthreadParam.pFunc = (void (S::*)(void*))pFunc;
102 XthreadParam.threadNum = threadNum;
103 XthreadParam.pinCPU = pinCPU;
104 pthread_t thr;
105 pthread_create(&thr, nullptr, (void* (*)(void*)) & qThreadWrapperCls, &XthreadParam);
106 if (wait) {
107 WaitForSpawn();
108 }
109 started = true;
110 }
111
112 void WaitForSpawn() { threadParam.threadMutex[1].Lock(); }
113
115 {
116 if (started) {
117 End();
118 }
119 }
120
121 void End()
122 {
123 qThreadParamCls<S>& XthreadParam = *((qThreadParamCls<S>*)&this->threadParam);
124
125 XthreadParam.terminate = true;
126 XthreadParam.threadMutex[0].Unlock();
127 XthreadParam.threadMutex[1].Lock();
128 started = false;
129 }
130
131 void Start() { threadParam.threadMutex[0].Unlock(); }
132
133 void Sync() { threadParam.threadMutex[1].Lock(); }
134
135 private:
136 bool started;
137 T threadParam;
138
139 static void* qThreadWrapperCls(T* arg);
140};
141
142template <class S, class T>
144{
145 qThreadParamCls<S>* const arg_A = (qThreadParamCls<S>*)arg;
146 if (arg_A->pinCPU != -1) {
147 cpu_set_t tmp_mask;
148 CPU_ZERO(&tmp_mask);
149 CPU_SET(arg_A->pinCPU, &tmp_mask);
150 sched_setaffinity(0, sizeof(tmp_mask), &tmp_mask);
151 }
152
153 void (S::*pFunc)(T*) = (void (S::*)(T*))arg_A->pFunc;
154 (arg_A->pCls->*pFunc)(arg);
155
156 arg_A->threadMutex[1].Unlock();
157 pthread_exit(nullptr);
158 return (nullptr);
159}
160
161template <class S, class T>
163{
164 public:
166 {
167 pArray = nullptr;
168 nThreadsRunning = 0;
169 }
170 qThreadClsArray(int32_t n, S* pCls, void (S::*pFunc)(T*), int32_t threadNumOffset = 0, int32_t* pinCPU = nullptr)
171 {
172 pArray = nullptr;
173 nThreadsRunning = 0;
174 SetNumberOfThreads(n, pCls, pFunc, threadNumOffset, pinCPU);
175 }
176
177 void SetNumberOfThreads(int32_t n, S* pCls, void (S::*pFunc)(T*), int32_t threadNumOffset = 0, int32_t* pinCPU = nullptr)
178 {
179 if (nThreadsRunning) {
180 fprintf(STD_OUT, "Threads already started\n");
181 throw(qThreadServerException());
182 }
183 pArray = new qThreadCls<S, T>[n];
184 nThreadsRunning = n;
185 for (int32_t i = 0; i < n; i++) {
186 pArray[i].SpawnThread(pCls, pFunc, threadNumOffset + i, pinCPU == nullptr ? -1 : pinCPU[i], false);
187 }
188 for (int32_t i = 0; i < n; i++) {
189 pArray[i].WaitForSpawn();
190 }
191 }
192
194 {
195 if (nThreadsRunning) {
196 EndThreads();
197 }
198 }
199
201 {
202 delete[] pArray;
203 nThreadsRunning = 0;
204 }
205
206 void Start()
207 {
208 for (int32_t i = 0; i < nThreadsRunning; i++) {
209 pArray[i].Start();
210 }
211 }
212
213 void Sync()
214 {
215 for (int32_t i = 0; i < nThreadsRunning; i++) {
216 pArray[i].Sync();
217 }
218 }
219
220 private:
221 qThreadCls<S, T>* pArray;
222 int32_t nThreadsRunning;
223};
224
225#endif
int32_t i
Definition qsem.h:25
int32_t Unlock()
Definition qsem.cxx:48
int32_t Lock()
Definition qsem.cxx:39
void SetNumberOfThreads(int32_t n, S *pCls, void(S::*pFunc)(T *), int32_t threadNumOffset=0, int32_t *pinCPU=nullptr)
qThreadClsArray(int32_t n, S *pCls, void(S::*pFunc)(T *), int32_t threadNumOffset=0, int32_t *pinCPU=nullptr)
void SpawnThread(S *pCls, void(S::*pFunc)(T *), int32_t threadNum=0, int32_t pinCPU=-1, bool wait=true)
void WaitForSpawn()
qThreadCls(S *pCls, void(S::*pFunc)(T *), int32_t threadNum=0, int32_t pinCPU=-1)
int32_t pinCPU
bool WaitForTask()
volatile bool terminate
int32_t threadNum
qSem threadMutex[2]
GLdouble n
Definition glcorearb.h:1982
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
#define STD_OUT
Definition qsem.cxx:21