22 #include "mytheventhandler.h" 23 #include "private/debug.h" 24 #include "proto/mythprotoevent.h" 25 #include "private/os/threads/thread.h" 26 #include "private/os/threads/event.h" 27 #include "private/cppdef.h" 28 #include "private/builtin.h" 41 EventHandler::EventHandlerThread::EventHandlerThread(
const std::string& server,
unsigned port)
47 EventHandler::EventHandlerThread::~EventHandlerThread()
64 bool IsRunning() {
return OS::CThread::IsRunning(); }
65 void PostMessage(
const EventMessagePtr& msg);
70 mutable OS::CMutex m_mutex;
71 OS::CEvent m_queueContent;
72 std::list<EventMessagePtr> m_msgQueue;
80 SubscriptionHandlerThread::SubscriptionHandlerThread(
EventSubscriber *handle,
unsigned subid)
88 if (m_handle && Start())
89 DBG(DBG_DEBUG,
"%s: subscription is started (%p:%u)\n", __FUNCTION__, m_handle, m_subId);
91 DBG(DBG_ERROR,
"%s: subscription failed (%p:%u)\n", __FUNCTION__, m_handle, m_subId);
94 SubscriptionHandlerThread::~SubscriptionHandlerThread()
100 bool SubscriptionHandlerThread::Start()
102 if (OS::CThread::IsRunning())
104 return OS::CThread::StartThread();
107 void SubscriptionHandlerThread::Stop()
109 if (OS::CThread::IsRunning())
111 DBG(DBG_DEBUG,
"%s: subscription thread (%p:%u)\n", __FUNCTION__, m_handle, m_subId);
113 OS::CThread::StopThread(
false);
114 m_queueContent.Signal();
116 OS::CThread::StopThread(
true);
117 DBG(DBG_DEBUG,
"%s: subscription thread (%p:%u) stopped\n", __FUNCTION__, m_handle, m_subId);
121 void SubscriptionHandlerThread::PostMessage(
const EventMessagePtr& msg)
124 OS::CLockGuard lock(m_mutex);
125 m_msgQueue.push_back(msg);
126 m_queueContent.Signal();
129 void *SubscriptionHandlerThread::Process()
133 while (!m_msgQueue.empty() && !IsStopped())
136 OS::CLockGuard lock(m_mutex);
137 EventMessagePtr msg = m_msgQueue.front();
138 m_msgQueue.pop_front();
141 m_handle->HandleBackendMessage(msg);
144 m_queueContent.Wait();
162 virtual bool Start();
164 virtual void Reset();
165 virtual bool IsRunning();
166 virtual bool IsConnected();
168 virtual bool SubscribeForEvent(
unsigned subid, EVENT_t event);
169 virtual void RevokeSubscription(
unsigned subid);
177 typedef std::map<EVENT_t, std::list<unsigned> > subscriptionsByEvent_t;
178 subscriptionsByEvent_t m_subscriptionsByEvent;
179 typedef std::map<unsigned, SubscriptionHandlerThread*> subscriptions_t;
180 subscriptions_t m_subscriptions;
182 void DispatchEvent(
const EventMessagePtr& msg);
183 virtual void* Process(
void);
184 void AnnounceStatus(
const char *status);
185 void AnnounceTimer();
190 BasicEventHandler::BasicEventHandler(
const std::string& server,
unsigned port)
191 : EventHandlerThread(server, port), OS::CThread()
197 BasicEventHandler::~BasicEventHandler()
201 OS::CLockGuard lock(m_mutex);
202 for (subscriptions_t::iterator it = m_subscriptions.begin(); it != m_subscriptions.end(); ++it)
204 m_subscriptions.clear();
205 m_subscriptionsByEvent.clear();
207 SAFE_DELETE(m_event);
210 bool BasicEventHandler::Start()
212 if (OS::CThread::IsRunning())
214 return OS::CThread::StartThread();
217 void BasicEventHandler::Stop()
219 if (OS::CThread::IsRunning())
221 DBG(DBG_DEBUG,
"%s: event handler thread (%p)\n", __FUNCTION__,
this);
222 OS::CThread::StopThread(
true);
223 DBG(DBG_DEBUG,
"%s: event handler thread (%p) stopped\n", __FUNCTION__,
this);
225 if (m_event->IsOpen())
229 void BasicEventHandler::Reset()
235 bool BasicEventHandler::IsRunning()
237 return OS::CThread::IsRunning();
240 bool BasicEventHandler::IsConnected()
242 return m_event->IsOpen();
248 OS::CLockGuard lock(m_mutex);
249 subscriptions_t::const_reverse_iterator it = m_subscriptions.rbegin();
250 if (it != m_subscriptions.rend())
253 if (handler->IsRunning())
255 m_subscriptions.insert(std::make_pair(
id, handler));
263 bool BasicEventHandler::SubscribeForEvent(
unsigned subid, EVENT_t event)
265 OS::CLockGuard lock(m_mutex);
267 subscriptions_t::const_iterator it1 = m_subscriptions.find(subid);
268 if (it1 == m_subscriptions.end())
270 std::list<unsigned>::const_iterator it2 = m_subscriptionsByEvent[event].begin();
271 while (it2 != m_subscriptionsByEvent[event].end())
277 m_subscriptionsByEvent[event].push_back(subid);
281 void BasicEventHandler::RevokeSubscription(
unsigned subid)
283 OS::CLockGuard lock(m_mutex);
284 subscriptions_t::iterator it;
285 it = m_subscriptions.find(subid);
286 if (it != m_subscriptions.end())
289 m_subscriptions.erase(it);
295 OS::CLockGuard lock(m_mutex);
296 std::vector<subscriptions_t::iterator> its;
297 for (subscriptions_t::iterator it = m_subscriptions.begin(); it != m_subscriptions.end(); ++it)
299 if (sub == it->second->GetHandle())
302 for (std::vector<subscriptions_t::iterator>::const_iterator it = its.begin(); it != its.end(); ++it)
304 delete (*it)->second;
305 m_subscriptions.erase(*it);
309 void BasicEventHandler::DispatchEvent(
const EventMessagePtr& msg)
311 OS::CLockGuard lock(m_mutex);
312 std::vector<std::list<unsigned>::iterator> revoked;
313 std::list<unsigned>::iterator it1 = m_subscriptionsByEvent[msg->event].begin();
314 while (it1 != m_subscriptionsByEvent[msg->event].end())
316 subscriptions_t::const_iterator it2 = m_subscriptions.find(*it1);
317 if (it2 != m_subscriptions.end())
318 it2->second->PostMessage(msg);
320 revoked.push_back(it1);
323 std::vector<std::list<unsigned>::iterator>::const_iterator itr;
324 for (itr = revoked.begin(); itr != revoked.end(); ++itr)
325 m_subscriptionsByEvent[msg->event].erase(*itr);
328 void *BasicEventHandler::Process()
332 AnnounceStatus(EVENTHANDLER_CONNECTED);
333 while (!OS::CThread::IsStopped())
337 r = m_event->RcvBackendMessage(EVENTHANDLER_TIMEOUT, &msg);
339 DispatchEvent(EventMessagePtr(msg));
342 AnnounceStatus(EVENTHANDLER_DISCONNECTED);
357 AnnounceStatus(EVENTHANDLER_STOPPED);
363 void BasicEventHandler::AnnounceStatus(
const char *status)
365 DBG(DBG_DEBUG,
"%s: (%p) %s\n", __FUNCTION__,
this, status);
367 msg->event = EVENT_HANDLER_STATUS;
368 msg->subject.push_back(status);
369 msg->subject.push_back(m_server);
370 DispatchEvent(EventMessagePtr(msg));
373 void BasicEventHandler::AnnounceTimer()
376 msg->event = EVENT_HANDLER_TIMER;
377 msg->subject.push_back(
"");
378 DispatchEvent(EventMessagePtr(msg));
381 void BasicEventHandler::RetryConnect()
384 while (!OS::CThread::IsStopped())
390 AnnounceStatus(EVENTHANDLER_CONNECTED);
395 DBG(DBG_INFO,
"%s: could not open event socket (%d)\n", __FUNCTION__, m_event->GetSocketErrNo());
396 AnnounceStatus(EVENTHANDLER_NOTCONNECTED);
407 EventHandler::EventHandler(
const std::string& server,
unsigned port)
This is the main namespace that encloses all public classes.