CPPMyth
Library to interoperate with MythTV server
threadpool.cpp
1 /*
2  * Copyright (C) 2015 Jean-Luc Barriere
3  *
4  * This library is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU Lesser General Public License as published
6  * by the Free Software Foundation; either version 3, or (at your option)
7  * any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public License
15  * along with this library; see the file COPYING. If not, write to
16  * the Free Software Foundation, 51 Franklin Street, Fifth Floor, Boston,
17  * MA 02110-1301 USA
18  * http://www.gnu.org/copyleft/gpl.html
19  *
20  */
21 
22 #include "threadpool.h"
23 
24 #include <cassert>
25 
26 #define WTH_KEEPALIVE 5000
27 
28 #ifdef NSROOT
29 using namespace NSROOT::OS;
30 #else
31 using namespace OS;
32 #endif
33 
34 CThreadPool::CThreadPool()
35 : m_size(1)
36 , m_keepAlive(WTH_KEEPALIVE)
37 , m_poolSize(0)
38 , m_waitingCount(0)
39 , m_stopped(false)
40 , m_suspended(false)
41 , m_empty(false)
42 {
43 }
44 
45 CThreadPool::CThreadPool(unsigned size)
46 : m_size(size)
47 , m_keepAlive(WTH_KEEPALIVE)
48 , m_poolSize(0)
49 , m_waitingCount(0)
50 , m_stopped(false)
51 , m_suspended(false)
52 , m_empty(false)
53 {
54 }
55 
56 CThreadPool::~CThreadPool()
57 {
58  m_mutex.Lock();
59  // Reject new runs
60  m_stopped = true;
61  // Destroy all queued workers
62  while (!m_queue.empty())
63  {
64  delete m_queue.front();
65  m_queue.pop();
66  }
67  // Finalize all running
68  if (!m_pool.empty())
69  {
70  m_empty = false;
71  // Signal stop
72  for (std::set<CWorkerThread*>::iterator it = m_pool.begin(); it != m_pool.end(); ++it)
73  (*it)->StopThread(false);
74  // Wake sleeper
75  m_queueFill.Broadcast();
76  // Waiting all finalized
77  m_condition.Wait(m_mutex, m_empty);
78  }
79 }
80 
81 bool CThreadPool::Enqueue(CWorker* worker)
82 {
83  assert(worker->m_queued != true);
84  CLockGuard lock(m_mutex);
85  if (!m_stopped)
86  {
87  worker->m_queued = true;
88  m_queue.push(worker);
89  if (!m_suspended)
90  {
91  if (m_waitingCount)
92  {
93  // Wake a thread
94  m_queueFill.Signal();
95  return true;
96  }
97  else
98  {
99  __resize();
100  return true;
101  }
102  }
103  // Delayed work
104  return true;
105  }
106  return false;
107 }
108 
109 void CThreadPool::SetMaxSize(unsigned size)
110 {
111  CLockGuard lock(m_mutex);
112  m_size = size;
113  if (!m_suspended)
114  __resize();
115 }
116 
117 void CThreadPool::SetKeepAlive(unsigned millisec)
118 {
119  CLockGuard lock(m_mutex);
120  m_keepAlive = millisec;
121 }
122 
123 unsigned CThreadPool::Size() const
124 {
125  CLockGuard lock(m_mutex);
126  return m_poolSize;
127 }
128 
129 unsigned CThreadPool::QueueSize() const
130 {
131  CLockGuard lock(m_mutex);
132  return static_cast<unsigned>(m_queue.size());
133 }
134 
135 bool CThreadPool::IsQueueEmpty() const
136 {
137  CLockGuard lock(m_mutex);
138  return m_queue.empty();
139 }
140 
141 bool CThreadPool::waitEmpty(unsigned millisec)
142 {
143  return IsQueueEmpty() || m_queueEmpty.Wait(millisec);
144 }
145 
146 bool CThreadPool::waitEmpty()
147 {
148  return IsQueueEmpty() || m_queueEmpty.Wait();
149 }
150 
151 void CThreadPool::Suspend()
152 {
153  CLockGuard lock(m_mutex);
154  m_suspended = true;
155 }
156 
157 void CThreadPool::Resume()
158 {
159  CLockGuard lock(m_mutex);
160  m_suspended = false;
161  __resize();
162 }
163 
164 bool CThreadPool::IsSuspended() const
165 {
166  CLockGuard lock(m_mutex);
167  return m_suspended;
168 }
169 
170 void CThreadPool::Reset()
171 {
172  CLockGuard lock(m_mutex);
173  m_stopped = true;
174  // Destroy all queued workers
175  while (!m_queue.empty())
176  {
177  delete m_queue.front();
178  m_queue.pop();
179  }
180 }
181 
182 void CThreadPool::Stop()
183 {
184  CLockGuard lock(m_mutex);
185  m_stopped = true;
186 }
187 
188 void CThreadPool::Start()
189 {
190  CLockGuard lock(m_mutex);
191  m_stopped = false;
192 }
193 
194 bool CThreadPool::IsStopped() const
195 {
196  CLockGuard lock(m_mutex);
197  return m_stopped;
198 }
199 
200 CWorker* CThreadPool::PopQueue(CWorkerThread* _thread)
201 {
202  (void)_thread;
203  CLockGuard lock(m_mutex);
204  if (!m_suspended)
205  {
206  m_queueEmpty.Signal();
207  if (!m_queue.empty())
208  {
209  CWorker* worker = m_queue.front();
210  m_queue.pop();
211  return worker;
212  }
213  }
214  return NULL;
215 }
216 
217 void CThreadPool::WaitQueue(CWorkerThread* _thread)
218 {
219  (void)_thread;
220  CLockGuard lock(m_mutex);
221  ++m_waitingCount;
222  unsigned millisec = m_keepAlive;
223  lock.Unlock();
224  m_queueFill.Wait(millisec);
225  lock.Lock();
226  --m_waitingCount;
227 }
228 
229 void CThreadPool::StartThread(CWorkerThread* _thread)
230 {
231  ++m_poolSize;
232  m_pool.insert(_thread);
233  if (!_thread->StartThread(false))
234  FinalizeThread(_thread);
235 }
236 
237 void CThreadPool::FinalizeThread(CWorkerThread* _thread)
238 {
239  CLockGuard lock(m_mutex);
240  if (m_pool.erase(_thread))
241  {
242  --m_poolSize;
243  delete _thread;
244  }
245  if (m_pool.empty())
246  {
247  m_empty = true;
248  m_condition.Broadcast();
249  }
250 }
251 
252 void CThreadPool::__resize()
253 {
254  if (m_poolSize < m_size && !m_queue.empty())
255  {
256  for (unsigned i = m_queue.size(); i > 0; --i)
257  {
258  if (m_poolSize >= m_size)
259  break;
260  CWorkerThread* _thread = new CWorkerThread(*this);
261  // The new thread will check the queue
262  StartThread(_thread);
263  }
264  }
265  else if (m_poolSize > m_size)
266  {
267  std::set<CWorkerThread*>::iterator it = m_pool.begin();
268  for (unsigned i = m_poolSize - m_size; i > 0; --i)
269  {
270  if (it == m_pool.end())
271  break;
272  (*it)->StopThread(false);
273  ++it;
274  }
275  // Wake up the waiting threads to stop
276  if (m_waitingCount)
277  m_queueFill.Broadcast();
278  }
279 }