CPPMyth
Library to interoperate with MythTV server
threadpool.h
1 #pragma once
2 /*
3  * Copyright (C) 2015 Jean-Luc Barriere
4  *
5  * This library is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU Lesser General Public License as published
7  * by the Free Software Foundation; either version 3, or (at your option)
8  * any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public License
16  * along with this library; see the file COPYING. If not, write to
17  * the Free Software Foundation, 51 Franklin Street, Fifth Floor, Boston,
18  * MA 02110-1301 USA
19  * http://www.gnu.org/copyleft/gpl.html
20  *
21  */
22 
23 #include "thread.h"
24 #include "event.h"
25 
26 #include <queue>
27 #include <set>
28 
29 #ifdef NSROOT
30 namespace NSROOT {
31 #endif
32 namespace OS
33 {
34 
35  class CWorker;
36 
37  class CWorkerThread;
38 
40  {
41  friend class CWorkerThread;
42  public:
43  CThreadPool();
44  CThreadPool(unsigned size);
45  ~CThreadPool();
46 
47  bool Enqueue(CWorker* worker);
48 
49  unsigned GetMaxSize() const { return m_size; }
50 
51  void SetMaxSize(unsigned size);
52 
53  void SetKeepAlive(unsigned millisec);
54 
55  unsigned Size() const;
56 
57  unsigned QueueSize() const;
58  bool IsQueueEmpty() const;
59  bool waitEmpty(unsigned millisec);
60  bool waitEmpty();
61 
62  void Suspend();
63  void Resume();
64  bool IsSuspended() const;
65 
66  void Reset();
67  void Stop();
68  void Start();
69  bool IsStopped() const;
70 
71  private:
72  unsigned m_size;
73  unsigned m_keepAlive;
74  unsigned m_poolSize;
75  unsigned m_waitingCount;
76  volatile bool m_stopped;
77  volatile bool m_suspended;
78  volatile bool m_empty;
79 
80  std::queue<CWorker*> m_queue;
81  std::set<CWorkerThread*> m_pool;
82  mutable CMutex m_mutex;
83  CCondition<volatile bool> m_condition;
84  CEvent m_queueFill;
85  CEvent m_queueEmpty;
86 
87  CWorker* PopQueue(CWorkerThread* _thread);
88  void WaitQueue(CWorkerThread* _thread);
89  void StartThread(CWorkerThread* _thread);
90  void FinalizeThread(CWorkerThread* _thread);
91  void __resize();
92  };
93 
94  class CWorker
95  {
96  friend class CThreadPool;
97  public:
98  CWorker() : m_queued(false) { }
99  virtual ~CWorker() { }
100  virtual void Process() = 0;
101 
102  private:
103  bool m_queued;
104  };
105 
106  class CWorkerThread : public CThread
107  {
108  public:
110  : CThread()
111  , m_threadPool(pool) { m_finalizeOnStop = true; }
112 
113  void* Process(void)
114  {
115  bool waiting = false;
116 
117  while (!IsStopped())
118  {
119  CWorker* worker = m_threadPool.PopQueue(this);
120  if (worker != NULL)
121  {
122  worker->Process();
123  delete worker;
124  waiting = false;
125  }
126  else if (!waiting)
127  {
128  m_threadPool.WaitQueue(this);
129  waiting = true;
130  }
131  else
132  break;
133  }
134 
135  return NULL;
136  }
137 
138  void Finalize(void)
139  {
140  m_threadPool.FinalizeThread(this);
141  }
142 
143  private:
144  CThreadPool& m_threadPool;
145  };
146 
147 }
148 #ifdef NSROOT
149 }
150 #endif