CPPMyth
Library to interoperate with MythTV server
mytheventhandler.cpp
1 /*
2  * Copyright (C) 2014 Jean-Luc Barriere
3  *
4  * This Program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2, or (at your option)
7  * any later version.
8  *
9  * This Program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; see the file COPYING. If not, write to
16  * the Free Software Foundation, 51 Franklin Street, Fifth Floor, Boston,
17  * MA 02110-1301 USA
18  * http://www.gnu.org/copyleft/gpl.html
19  *
20  */
21 
22 #include "mytheventhandler.h"
23 #include "private/debug.h"
24 #include "proto/mythprotoevent.h"
25 #include "private/os/threads/thread.h"
26 #include "private/os/threads/event.h"
27 #include "private/cppdef.h"
28 #include "private/builtin.h"
29 
30 #include <vector>
31 #include <map>
32 #include <list>
33 
34 using namespace Myth;
35 
40 
41 EventHandler::EventHandlerThread::EventHandlerThread(const std::string& server, unsigned port)
42 : m_server(server)
43 , m_port(port)
44 {
45 }
46 
47 EventHandler::EventHandlerThread::~EventHandlerThread()
48 {
49 }
50 
55 
56 namespace Myth
57 {
58  class SubscriptionHandlerThread : private OS::CThread
59  {
60  public:
61  SubscriptionHandlerThread(EventSubscriber *handle, unsigned subid);
62  virtual ~SubscriptionHandlerThread();
63  EventSubscriber *GetHandle() { return m_handle; }
64  bool IsRunning() { return OS::CThread::IsRunning(); }
65  void PostMessage(const EventMessagePtr& msg);
66 
67  private:
68  EventSubscriber *m_handle;
69  unsigned m_subId;
70  mutable OS::CMutex m_mutex;
71  OS::CEvent m_queueContent;
72  std::list<EventMessagePtr> m_msgQueue;
73 
74  bool Start();
75  void Stop();
76  void *Process();
77  };
78 }
79 
80 SubscriptionHandlerThread::SubscriptionHandlerThread(EventSubscriber *handle, unsigned subid)
81 : OS::CThread()
82 , m_handle(handle)
83 , m_subId(subid)
84 , m_mutex()
85 , m_queueContent()
86 , m_msgQueue()
87 {
88  if (m_handle && Start())
89  DBG(DBG_DEBUG, "%s: subscription is started (%p:%u)\n", __FUNCTION__, m_handle, m_subId);
90  else
91  DBG(DBG_ERROR, "%s: subscription failed (%p:%u)\n", __FUNCTION__, m_handle, m_subId);
92 }
93 
94 SubscriptionHandlerThread::~SubscriptionHandlerThread()
95 {
96  Stop();
97  m_handle = NULL;
98 }
99 
100 bool SubscriptionHandlerThread::Start()
101 {
102  if (OS::CThread::IsRunning())
103  return true;
104  return OS::CThread::StartThread();
105 }
106 
107 void SubscriptionHandlerThread::Stop()
108 {
109  if (OS::CThread::IsRunning())
110  {
111  DBG(DBG_DEBUG, "%s: subscription thread (%p:%u)\n", __FUNCTION__, m_handle, m_subId);
112  // Set stopping. don't wait as we need to signal the thread first
113  OS::CThread::StopThread(false);
114  m_queueContent.Signal();
115  // Wait for thread to stop
116  OS::CThread::StopThread(true);
117  DBG(DBG_DEBUG, "%s: subscription thread (%p:%u) stopped\n", __FUNCTION__, m_handle, m_subId);
118  }
119 }
120 
121 void SubscriptionHandlerThread::PostMessage(const EventMessagePtr& msg)
122 {
123  // Critical section
124  OS::CLockGuard lock(m_mutex);
125  m_msgQueue.push_back(msg);
126  m_queueContent.Signal();
127 }
128 
129 void *SubscriptionHandlerThread::Process()
130 {
131  while (!IsStopped())
132  {
133  while (!m_msgQueue.empty() && !IsStopped())
134  {
135  // Critical section
136  OS::CLockGuard lock(m_mutex);
137  EventMessagePtr msg = m_msgQueue.front();
138  m_msgQueue.pop_front();
139  lock.Unlock();
140  // Do work
141  m_handle->HandleBackendMessage(msg);
142  }
143  // The tread is woken up by m_queueContent.Signal();
144  m_queueContent.Wait();
145  }
146  return NULL;
147 }
148 
153 
154 namespace Myth
155 {
156  class BasicEventHandler : public EventHandler::EventHandlerThread, private OS::CThread
157  {
158  public:
159  BasicEventHandler(const std::string& server, unsigned port);
160  virtual ~BasicEventHandler();
161  // Implements MythEventHandlerThread
162  virtual bool Start();
163  virtual void Stop();
164  virtual void Reset();
165  virtual bool IsRunning();
166  virtual bool IsConnected();
167  virtual unsigned CreateSubscription(EventSubscriber *sub);
168  virtual bool SubscribeForEvent(unsigned subid, EVENT_t event);
169  virtual void RevokeSubscription(unsigned subid);
170  virtual void RevokeAllSubscriptions(EventSubscriber *sub);
171 
172  private:
173  OS::CMutex m_mutex;
174  ProtoEvent *m_event;
175  bool m_reset;
176  // About subscriptions
177  typedef std::map<EVENT_t, std::list<unsigned> > subscriptionsByEvent_t;
178  subscriptionsByEvent_t m_subscriptionsByEvent;
179  typedef std::map<unsigned, SubscriptionHandlerThread*> subscriptions_t;
180  subscriptions_t m_subscriptions;
181 
182  void DispatchEvent(const EventMessagePtr& msg);
183  virtual void* Process(void);
184  void AnnounceStatus(const char *status);
185  void AnnounceTimer();
186  void RetryConnect();
187  };
188 }
189 
190 BasicEventHandler::BasicEventHandler(const std::string& server, unsigned port)
191 : EventHandlerThread(server, port), OS::CThread()
192 , m_event(new ProtoEvent(server,port))
193 , m_reset(false)
194 {
195 }
196 
197 BasicEventHandler::~BasicEventHandler()
198 {
199  Stop();
200  {
201  OS::CLockGuard lock(m_mutex);
202  for (subscriptions_t::iterator it = m_subscriptions.begin(); it != m_subscriptions.end(); ++it)
203  delete it->second;
204  m_subscriptions.clear();
205  m_subscriptionsByEvent.clear();
206  }
207  SAFE_DELETE(m_event);
208 }
209 
210 bool BasicEventHandler::Start()
211 {
212  if (OS::CThread::IsRunning())
213  return true;
214  return OS::CThread::StartThread();
215 }
216 
217 void BasicEventHandler::Stop()
218 {
219  if (OS::CThread::IsRunning())
220  {
221  DBG(DBG_DEBUG, "%s: event handler thread (%p)\n", __FUNCTION__, this);
222  OS::CThread::StopThread(true);
223  DBG(DBG_DEBUG, "%s: event handler thread (%p) stopped\n", __FUNCTION__, this);
224  }
225  if (m_event->IsOpen())
226  m_event->Close();
227 }
228 
229 void BasicEventHandler::Reset()
230 {
231  // Hold reset
232  m_reset = true;
233 }
234 
235 bool BasicEventHandler::IsRunning()
236 {
237  return OS::CThread::IsRunning();
238 }
239 
240 bool BasicEventHandler::IsConnected()
241 {
242  return m_event->IsOpen();
243 }
244 
245 unsigned BasicEventHandler::CreateSubscription(EventSubscriber* sub)
246 {
247  unsigned id = 0;
248  OS::CLockGuard lock(m_mutex);
249  subscriptions_t::const_reverse_iterator it = m_subscriptions.rbegin();
250  if (it != m_subscriptions.rend())
251  id = it->first;
252  SubscriptionHandlerThread *handler = new SubscriptionHandlerThread(sub, ++id);
253  if (handler->IsRunning())
254  {
255  m_subscriptions.insert(std::make_pair(id, handler));
256  return id;
257  }
258  // Handler didn't start
259  delete handler;
260  return 0;
261 }
262 
263 bool BasicEventHandler::SubscribeForEvent(unsigned subid, EVENT_t event)
264 {
265  OS::CLockGuard lock(m_mutex);
266  // Only for registered subscriber
267  subscriptions_t::const_iterator it1 = m_subscriptions.find(subid);
268  if (it1 == m_subscriptions.end())
269  return false;
270  std::list<unsigned>::const_iterator it2 = m_subscriptionsByEvent[event].begin();
271  while (it2 != m_subscriptionsByEvent[event].end())
272  {
273  if (*it2 == subid)
274  return true;
275  ++it2;
276  }
277  m_subscriptionsByEvent[event].push_back(subid);
278  return true;
279 }
280 
281 void BasicEventHandler::RevokeSubscription(unsigned subid)
282 {
283  OS::CLockGuard lock(m_mutex);
284  subscriptions_t::iterator it;
285  it = m_subscriptions.find(subid);
286  if (it != m_subscriptions.end())
287  {
288  delete it->second;
289  m_subscriptions.erase(it);
290  }
291 }
292 
293 void BasicEventHandler::RevokeAllSubscriptions(EventSubscriber *sub)
294 {
295  OS::CLockGuard lock(m_mutex);
296  std::vector<subscriptions_t::iterator> its;
297  for (subscriptions_t::iterator it = m_subscriptions.begin(); it != m_subscriptions.end(); ++it)
298  {
299  if (sub == it->second->GetHandle())
300  its.push_back(it);
301  }
302  for (std::vector<subscriptions_t::iterator>::const_iterator it = its.begin(); it != its.end(); ++it)
303  {
304  delete (*it)->second;
305  m_subscriptions.erase(*it);
306  }
307 }
308 
309 void BasicEventHandler::DispatchEvent(const EventMessagePtr& msg)
310 {
311  OS::CLockGuard lock(m_mutex);
312  std::vector<std::list<unsigned>::iterator> revoked;
313  std::list<unsigned>::iterator it1 = m_subscriptionsByEvent[msg->event].begin();
314  while (it1 != m_subscriptionsByEvent[msg->event].end())
315  {
316  subscriptions_t::const_iterator it2 = m_subscriptions.find(*it1);
317  if (it2 != m_subscriptions.end())
318  it2->second->PostMessage(msg);
319  else
320  revoked.push_back(it1);
321  ++it1;
322  }
323  std::vector<std::list<unsigned>::iterator>::const_iterator itr;
324  for (itr = revoked.begin(); itr != revoked.end(); ++itr)
325  m_subscriptionsByEvent[msg->event].erase(*itr);
326 }
327 
328 void *BasicEventHandler::Process()
329 {
330  // Try to connect
331  if (m_event->Open())
332  AnnounceStatus(EVENTHANDLER_CONNECTED);
333  while (!OS::CThread::IsStopped())
334  {
335  int r;
336  EventMessage *msg = NULL;
337  r = m_event->RcvBackendMessage(EVENTHANDLER_TIMEOUT, &msg);
338  if (r > 0)
339  DispatchEvent(EventMessagePtr(msg));
340  else if (r < 0)
341  {
342  AnnounceStatus(EVENTHANDLER_DISCONNECTED);
343  RetryConnect();
344  }
345  else
346  {
347  AnnounceTimer();
348  // Reconnect if any held reset
349  if (m_reset)
350  {
351  m_reset = false;
352  m_event->Close();
353  RetryConnect();
354  }
355  }
356  }
357  AnnounceStatus(EVENTHANDLER_STOPPED);
358  // Close connection
359  m_event->Close();
360  return NULL;
361 }
362 
363 void BasicEventHandler::AnnounceStatus(const char *status)
364 {
365  DBG(DBG_DEBUG, "%s: (%p) %s\n", __FUNCTION__, this, status);
366  EventMessage *msg = new EventMessage();
367  msg->event = EVENT_HANDLER_STATUS;
368  msg->subject.push_back(status);
369  msg->subject.push_back(m_server);
370  DispatchEvent(EventMessagePtr(msg));
371 }
372 
373 void BasicEventHandler::AnnounceTimer()
374 {
375  EventMessage *msg = new EventMessage();
376  msg->event = EVENT_HANDLER_TIMER;
377  msg->subject.push_back("");
378  DispatchEvent(EventMessagePtr(msg));
379 }
380 
381 void BasicEventHandler::RetryConnect()
382 {
383  int c = 0;
384  while (!OS::CThread::IsStopped())
385  {
386  if (--c < 0)
387  {
388  if (m_event->Open())
389  {
390  AnnounceStatus(EVENTHANDLER_CONNECTED);
391  m_reset = false; // Release to break any loop
392  break;
393  }
394  c = 10; // Retry after 5 seconds
395  DBG(DBG_INFO, "%s: could not open event socket (%d)\n", __FUNCTION__, m_event->GetSocketErrNo());
396  AnnounceStatus(EVENTHANDLER_NOTCONNECTED);
397  }
398  usleep(500000);
399  }
400 }
401 
406 
407 EventHandler::EventHandler(const std::string& server, unsigned port)
408 : m_imp()
409 {
410  // Choose implementation
411  m_imp = EventHandlerThreadPtr(new BasicEventHandler(server, port));
412 }
This is the main namespace that encloses all public classes.
Definition: mythcontrol.h:29